View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  
21  package org.apache.hadoop.hbase.wal;
22  
23  import java.io.IOException;
24  import java.util.Arrays;
25  import java.io.InterruptedIOException;
26  import java.util.Collections;
27  import java.util.List;
28  import java.util.concurrent.atomic.AtomicReference;
29  
30  import com.google.common.annotations.VisibleForTesting;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.fs.FSDataInputStream;
36  import org.apache.hadoop.fs.FileSystem;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.wal.WAL.Reader;
39  import org.apache.hadoop.hbase.wal.WALProvider.Writer;
40  import org.apache.hadoop.hbase.util.CancelableProgressable;
41  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
42  
43  // imports for things that haven't moved from regionserver.wal yet.
44  import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
45  import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
46  import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
47  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
48  
49  /**
50   * Entry point for users of the Write Ahead Log.
51   * Acts as the shim between internal use and the particular WALProvider we use to handle wal
52   * requests.
53   *
54   * Configure which provider gets used with the configuration setting "hbase.wal.provider". Available
55   * implementations:
56   * <ul>
57   *   <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
58   *                                  "filesystem"</li>
59   *   <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
60   *                             FileSystem interface, normally HDFS.</li>
61   *   <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
62   *                           server.</li>
63   * </ul>
64   *
65   * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
66   */
67  @InterfaceAudience.Private
68  public class WALFactory {
69  
  // Class-wide commons-logging logger, named after WALFactory.
  private static final Log LOG = LogFactory.getLog(WALFactory.class);
71  
72    /**
73     * Maps between configuration names for providers and implementation classes.
74     */
75    static enum Providers {
76      defaultProvider(DefaultWALProvider.class),
77      filesystem(DefaultWALProvider.class),
78      multiwal(RegionGroupingProvider.class);
79  
80      Class<? extends WALProvider> clazz;
81      Providers(Class<? extends WALProvider> clazz) {
82        this.clazz = clazz;
83      }
84    }
85  
  /** Configuration key naming the provider used for regular (non-meta) WALs. */
  public static final String WAL_PROVIDER = "hbase.wal.provider";
  /** Provider name used when {@link #WAL_PROVIDER} is not set. */
  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();

  /** Configuration key naming the provider used for the meta WAL. */
  static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
  /** Provider name used when {@link #META_WAL_PROVIDER} is not set. */
  static final String DEFAULT_META_WAL_PROVIDER = Providers.defaultProvider.name();

  // Identifier supplied at construction; the reader/writer-only singleton uses SINGLETON_ID.
  final String factoryId;
  // Provider backing getWAL(); null for the reader/writer-only singleton instance.
  final WALProvider provider;
  // The meta updates are written to a different wal. If this
  // regionserver holds meta regions, then this ref will be non-null.
  // Lazily initialized; most RegionServers don't deal with META.
  final AtomicReference<WALProvider> metaProvider = new AtomicReference<WALProvider>();

  /**
   * Configuration-specified WAL Reader used when a custom reader is requested.
   * Read from "hbase.regionserver.hlog.reader.impl"; defaults to {@link ProtobufLogReader}.
   */
  private final Class<? extends DefaultWALProvider.Reader> logReaderClass;

  /**
   * How long, in milliseconds, to keep attempting to open in-recovery wals.
   * Read from "hbase.hlog.open.timeout"; defaults to 300000.
   */
  private final int timeoutMillis;

  // Configuration captured at construction; handed to the readers, writers and providers
  // this factory creates.
  private final Configuration conf;
110 
111   // Used for the singleton WALFactory, see below.
112   private WALFactory(Configuration conf) {
113     // this code is duplicated here so we can keep our members final.
114     // until we've moved reader/writer construction down into providers, this initialization must
115     // happen prior to provider initialization, in case they need to instantiate a reader/writer.
116     timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
117     /* TODO Both of these are probably specific to the fs wal provider */
118     logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
119         DefaultWALProvider.Reader.class);
120     this.conf = conf;
121     // end required early initialization
122 
123     // this instance can't create wals, just reader/writers.
124     provider = null;
125     factoryId = SINGLETON_ID;
126   }
127 
128   /**
129    * instantiate a provider from a config property.
130    * requires conf to have already been set (as well as anything the provider might need to read).
131    */
132   WALProvider getProvider(final String key, final String defaultValue,
133       final List<WALActionsListener> listeners, final String providerId) throws IOException {
134     Class<? extends WALProvider> clazz;
135     try {
136       clazz = Providers.valueOf(conf.get(key, defaultValue)).clazz;
137     } catch (IllegalArgumentException exception) {
138       // Fall back to them specifying a class name
139       // Note that the passed default class shouldn't actually be used, since the above only fails
140       // when there is a config value present.
141       clazz = conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
142     }
143     LOG.info("Instantiating WALProvider of type " + clazz);
144     try {
145       final WALProvider result = clazz.newInstance();
146       result.init(this, conf, listeners, providerId);
147       return result;
148     } catch (InstantiationException exception) {
149       LOG.error("couldn't set up WALProvider, check config key " + key);
150       LOG.debug("Exception details for failure to load WALProvider.", exception);
151       throw new IOException("couldn't set up WALProvider", exception);
152     } catch (IllegalAccessException exception) {
153       LOG.error("couldn't set up WALProvider, check config key " + key);
154       LOG.debug("Exception details for failure to load WALProvider.", exception);
155       throw new IOException("couldn't set up WALProvider", exception);
156     }
157   }
158 
159   /**
160    * @param conf must not be null, will keep a reference to read params in later reader/writer
161    *     instances.
162    * @param listeners may be null. will be given to all created wals (and not meta-wals)
163    * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
164    *     to make a directory
165    */
166   public WALFactory(final Configuration conf, final List<WALActionsListener> listeners,
167       final String factoryId) throws IOException {
168     // until we've moved reader/writer construction down into providers, this initialization must
169     // happen prior to provider initialization, in case they need to instantiate a reader/writer.
170     timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
171     /* TODO Both of these are probably specific to the fs wal provider */
172     logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
173         DefaultWALProvider.Reader.class);
174     this.conf = conf;
175     this.factoryId = factoryId;
176     // end required early initialization
177     if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
178       provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, listeners, null);
179     } else {
180       // special handling of existing configuration behavior.
181       LOG.warn("Running with WAL disabled.");
182       provider = new DisabledWALProvider();
183       provider.init(this, conf, null, factoryId);
184     }
185   }
186 
187   /**
188    * Shutdown all WALs and clean up any underlying storage.
189    * Use only when you will not need to replay and edits that have gone to any wals from this
190    * factory.
191    */
192   public void close() throws IOException {
193     final WALProvider metaProvider = this.metaProvider.get();
194     if (null != metaProvider) {
195       metaProvider.close();
196     }
197     // close is called on a WALFactory with null provider in the case of contention handling
198     // within the getInstance method.
199     if (null != provider) {
200       provider.close();
201     }
202   }
203 
204   /**
205    * Tell the underlying WAL providers to shut down, but do not clean up underlying storage.
206    * If you are not ending cleanly and will need to replay edits from this factory's wals,
207    * use this method if you can as it will try to leave things as tidy as possible.
208    */
209   public void shutdown() throws IOException {
210     IOException exception = null;
211     final WALProvider metaProvider = this.metaProvider.get();
212     if (null != metaProvider) {
213       try {
214         metaProvider.shutdown();
215       } catch(IOException ioe) {
216         exception = ioe;
217       }
218     }
219     provider.shutdown();
220     if (null != exception) {
221       throw exception;
222     }
223   }
224 
225   /**
226    * @param identifier may not be null, contents will not be altered
227    * @param namespace could be null, and will use default namespace if null
228    */
229   public WAL getWAL(final byte[] identifier, final byte[] namespace) throws IOException {
230     return provider.getWAL(identifier, namespace);
231   }
232 
233   /**
234    * @param identifier may not be null, contents will not be altered
235    */
236   public WAL getMetaWAL(final byte[] identifier) throws IOException {
237     WALProvider metaProvider = this.metaProvider.get();
238     if (null == metaProvider) {
239       final WALProvider temp = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER,
240           Collections.<WALActionsListener>singletonList(new MetricsWAL()),
241           DefaultWALProvider.META_WAL_PROVIDER_ID);
242       if (this.metaProvider.compareAndSet(null, temp)) {
243         metaProvider = temp;
244       } else {
245         // reference must now be to a provider created in another thread.
246         temp.close();
247         metaProvider = this.metaProvider.get();
248       }
249     }
250     return metaProvider.getWAL(identifier, null);
251   }
252 
253   public Reader createReader(final FileSystem fs, final Path path) throws IOException {
254     return createReader(fs, path, (CancelableProgressable)null);
255   }
256 
257   /**
258    * Create a reader for the WAL. If you are reading from a file that's being written to and need
259    * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method
260    * then just seek back to the last known good position.
261    * @return A WAL reader.  Close when done with it.
262    * @throws IOException
263    */
264   public Reader createReader(final FileSystem fs, final Path path,
265       CancelableProgressable reporter) throws IOException {
266     return createReader(fs, path, reporter, true);
267   }
268 
269   public Reader createReader(final FileSystem fs, final Path path,
270       CancelableProgressable reporter, boolean allowCustom)
271       throws IOException {
272     Class<? extends DefaultWALProvider.Reader> lrClass =
273         allowCustom ? logReaderClass : ProtobufLogReader.class;
274 
275     try {
276       // A wal file could be under recovery, so it may take several
277       // tries to get it open. Instead of claiming it is corrupted, retry
278       // to open it up to 5 minutes by default.
279       long startWaiting = EnvironmentEdgeManager.currentTime();
280       long openTimeout = timeoutMillis + startWaiting;
281       int nbAttempt = 0;
282       FSDataInputStream stream = null;
283       while (true) {
284         try {
285           if (lrClass != ProtobufLogReader.class) {
286             // User is overriding the WAL reader, let them.
287             DefaultWALProvider.Reader reader = lrClass.newInstance();
288             reader.init(fs, path, conf, null);
289             return reader;
290           } else {
291             stream = fs.open(path);
292             // Note that zero-length file will fail to read PB magic, and attempt to create
293             // a non-PB reader and fail the same way existing code expects it to. If we get
294             // rid of the old reader entirely, we need to handle 0-size files differently from
295             // merely non-PB files.
296             byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
297             boolean isPbWal = (stream.read(magic) == magic.length)
298                 && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
299             DefaultWALProvider.Reader reader =
300                 isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
301             reader.init(fs, path, conf, stream);
302             return reader;
303           }
304         } catch (IOException e) {
305           try {
306             if (stream != null) {
307               stream.close();
308             }
309           } catch (IOException exception) {
310             LOG.warn("Could not close FSDataInputStream" + exception.getMessage());
311             LOG.debug("exception details", exception);
312           }
313           String msg = e.getMessage();
314           if (msg != null && (msg.contains("Cannot obtain block length")
315               || msg.contains("Could not obtain the last block")
316               || msg.matches("Blocklist for [^ ]* has changed.*"))) {
317             if (++nbAttempt == 1) {
318               LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
319             }
320             if (reporter != null && !reporter.progress()) {
321               throw new InterruptedIOException("Operation is cancelled");
322             }
323             if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
324               LOG.error("Can't open after " + nbAttempt + " attempts and "
325                 + (EnvironmentEdgeManager.currentTime() - startWaiting)
326                 + "ms " + " for " + path);
327             } else {
328               try {
329                 Thread.sleep(nbAttempt < 3 ? 500 : 1000);
330                 continue; // retry
331               } catch (InterruptedException ie) {
332                 InterruptedIOException iioe = new InterruptedIOException();
333                 iioe.initCause(ie);
334                 throw iioe;
335               }
336             }
337           }
338           throw e;
339         }
340       }
341     } catch (IOException ie) {
342       throw ie;
343     } catch (Exception e) {
344       throw new IOException("Cannot get log reader", e);
345     }
346   }
347 
348   /**
349    * Create a writer for the WAL.
350    * should be package-private. public only for tests and
351    * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
352    * @return A WAL writer.  Close when done with it.
353    * @throws IOException
354    */
355   public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
356     return DefaultWALProvider.createWriter(conf, fs, path, false);
357   }
358 
359   /**
360    * should be package-private, visible for recovery testing.
361    * @return an overwritable writer for recovered edits. caller should close.
362    */
363   @VisibleForTesting
364   public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
365       throws IOException {
366     return DefaultWALProvider.createWriter(conf, fs, path, true);
367   }
368 
369   // These static methods are currently used where it's impractical to
370   // untangle the reliance on state in the filesystem. They rely on singleton
371   // WALFactory that just provides Reader / Writers.
372   // For now, first Configuration object wins. Practically this just impacts the reader/writer class
373   private static final AtomicReference<WALFactory> singleton = new AtomicReference<WALFactory>();
374   private static final String SINGLETON_ID = WALFactory.class.getName();
375   
376   // public only for FSHLog
377   public static WALFactory getInstance(Configuration configuration) {
378     WALFactory factory = singleton.get();
379     if (null == factory) {
380       WALFactory temp = new WALFactory(configuration);
381       if (singleton.compareAndSet(null, temp)) {
382         factory = temp;
383       } else {
384         // someone else beat us to initializing
385         try {
386           temp.close();
387         } catch (IOException exception) {
388           LOG.debug("failed to close temporary singleton. ignoring.", exception);
389         }
390         factory = singleton.get();
391       }
392     }
393     return factory;
394   }
395 
396   /**
397    * Create a reader for the given path, accept custom reader classes from conf.
398    * If you already have a WALFactory, you should favor the instance method.
399    * @return a WAL Reader, caller must close.
400    */
401   public static Reader createReader(final FileSystem fs, final Path path,
402       final Configuration configuration) throws IOException {
403     return getInstance(configuration).createReader(fs, path);
404   }
405 
406   /**
407    * Create a reader for the given path, accept custom reader classes from conf.
408    * If you already have a WALFactory, you should favor the instance method.
409    * @return a WAL Reader, caller must close.
410    */
411   static Reader createReader(final FileSystem fs, final Path path,
412       final Configuration configuration, final CancelableProgressable reporter) throws IOException {
413     return getInstance(configuration).createReader(fs, path, reporter);
414   }
415 
416   /**
417    * Create a reader for the given path, ignore custom reader classes from conf.
418    * If you already have a WALFactory, you should favor the instance method.
419    * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
420    * @return a WAL Reader, caller must close.
421    */
422   public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
423       final Configuration configuration) throws IOException {
424     return getInstance(configuration).createReader(fs, path, null, false);
425   }
426 
427   /**
428    * If you already have a WALFactory, you should favor the instance method.
429    * @return a Writer that will overwrite files. Caller must close.
430    */
431   static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
432       final Configuration configuration)
433       throws IOException {
434     return DefaultWALProvider.createWriter(configuration, fs, path, true);
435   }
436 
437   /**
438    * If you already have a WALFactory, you should favor the instance method.
439    * @return a writer that won't overwrite files. Caller must close.
440    */
441   @VisibleForTesting
442   public static Writer createWALWriter(final FileSystem fs, final Path path,
443       final Configuration configuration)
444       throws IOException {
445     return DefaultWALProvider.createWriter(configuration, fs, path, false);
446   }
447 
448   public final WALProvider getWALProvider() {
449     return this.provider;
450   }
451 
452   public final WALProvider getMetaWALProvider() {
453     return this.metaProvider.get();
454   }
455 }