1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.wal;
22
23 import java.io.IOException;
24 import java.util.Arrays;
25 import java.io.InterruptedIOException;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import com.google.common.annotations.VisibleForTesting;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.FSDataInputStream;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.wal.WAL.Reader;
39 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
40 import org.apache.hadoop.hbase.util.CancelableProgressable;
41 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
42
43
44 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
45 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
46 import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
47 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 @InterfaceAudience.Private
68 public class WALFactory {
69
70 private static final Log LOG = LogFactory.getLog(WALFactory.class);
71
72
73
74
75 static enum Providers {
76 defaultProvider(DefaultWALProvider.class),
77 filesystem(DefaultWALProvider.class),
78 multiwal(RegionGroupingProvider.class);
79
80 Class<? extends WALProvider> clazz;
81 Providers(Class<? extends WALProvider> clazz) {
82 this.clazz = clazz;
83 }
84 }
85
86 public static final String WAL_PROVIDER = "hbase.wal.provider";
87 static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
88
89 static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
90 static final String DEFAULT_META_WAL_PROVIDER = Providers.defaultProvider.name();
91
92 final String factoryId;
93 final WALProvider provider;
94
95
96
97 final AtomicReference<WALProvider> metaProvider = new AtomicReference<WALProvider>();
98
99
100
101
102 private final Class<? extends DefaultWALProvider.Reader> logReaderClass;
103
104
105
106
107 private final int timeoutMillis;
108
109 private final Configuration conf;
110
111
112 private WALFactory(Configuration conf) {
113
114
115
116 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
117
118 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
119 DefaultWALProvider.Reader.class);
120 this.conf = conf;
121
122
123
124 provider = null;
125 factoryId = SINGLETON_ID;
126 }
127
128
129
130
131
132 WALProvider getProvider(final String key, final String defaultValue,
133 final List<WALActionsListener> listeners, final String providerId) throws IOException {
134 Class<? extends WALProvider> clazz;
135 try {
136 clazz = Providers.valueOf(conf.get(key, defaultValue)).clazz;
137 } catch (IllegalArgumentException exception) {
138
139
140
141 clazz = conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
142 }
143 LOG.info("Instantiating WALProvider of type " + clazz);
144 try {
145 final WALProvider result = clazz.newInstance();
146 result.init(this, conf, listeners, providerId);
147 return result;
148 } catch (InstantiationException exception) {
149 LOG.error("couldn't set up WALProvider, check config key " + key);
150 LOG.debug("Exception details for failure to load WALProvider.", exception);
151 throw new IOException("couldn't set up WALProvider", exception);
152 } catch (IllegalAccessException exception) {
153 LOG.error("couldn't set up WALProvider, check config key " + key);
154 LOG.debug("Exception details for failure to load WALProvider.", exception);
155 throw new IOException("couldn't set up WALProvider", exception);
156 }
157 }
158
159
160
161
162
163
164
165
166 public WALFactory(final Configuration conf, final List<WALActionsListener> listeners,
167 final String factoryId) throws IOException {
168
169
170 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
171
172 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
173 DefaultWALProvider.Reader.class);
174 this.conf = conf;
175 this.factoryId = factoryId;
176
177 if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
178 provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, listeners, null);
179 } else {
180
181 LOG.warn("Running with WAL disabled.");
182 provider = new DisabledWALProvider();
183 provider.init(this, conf, null, factoryId);
184 }
185 }
186
187
188
189
190
191
192 public void close() throws IOException {
193 final WALProvider metaProvider = this.metaProvider.get();
194 if (null != metaProvider) {
195 metaProvider.close();
196 }
197
198
199 if (null != provider) {
200 provider.close();
201 }
202 }
203
204
205
206
207
208
209 public void shutdown() throws IOException {
210 IOException exception = null;
211 final WALProvider metaProvider = this.metaProvider.get();
212 if (null != metaProvider) {
213 try {
214 metaProvider.shutdown();
215 } catch(IOException ioe) {
216 exception = ioe;
217 }
218 }
219 provider.shutdown();
220 if (null != exception) {
221 throw exception;
222 }
223 }
224
225
226
227
228
229 public WAL getWAL(final byte[] identifier, final byte[] namespace) throws IOException {
230 return provider.getWAL(identifier, namespace);
231 }
232
233
234
235
236 public WAL getMetaWAL(final byte[] identifier) throws IOException {
237 WALProvider metaProvider = this.metaProvider.get();
238 if (null == metaProvider) {
239 final WALProvider temp = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER,
240 Collections.<WALActionsListener>singletonList(new MetricsWAL()),
241 DefaultWALProvider.META_WAL_PROVIDER_ID);
242 if (this.metaProvider.compareAndSet(null, temp)) {
243 metaProvider = temp;
244 } else {
245
246 temp.close();
247 metaProvider = this.metaProvider.get();
248 }
249 }
250 return metaProvider.getWAL(identifier, null);
251 }
252
253 public Reader createReader(final FileSystem fs, final Path path) throws IOException {
254 return createReader(fs, path, (CancelableProgressable)null);
255 }
256
257
258
259
260
261
262
263
264 public Reader createReader(final FileSystem fs, final Path path,
265 CancelableProgressable reporter) throws IOException {
266 return createReader(fs, path, reporter, true);
267 }
268
269 public Reader createReader(final FileSystem fs, final Path path,
270 CancelableProgressable reporter, boolean allowCustom)
271 throws IOException {
272 Class<? extends DefaultWALProvider.Reader> lrClass =
273 allowCustom ? logReaderClass : ProtobufLogReader.class;
274
275 try {
276
277
278
279 long startWaiting = EnvironmentEdgeManager.currentTime();
280 long openTimeout = timeoutMillis + startWaiting;
281 int nbAttempt = 0;
282 FSDataInputStream stream = null;
283 while (true) {
284 try {
285 if (lrClass != ProtobufLogReader.class) {
286
287 DefaultWALProvider.Reader reader = lrClass.newInstance();
288 reader.init(fs, path, conf, null);
289 return reader;
290 } else {
291 stream = fs.open(path);
292
293
294
295
296 byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
297 boolean isPbWal = (stream.read(magic) == magic.length)
298 && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
299 DefaultWALProvider.Reader reader =
300 isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
301 reader.init(fs, path, conf, stream);
302 return reader;
303 }
304 } catch (IOException e) {
305 try {
306 if (stream != null) {
307 stream.close();
308 }
309 } catch (IOException exception) {
310 LOG.warn("Could not close FSDataInputStream" + exception.getMessage());
311 LOG.debug("exception details", exception);
312 }
313 String msg = e.getMessage();
314 if (msg != null && (msg.contains("Cannot obtain block length")
315 || msg.contains("Could not obtain the last block")
316 || msg.matches("Blocklist for [^ ]* has changed.*"))) {
317 if (++nbAttempt == 1) {
318 LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
319 }
320 if (reporter != null && !reporter.progress()) {
321 throw new InterruptedIOException("Operation is cancelled");
322 }
323 if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
324 LOG.error("Can't open after " + nbAttempt + " attempts and "
325 + (EnvironmentEdgeManager.currentTime() - startWaiting)
326 + "ms " + " for " + path);
327 } else {
328 try {
329 Thread.sleep(nbAttempt < 3 ? 500 : 1000);
330 continue;
331 } catch (InterruptedException ie) {
332 InterruptedIOException iioe = new InterruptedIOException();
333 iioe.initCause(ie);
334 throw iioe;
335 }
336 }
337 }
338 throw e;
339 }
340 }
341 } catch (IOException ie) {
342 throw ie;
343 } catch (Exception e) {
344 throw new IOException("Cannot get log reader", e);
345 }
346 }
347
348
349
350
351
352
353
354
355 public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
356 return DefaultWALProvider.createWriter(conf, fs, path, false);
357 }
358
359
360
361
362
363 @VisibleForTesting
364 public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
365 throws IOException {
366 return DefaultWALProvider.createWriter(conf, fs, path, true);
367 }
368
369
370
371
372
373 private static final AtomicReference<WALFactory> singleton = new AtomicReference<WALFactory>();
374 private static final String SINGLETON_ID = WALFactory.class.getName();
375
376
377 public static WALFactory getInstance(Configuration configuration) {
378 WALFactory factory = singleton.get();
379 if (null == factory) {
380 WALFactory temp = new WALFactory(configuration);
381 if (singleton.compareAndSet(null, temp)) {
382 factory = temp;
383 } else {
384
385 try {
386 temp.close();
387 } catch (IOException exception) {
388 LOG.debug("failed to close temporary singleton. ignoring.", exception);
389 }
390 factory = singleton.get();
391 }
392 }
393 return factory;
394 }
395
396
397
398
399
400
401 public static Reader createReader(final FileSystem fs, final Path path,
402 final Configuration configuration) throws IOException {
403 return getInstance(configuration).createReader(fs, path);
404 }
405
406
407
408
409
410
411 static Reader createReader(final FileSystem fs, final Path path,
412 final Configuration configuration, final CancelableProgressable reporter) throws IOException {
413 return getInstance(configuration).createReader(fs, path, reporter);
414 }
415
416
417
418
419
420
421
422 public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
423 final Configuration configuration) throws IOException {
424 return getInstance(configuration).createReader(fs, path, null, false);
425 }
426
427
428
429
430
431 static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
432 final Configuration configuration)
433 throws IOException {
434 return DefaultWALProvider.createWriter(configuration, fs, path, true);
435 }
436
437
438
439
440
441 @VisibleForTesting
442 public static Writer createWALWriter(final FileSystem fs, final Path path,
443 final Configuration configuration)
444 throws IOException {
445 return DefaultWALProvider.createWriter(configuration, fs, path, false);
446 }
447
448 public final WALProvider getWALProvider() {
449 return this.provider;
450 }
451
452 public final WALProvider getMetaWALProvider() {
453 return this.metaProvider.get();
454 }
455 }