View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Map.Entry;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.locks.ReentrantLock;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.Server;
32  import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
33  import org.apache.hadoop.hbase.wal.WAL;
34  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
35  import org.apache.hadoop.hbase.util.Bytes;
36  import org.apache.hadoop.hbase.util.HasThread;
37  import org.apache.hadoop.ipc.RemoteException;
38  
39  import com.google.common.annotations.VisibleForTesting;
40  
41  /**
42   * Runs periodically to determine if the WAL should be rolled.
43   *
44   * NOTE: This class extends Thread rather than Chore because the sleep time
45   * can be interrupted when there is something to do, rather than the Chore
46   * sleep time which is invariant.
47   *
48   * TODO: change to a pool of threads
49   */
50  @InterfaceAudience.Private
51  @VisibleForTesting
52  public class LogRoller extends HasThread {
53    private static final Log LOG = LogFactory.getLog(LogRoller.class);
54    private final ReentrantLock rollLock = new ReentrantLock();
55    private final AtomicBoolean rollLog = new AtomicBoolean(false);
56    private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
57        new ConcurrentHashMap<WAL, Boolean>();
58    private final Server server;
59    protected final RegionServerServices services;
60    private volatile long lastrolltime = System.currentTimeMillis();
61    // Period to roll log.
62    private final long rollperiod;
63    private final int threadWakeFrequency;
64  
65    public void addWAL(final WAL wal) {
66      if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
67        wal.registerWALActionsListener(new WALActionsListener.Base() {
68          @Override
69          public void logRollRequested(boolean lowReplicas) {
70            walNeedsRoll.put(wal, Boolean.TRUE);
71            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
72            synchronized(rollLog) {
73              rollLog.set(true);
74              rollLog.notifyAll();
75            }
76          }
77        });
78      }
79    }
80  
81    public void requestRollAll() {
82      for (WAL wal : walNeedsRoll.keySet()) {
83        walNeedsRoll.put(wal, Boolean.TRUE);
84      }
85      synchronized(rollLog) {
86        rollLog.set(true);
87        rollLog.notifyAll();
88      }
89    }
90  
91    /** @param server */
92    public LogRoller(final Server server, final RegionServerServices services) {
93      super();
94      this.server = server;
95      this.services = services;
96      this.rollperiod = this.server.getConfiguration().
97        getLong("hbase.regionserver.logroll.period", 3600000);
98      this.threadWakeFrequency = this.server.getConfiguration().
99        getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
100   }
101 
102   @Override
103   public void interrupt() {
104     // Wake up if we are waiting on rollLog. For tests.
105     synchronized (rollLog) {
106       this.rollLog.notify();
107     }
108     super.interrupt();
109   }
110 
111   @Override
112   public void run() {
113     while (!server.isStopped()) {
114       long now = System.currentTimeMillis();
115       boolean periodic = false;
116       if (!rollLog.get()) {
117         periodic = (now - this.lastrolltime) > this.rollperiod;
118         if (!periodic) {
119           synchronized (rollLog) {
120             try {
121               if (!rollLog.get()) {
122                 rollLog.wait(this.threadWakeFrequency);
123               }
124             } catch (InterruptedException e) {
125               // Fall through
126             }
127           }
128           continue;
129         }
130         // Time for periodic roll
131         if (LOG.isDebugEnabled()) {
132           LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
133         }
134       } else if (LOG.isDebugEnabled()) {
135         LOG.debug("WAL roll requested");
136       }
137       rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
138       try {
139         this.lastrolltime = now;
140         for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
141           final WAL wal = entry.getKey();
142           // Force the roll if the logroll.period is elapsed or if a roll was requested.
143           // The returned value is an array of actual region names.
144           final byte [][] regionsToFlush = wal.rollWriter(periodic ||
145               entry.getValue().booleanValue());
146           walNeedsRoll.put(wal, Boolean.FALSE);
147           if (regionsToFlush != null) {
148             for (byte [] r: regionsToFlush) scheduleFlush(r);
149           }
150         }
151       } catch (FailedLogCloseException e) {
152         server.abort("Failed log close in log roller", e);
153       } catch (java.net.ConnectException e) {
154         server.abort("Failed log close in log roller", e);
155       } catch (IOException ex) {
156         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
157         server.abort("IOE in log roller",
158           ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
159       } catch (Exception ex) {
160         LOG.error("Log rolling failed", ex);
161         server.abort("Log rolling failed", ex);
162       } finally {
163         try {
164           rollLog.set(false);
165         } finally {
166           rollLock.unlock();
167         }
168       }
169     }
170     LOG.info("LogRoller exiting.");
171   }
172 
173   /**
174    * @param encodedRegionName Encoded name of region to flush.
175    */
176   private void scheduleFlush(final byte [] encodedRegionName) {
177     boolean scheduled = false;
178     Region r = this.services.getFromOnlineRegions(Bytes.toString(encodedRegionName));
179     FlushRequester requester = null;
180     if (r != null) {
181       requester = this.services.getFlushRequester();
182       if (requester != null) {
183         // force flushing all stores to clean old logs
184         requester.requestFlush(r, true);
185         scheduled = true;
186       }
187     }
188     if (!scheduled) {
189       LOG.warn("Failed to schedule flush of " +
190         Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" +
191         requester);
192     }
193   }
194 
195   /**
196    * For testing only
197    * @return true if all WAL roll finished
198    */
199   @VisibleForTesting
200   public boolean walRollFinished() {
201     for (boolean needRoll : walNeedsRoll.values()) {
202       if (needRoll) {
203         return false;
204       }
205     }
206     return true;
207   }
208 }