1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.Map.Entry;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.locks.ReentrantLock;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.Server;
32 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
33 import org.apache.hadoop.hbase.wal.WAL;
34 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.HasThread;
37 import org.apache.hadoop.ipc.RemoteException;
38
39 import com.google.common.annotations.VisibleForTesting;
40
41
42
43
44
45
46
47
48
49
50 @InterfaceAudience.Private
51 @VisibleForTesting
52 public class LogRoller extends HasThread {
53 private static final Log LOG = LogFactory.getLog(LogRoller.class);
54 private final ReentrantLock rollLock = new ReentrantLock();
55 private final AtomicBoolean rollLog = new AtomicBoolean(false);
56 private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
57 new ConcurrentHashMap<WAL, Boolean>();
58 private final Server server;
59 protected final RegionServerServices services;
60 private volatile long lastrolltime = System.currentTimeMillis();
61
62 private final long rollperiod;
63 private final int threadWakeFrequency;
64
65 public void addWAL(final WAL wal) {
66 if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
67 wal.registerWALActionsListener(new WALActionsListener.Base() {
68 @Override
69 public void logRollRequested(boolean lowReplicas) {
70 walNeedsRoll.put(wal, Boolean.TRUE);
71
72 synchronized(rollLog) {
73 rollLog.set(true);
74 rollLog.notifyAll();
75 }
76 }
77 });
78 }
79 }
80
81 public void requestRollAll() {
82 for (WAL wal : walNeedsRoll.keySet()) {
83 walNeedsRoll.put(wal, Boolean.TRUE);
84 }
85 synchronized(rollLog) {
86 rollLog.set(true);
87 rollLog.notifyAll();
88 }
89 }
90
91
92 public LogRoller(final Server server, final RegionServerServices services) {
93 super();
94 this.server = server;
95 this.services = services;
96 this.rollperiod = this.server.getConfiguration().
97 getLong("hbase.regionserver.logroll.period", 3600000);
98 this.threadWakeFrequency = this.server.getConfiguration().
99 getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
100 }
101
102 @Override
103 public void interrupt() {
104
105 synchronized (rollLog) {
106 this.rollLog.notify();
107 }
108 super.interrupt();
109 }
110
111 @Override
112 public void run() {
113 while (!server.isStopped()) {
114 long now = System.currentTimeMillis();
115 boolean periodic = false;
116 if (!rollLog.get()) {
117 periodic = (now - this.lastrolltime) > this.rollperiod;
118 if (!periodic) {
119 synchronized (rollLog) {
120 try {
121 if (!rollLog.get()) {
122 rollLog.wait(this.threadWakeFrequency);
123 }
124 } catch (InterruptedException e) {
125
126 }
127 }
128 continue;
129 }
130
131 if (LOG.isDebugEnabled()) {
132 LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
133 }
134 } else if (LOG.isDebugEnabled()) {
135 LOG.debug("WAL roll requested");
136 }
137 rollLock.lock();
138 try {
139 this.lastrolltime = now;
140 for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
141 final WAL wal = entry.getKey();
142
143
144 final byte [][] regionsToFlush = wal.rollWriter(periodic ||
145 entry.getValue().booleanValue());
146 walNeedsRoll.put(wal, Boolean.FALSE);
147 if (regionsToFlush != null) {
148 for (byte [] r: regionsToFlush) scheduleFlush(r);
149 }
150 }
151 } catch (FailedLogCloseException e) {
152 server.abort("Failed log close in log roller", e);
153 } catch (java.net.ConnectException e) {
154 server.abort("Failed log close in log roller", e);
155 } catch (IOException ex) {
156
157 server.abort("IOE in log roller",
158 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
159 } catch (Exception ex) {
160 LOG.error("Log rolling failed", ex);
161 server.abort("Log rolling failed", ex);
162 } finally {
163 try {
164 rollLog.set(false);
165 } finally {
166 rollLock.unlock();
167 }
168 }
169 }
170 LOG.info("LogRoller exiting.");
171 }
172
173
174
175
176 private void scheduleFlush(final byte [] encodedRegionName) {
177 boolean scheduled = false;
178 Region r = this.services.getFromOnlineRegions(Bytes.toString(encodedRegionName));
179 FlushRequester requester = null;
180 if (r != null) {
181 requester = this.services.getFlushRequester();
182 if (requester != null) {
183
184 requester.requestFlush(r, true);
185 scheduled = true;
186 }
187 }
188 if (!scheduled) {
189 LOG.warn("Failed to schedule flush of " +
190 Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" +
191 requester);
192 }
193 }
194
195
196
197
198
199 @VisibleForTesting
200 public boolean walRollFinished() {
201 for (boolean needRoll : walNeedsRoll.values()) {
202 if (needRoll) {
203 return false;
204 }
205 }
206 return true;
207 }
208 }