1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.util.StringUtils.humanReadableInt;
22
23 import java.io.IOException;
24 import java.lang.Thread.UncaughtExceptionHandler;
25 import java.lang.management.ManagementFactory;
26 import java.util.ArrayList;
27 import java.util.ConcurrentModificationException;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.SortedMap;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.concurrent.DelayQueue;
36 import java.util.concurrent.Delayed;
37 import java.util.concurrent.ThreadFactory;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.locks.ReentrantReadWriteLock;
41
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.hadoop.conf.Configuration;
45 import org.apache.hadoop.hbase.DroppedSnapshotException;
46 import org.apache.hadoop.hbase.HConstants;
47 import org.apache.hadoop.hbase.classification.InterfaceAudience;
48 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
49 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
50 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
51 import org.apache.hadoop.hbase.util.Bytes;
52 import org.apache.hadoop.hbase.util.Counter;
53 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
54 import org.apache.hadoop.hbase.util.HasThread;
55 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
56 import org.apache.hadoop.hbase.util.Threads;
57 import org.apache.hadoop.ipc.RemoteException;
58 import org.apache.hadoop.util.StringUtils;
59 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
60 import org.apache.htrace.Trace;
61 import org.apache.htrace.TraceScope;
62
63 import com.google.common.base.Preconditions;
64
65
66
67
68
69
70
71
72
73
74 @InterfaceAudience.Private
75 class MemStoreFlusher implements FlushRequester {
76 private static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
77
78 private Configuration conf;
79
80
81 private final BlockingQueue<FlushQueueEntry> flushQueue =
82 new DelayQueue<FlushQueueEntry>();
83 private final Map<Region, FlushRegionEntry> regionsInQueue =
84 new HashMap<Region, FlushRegionEntry>();
85 private AtomicBoolean wakeupPending = new AtomicBoolean();
86
87 private final long threadWakeFrequency;
88 private final HRegionServer server;
89 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
90 private final Object blockSignal = new Object();
91
92 protected long globalMemStoreLimit;
93 protected float globalMemStoreLimitLowMarkPercent;
94 protected long globalMemStoreLimitLowMark;
95
96 private long blockingWaitTime;
97 private final Counter updatesBlockedMsHighWater = new Counter();
98
99 private final FlushHandler[] flushHandlers;
100 private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);
101
102
103
104
105
106 public MemStoreFlusher(final Configuration conf,
107 final HRegionServer server) {
108 super();
109 this.conf = conf;
110 this.server = server;
111 this.threadWakeFrequency =
112 conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
113 long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
114 float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
115 this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
116 this.globalMemStoreLimitLowMarkPercent =
117 HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
118 this.globalMemStoreLimitLowMark =
119 (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
120
121 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
122 90000);
123 int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
124 this.flushHandlers = new FlushHandler[handlerCount];
125 LOG.info("globalMemStoreLimit="
126 + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
127 + ", globalMemStoreLimitLowMark="
128 + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
129 + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
130 }
131
132 public Counter getUpdatesBlockedMsHighWater() {
133 return this.updatesBlockedMsHighWater;
134 }
135
136
137
138
139
140
141
142 private boolean flushOneForGlobalPressure() {
143 SortedMap<Long, Region> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize();
144 Set<Region> excludedRegions = new HashSet<Region>();
145
146 double secondaryMultiplier
147 = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
148
149 boolean flushedOne = false;
150 while (!flushedOne) {
151
152
153 Region bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, excludedRegions, true);
154
155 Region bestAnyRegion = getBiggestMemstoreRegion(
156 regionsBySize, excludedRegions, false);
157
158 Region bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize,
159 excludedRegions);
160
161 if (bestAnyRegion == null && bestRegionReplica == null) {
162 LOG.error("Above memory mark but there are no flushable regions!");
163 return false;
164 }
165
166 Region regionToFlush;
167 if (bestFlushableRegion != null &&
168 bestAnyRegion.getMemstoreSize() > 2 * bestFlushableRegion.getMemstoreSize()) {
169
170
171
172
173 if (LOG.isDebugEnabled()) {
174 LOG.debug("Under global heap pressure: " + "Region "
175 + bestAnyRegion.getRegionInfo().getRegionNameAsString()
176 + " has too many " + "store files, but is "
177 + TraditionalBinaryPrefix.long2String(bestAnyRegion.getMemstoreSize(), "", 1)
178 + " vs best flushable region's "
179 + TraditionalBinaryPrefix.long2String(bestFlushableRegion.getMemstoreSize(), "", 1)
180 + ". Choosing the bigger.");
181 }
182 regionToFlush = bestAnyRegion;
183 } else {
184 if (bestFlushableRegion == null) {
185 regionToFlush = bestAnyRegion;
186 } else {
187 regionToFlush = bestFlushableRegion;
188 }
189 }
190
191 Preconditions.checkState(
192 (regionToFlush != null && regionToFlush.getMemstoreSize() > 0) ||
193 (bestRegionReplica != null && bestRegionReplica.getMemstoreSize() > 0));
194
195 if (regionToFlush == null ||
196 (bestRegionReplica != null &&
197 ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
198 (bestRegionReplica.getMemstoreSize()
199 > secondaryMultiplier * regionToFlush.getMemstoreSize()))) {
200 LOG.info("Refreshing storefiles of region " + bestRegionReplica +
201 " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt(
202 server.getRegionServerAccounting().getGlobalMemstoreSize()));
203 flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
204 if (!flushedOne) {
205 LOG.info("Excluding secondary region " + bestRegionReplica +
206 " - trying to find a different region to refresh files.");
207 excludedRegions.add(bestRegionReplica);
208 }
209 } else {
210 LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
211 + "Total Memstore size="
212 + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
213 + ", Region memstore size="
214 + humanReadableInt(regionToFlush.getMemstoreSize()));
215 flushedOne = flushRegion(regionToFlush, true, true);
216
217 if (!flushedOne) {
218 LOG.info("Excluding unflushable region " + regionToFlush +
219 " - trying to find a different region to flush.");
220 excludedRegions.add(regionToFlush);
221 }
222 }
223 }
224 return true;
225 }
226
227 private class FlushHandler extends HasThread {
228
229 private FlushHandler(String name) {
230 super(name);
231 }
232
233 @Override
234 public void run() {
235 while (!server.isStopped()) {
236 FlushQueueEntry fqe = null;
237 try {
238 wakeupPending.set(false);
239 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
240 if (fqe == null || fqe instanceof WakeupFlushThread) {
241 if (isAboveLowWaterMark()) {
242 LOG.debug("Flush thread woke up because memory above low water="
243 + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
244 if (!flushOneForGlobalPressure()) {
245
246
247
248
249
250 Thread.sleep(1000);
251 wakeUpIfBlocking();
252 }
253
254 wakeupFlushThread();
255 }
256 continue;
257 }
258 FlushRegionEntry fre = (FlushRegionEntry) fqe;
259 if (!flushRegion(fre)) {
260 break;
261 }
262 } catch (InterruptedException ex) {
263 continue;
264 } catch (ConcurrentModificationException ex) {
265 continue;
266 } catch (Exception ex) {
267 LOG.error("Cache flusher failed for entry " + fqe, ex);
268 if (!server.checkFileSystem()) {
269 break;
270 }
271 }
272 }
273 synchronized (regionsInQueue) {
274 regionsInQueue.clear();
275 flushQueue.clear();
276 }
277
278
279 wakeUpIfBlocking();
280 LOG.info(getName() + " exiting");
281 }
282 }
283
284
285 private void wakeupFlushThread() {
286 if (wakeupPending.compareAndSet(false, true)) {
287 flushQueue.add(new WakeupFlushThread());
288 }
289 }
290
291 private Region getBiggestMemstoreRegion(
292 SortedMap<Long, Region> regionsBySize,
293 Set<Region> excludedRegions,
294 boolean checkStoreFileCount) {
295 synchronized (regionsInQueue) {
296 for (Region region : regionsBySize.values()) {
297 if (excludedRegions.contains(region)) {
298 continue;
299 }
300
301 if (((HRegion)region).writestate.flushing ||
302 !((HRegion)region).writestate.writesEnabled) {
303 continue;
304 }
305
306 if (checkStoreFileCount && isTooManyStoreFiles(region)) {
307 continue;
308 }
309 return region;
310 }
311 }
312 return null;
313 }
314
315 private Region getBiggestMemstoreOfRegionReplica(SortedMap<Long, Region> regionsBySize,
316 Set<Region> excludedRegions) {
317 synchronized (regionsInQueue) {
318 for (Region region : regionsBySize.values()) {
319 if (excludedRegions.contains(region)) {
320 continue;
321 }
322
323 if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
324 continue;
325 }
326
327 return region;
328 }
329 }
330 return null;
331 }
332
333 private boolean refreshStoreFilesAndReclaimMemory(Region region) {
334 try {
335 return region.refreshStoreFiles();
336 } catch (IOException e) {
337 LOG.warn("Refreshing store files failed with exception", e);
338 }
339 return false;
340 }
341
342
343
344
345 private boolean isAboveHighWaterMark() {
346 return server.getRegionServerAccounting().
347 getGlobalMemstoreSize() >= globalMemStoreLimit;
348 }
349
350
351
352
353 private boolean isAboveLowWaterMark() {
354 return server.getRegionServerAccounting().
355 getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
356 }
357
358 @Override
359 public void requestFlush(Region r, boolean forceFlushAllStores) {
360 synchronized (regionsInQueue) {
361 if (!regionsInQueue.containsKey(r)) {
362
363
364 FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
365 this.regionsInQueue.put(r, fqe);
366 this.flushQueue.add(fqe);
367 }
368 }
369 }
370
371 @Override
372 public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) {
373 synchronized (regionsInQueue) {
374 if (!regionsInQueue.containsKey(r)) {
375
376 FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
377 fqe.requeue(delay);
378 this.regionsInQueue.put(r, fqe);
379 this.flushQueue.add(fqe);
380 }
381 }
382 }
383
384 public int getFlushQueueSize() {
385 return flushQueue.size();
386 }
387
388
389
390
391 void interruptIfNecessary() {
392 lock.writeLock().lock();
393 try {
394 for (FlushHandler flushHander : flushHandlers) {
395 if (flushHander != null) flushHander.interrupt();
396 }
397 } finally {
398 lock.writeLock().unlock();
399 }
400 }
401
402 synchronized void start(UncaughtExceptionHandler eh) {
403 ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
404 server.getServerName().toShortString() + "-MemStoreFlusher", eh);
405 for (int i = 0; i < flushHandlers.length; i++) {
406 flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
407 flusherThreadFactory.newThread(flushHandlers[i]);
408 flushHandlers[i].start();
409 }
410 }
411
412 boolean isAlive() {
413 for (FlushHandler flushHander : flushHandlers) {
414 if (flushHander != null && flushHander.isAlive()) {
415 return true;
416 }
417 }
418 return false;
419 }
420
421 void join() {
422 for (FlushHandler flushHander : flushHandlers) {
423 if (flushHander != null) {
424 Threads.shutdown(flushHander.getThread());
425 }
426 }
427 }
428
429
430
431
432
433
434
435
436
437 private boolean flushRegion(final FlushRegionEntry fqe) {
438 Region region = fqe.region;
439 if (!region.getRegionInfo().isMetaRegion() &&
440 isTooManyStoreFiles(region)) {
441 if (fqe.isMaximumWait(this.blockingWaitTime)) {
442 LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
443 "ms on a compaction to clean up 'too many store files'; waited " +
444 "long enough... proceeding with flush of " +
445 region.getRegionInfo().getRegionNameAsString());
446 } else {
447
448 if (fqe.getRequeueCount() <= 0) {
449
450 LOG.warn("Region " + region.getRegionInfo().getRegionNameAsString() + " has too many " +
451 "store files; delaying flush up to " + this.blockingWaitTime + "ms");
452 if (!this.server.compactSplitThread.requestSplit(region)) {
453 try {
454 this.server.compactSplitThread.requestSystemCompaction(
455 region, Thread.currentThread().getName());
456 } catch (IOException e) {
457 e = e instanceof RemoteException ?
458 ((RemoteException)e).unwrapRemoteException() : e;
459 LOG.error("Cache flush failed for region " +
460 Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
461 }
462 }
463 }
464
465
466
467 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
468
469 return true;
470 }
471 }
472 return flushRegion(region, false, fqe.isForceFlushAllStores());
473 }
474
475
476
477
478
479
480
481
482
483
484
485
486
487 private boolean flushRegion(final Region region, final boolean emergencyFlush,
488 boolean forceFlushAllStores) {
489 long startTime = 0;
490 synchronized (this.regionsInQueue) {
491 FlushRegionEntry fqe = this.regionsInQueue.remove(region);
492
493 if (fqe != null) {
494 startTime = fqe.createTime;
495 }
496 if (fqe != null && emergencyFlush) {
497
498
499 flushQueue.remove(fqe);
500 }
501 }
502 if (startTime == 0) {
503
504
505
506 startTime = EnvironmentEdgeManager.currentTime();
507 }
508 lock.readLock().lock();
509 try {
510 notifyFlushRequest(region, emergencyFlush);
511 FlushResult flushResult = region.flush(forceFlushAllStores);
512 boolean shouldCompact = flushResult.isCompactionNeeded();
513
514 boolean shouldSplit = ((HRegion)region).checkSplit() != null;
515 if (shouldSplit) {
516 this.server.compactSplitThread.requestSplit(region);
517 } else if (shouldCompact) {
518 server.compactSplitThread.requestSystemCompaction(
519 region, Thread.currentThread().getName());
520 }
521 if (flushResult.isFlushSucceeded()) {
522 long endTime = EnvironmentEdgeManager.currentTime();
523 server.metricsRegionServer.updateFlushTime(endTime - startTime);
524 }
525 } catch (DroppedSnapshotException ex) {
526
527
528
529
530
531 server.abort("Replay of WAL required. Forcing server shutdown", ex);
532 return false;
533 } catch (IOException ex) {
534 ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
535 LOG.error(
536 "Cache flush failed"
537 + (region != null ? (" for region " +
538 Bytes.toStringBinary(region.getRegionInfo().getRegionName()))
539 : ""), ex);
540 if (!server.checkFileSystem()) {
541 return false;
542 }
543 } finally {
544 lock.readLock().unlock();
545 wakeUpIfBlocking();
546 }
547 return true;
548 }
549
550 private void notifyFlushRequest(Region region, boolean emergencyFlush) {
551 FlushType type = FlushType.NORMAL;
552 if (emergencyFlush) {
553 type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
554 }
555 for (FlushRequestListener listener : flushRequestListeners) {
556 listener.flushRequested(type, region);
557 }
558 }
559
560 private void wakeUpIfBlocking() {
561 synchronized (blockSignal) {
562 blockSignal.notifyAll();
563 }
564 }
565
566 private boolean isTooManyStoreFiles(Region region) {
567 for (Store store : region.getStores()) {
568 if (store.hasTooManyStoreFiles()) {
569 return true;
570 }
571 }
572 return false;
573 }
574
575
576
577
578
579
580
581 public void reclaimMemStoreMemory() {
582 TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
583 if (isAboveHighWaterMark()) {
584 if (Trace.isTracing()) {
585 scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
586 }
587 long start = EnvironmentEdgeManager.currentTime();
588 synchronized (this.blockSignal) {
589 boolean blocked = false;
590 long startTime = 0;
591 boolean interrupted = false;
592 try {
593 while (isAboveHighWaterMark() && !server.isStopped()) {
594 if (!blocked) {
595 startTime = EnvironmentEdgeManager.currentTime();
596 LOG.info("Blocking updates on "
597 + server.toString()
598 + ": the global memstore size "
599 + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
600 .getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
601 + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
602 }
603 blocked = true;
604 wakeupFlushThread();
605 try {
606
607
608 blockSignal.wait(5 * 1000);
609 } catch (InterruptedException ie) {
610 LOG.warn("Interrupted while waiting");
611 interrupted = true;
612 }
613 long took = EnvironmentEdgeManager.currentTime() - start;
614 LOG.warn("Memstore is above high water mark and block " + took + "ms");
615 }
616 } finally {
617 if (interrupted) {
618 Thread.currentThread().interrupt();
619 }
620 }
621
622 if(blocked){
623 final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
624 if(totalTime > 0){
625 this.updatesBlockedMsHighWater.add(totalTime);
626 }
627 LOG.info("Unblocking updates for server " + server.toString());
628 }
629 }
630 } else if (isAboveLowWaterMark()) {
631 wakeupFlushThread();
632 }
633 scope.close();
634 }
635 @Override
636 public String toString() {
637 return "flush_queue="
638 + flushQueue.size();
639 }
640
641 public String dumpQueue() {
642 StringBuilder queueList = new StringBuilder();
643 queueList.append("Flush Queue Queue dump:\n");
644 queueList.append(" Flush Queue:\n");
645 java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
646
647 while(it.hasNext()){
648 queueList.append(" "+it.next().toString());
649 queueList.append("\n");
650 }
651
652 return queueList.toString();
653 }
654
655
656
657
658
659 @Override
660 public void registerFlushRequestListener(final FlushRequestListener listener) {
661 this.flushRequestListeners.add(listener);
662 }
663
664
665
666
667
668
669 @Override
670 public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
671 return this.flushRequestListeners.remove(listener);
672 }
673
674
675
676
677
678 @Override
679 public void setGlobalMemstoreLimit(long globalMemStoreSize) {
680 this.globalMemStoreLimit = globalMemStoreSize;
681 this.globalMemStoreLimitLowMark =
682 (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
683 reclaimMemStoreMemory();
684 }
685
686 public long getMemoryLimit() {
687 return this.globalMemStoreLimit;
688 }
689
/**
 * Marker type for everything that can be placed on the flush queue; adds
 * nothing to {@link Delayed}.
 */
interface FlushQueueEntry extends Delayed {}
692
693
694
695
696 static class WakeupFlushThread implements FlushQueueEntry {
697 @Override
698 public long getDelay(TimeUnit unit) {
699 return 0;
700 }
701
702 @Override
703 public int compareTo(Delayed o) {
704 return -1;
705 }
706
707 @Override
708 public boolean equals(Object obj) {
709 return (this == obj);
710 }
711 }
712
713
714
715
716
717
718
719
720
721 static class FlushRegionEntry implements FlushQueueEntry {
722 private final Region region;
723
724 private final long createTime;
725 private long whenToExpire;
726 private int requeueCount = 0;
727
728 private boolean forceFlushAllStores;
729
730 FlushRegionEntry(final Region r, boolean forceFlushAllStores) {
731 this.region = r;
732 this.createTime = EnvironmentEdgeManager.currentTime();
733 this.whenToExpire = this.createTime;
734 this.forceFlushAllStores = forceFlushAllStores;
735 }
736
737
738
739
740
741 public boolean isMaximumWait(final long maximumWait) {
742 return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
743 }
744
745
746
747
748
749 public int getRequeueCount() {
750 return this.requeueCount;
751 }
752
753
754
755
756 public boolean isForceFlushAllStores() {
757 return forceFlushAllStores;
758 }
759
760
761
762
763
764
765
766 public FlushRegionEntry requeue(final long when) {
767 this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
768 this.requeueCount++;
769 return this;
770 }
771
772 @Override
773 public long getDelay(TimeUnit unit) {
774 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
775 TimeUnit.MILLISECONDS);
776 }
777
778 @Override
779 public int compareTo(Delayed other) {
780
781 int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
782 other.getDelay(TimeUnit.MILLISECONDS)).intValue();
783 if (ret != 0) {
784 return ret;
785 }
786 FlushQueueEntry otherEntry = (FlushQueueEntry) other;
787 return hashCode() - otherEntry.hashCode();
788 }
789
790 @Override
791 public String toString() {
792 return "[flush region "+Bytes.toStringBinary(region.getRegionInfo().getRegionName())+"]";
793 }
794
795 @Override
796 public int hashCode() {
797 int hash = (int) getDelay(TimeUnit.MILLISECONDS);
798 return hash ^ region.hashCode();
799 }
800
801 @Override
802 public boolean equals(Object obj) {
803 if (this == obj) {
804 return true;
805 }
806 if (obj == null || getClass() != obj.getClass()) {
807 return false;
808 }
809 Delayed other = (Delayed) obj;
810 return compareTo(other) == 0;
811 }
812 }
813 }
814
/**
 * Reason reported to flush request listeners for a flush.
 */
enum FlushType {
  /** Flush that was not triggered by global memstore pressure. */
  NORMAL,
  /** Pressure-triggered flush while below the high water mark. */
  ABOVE_LOWER_MARK,
  /** Pressure-triggered flush while at or above the high water mark. */
  ABOVE_HIGHER_MARK
}