View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.executor;
20  
21  import java.io.IOException;
22  import java.io.Writer;
23  import java.lang.management.ThreadInfo;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
39  
40  import com.google.common.collect.Lists;
41  import com.google.common.collect.Maps;
42  import com.google.common.util.concurrent.ThreadFactoryBuilder;
43  
44  /**
45   * This is a generic executor service. This component abstracts a
46   * threadpool, a queue to which {@link EventHandler}s can be submitted,
47   * and a <code>Runnable</code> that handles the object that is added to the queue.
48   *
49   * <p>In order to create a new service, create an instance of this class and
50   * then do: <code>instance.startExecutorService(type, maxThreads);</code>.  When done
51   * call {@link #shutdown()}.
52   *
53   * <p>In order to use the service created above, call
54   * {@link #submit(EventHandler)}.
55   */
56  @InterfaceAudience.Private
57  public class ExecutorService {
58    private static final Log LOG = LogFactory.getLog(ExecutorService.class);
59  
60    // Holds all the executors created so far, in a map addressable by their names
61    private final ConcurrentHashMap<String, Executor> executorMap =
62      new ConcurrentHashMap<String, Executor>();
63  
64    // Name of the server hosting this executor service.
65    private final String servername;
66  
67    /**
68     * Default constructor.
69     * @param servername Name of the hosting server.
70     */
71    public ExecutorService(final String servername) {
72      super();
73      this.servername = servername;
74    }
75  
76    /**
77     * Start an executor service with a given name. If there was a service already
78     * started with the same name, this throws a RuntimeException.
79     * @param name Name of the service to start.
80     */
81    void startExecutorService(String name, int maxThreads) {
82      if (this.executorMap.get(name) != null) {
83        throw new RuntimeException("An executor service with the name " + name +
84          " is already running!");
85      }
86      Executor hbes = new Executor(name, maxThreads);
87      if (this.executorMap.putIfAbsent(name, hbes) != null) {
88        throw new RuntimeException("An executor service with the name " + name +
89        " is already running (2)!");
90      }
91      LOG.debug("Starting executor service name=" + name +
92        ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
93        ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
94    }
95  
96    boolean isExecutorServiceRunning(String name) {
97      return this.executorMap.containsKey(name);
98    }
99  
100   public void shutdown() {
101     for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
102       List<Runnable> wasRunning =
103         entry.getValue().threadPoolExecutor.shutdownNow();
104       if (!wasRunning.isEmpty()) {
105         LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
106       }
107     }
108     this.executorMap.clear();
109   }
110 
111   Executor getExecutor(final ExecutorType type) {
112     return getExecutor(type.getExecutorName(this.servername));
113   }
114 
115   Executor getExecutor(String name) {
116     Executor executor = this.executorMap.get(name);
117     return executor;
118   }
119 
120 
121   public void startExecutorService(final ExecutorType type, final int maxThreads) {
122     String name = type.getExecutorName(this.servername);
123     if (isExecutorServiceRunning(name)) {
124       LOG.debug("Executor service " + toString() + " already running on " +
125           this.servername);
126       return;
127     }
128     startExecutorService(name, maxThreads);
129   }
130 
131   public void submit(final EventHandler eh) {
132     Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
133     if (executor == null) {
134       // This happens only when events are submitted after shutdown() was
135       // called, so dropping them should be "ok" since it means we're
136       // shutting down.
137       LOG.error("Cannot submit [" + eh + "] because the executor is missing." +
138         " Is this process shutting down?");
139     } else {
140       executor.submit(eh);
141     }
142   }
143 
144   public Map<String, ExecutorStatus> getAllExecutorStatuses() {
145     Map<String, ExecutorStatus> ret = Maps.newHashMap();
146     for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
147       ret.put(e.getKey(), e.getValue().getStatus());
148     }
149     return ret;
150   }
151   
152   /**
153    * Executor instance.
154    */
155   static class Executor {
156     // how long to retain excess threads
157     static final long keepAliveTimeInMillis = 1000;
158     // the thread pool executor that services the requests
159     final TrackingThreadPoolExecutor threadPoolExecutor;
160     // work queue to use - unbounded queue
161     final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
162     private final String name;
163     private static final AtomicLong seqids = new AtomicLong(0);
164     private final long id;
165 
166     protected Executor(String name, int maxThreads) {
167       this.id = seqids.incrementAndGet();
168       this.name = name;
169       // create the thread pool executor
170       this.threadPoolExecutor = new TrackingThreadPoolExecutor(
171           maxThreads, maxThreads,
172           keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
173       // name the threads for this threadpool
174       ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
175       tfb.setNameFormat(this.name + "-%d");
176       this.threadPoolExecutor.setThreadFactory(tfb.build());
177     }
178 
179     /**
180      * Submit the event to the queue for handling.
181      * @param event
182      */
183     void submit(final EventHandler event) {
184       // If there is a listener for this type, make sure we call the before
185       // and after process methods.
186       this.threadPoolExecutor.execute(event);
187     }
188     
189     public String toString() {
190       return getClass().getSimpleName() + "-" + id + "-" + name;
191     }
192 
193     public ExecutorStatus getStatus() {
194       List<EventHandler> queuedEvents = Lists.newArrayList();
195       for (Runnable r : q) {
196         if (!(r instanceof EventHandler)) {
197           LOG.warn("Non-EventHandler " + r + " queued in " + name);
198           continue;
199         }
200         queuedEvents.add((EventHandler)r);
201       }
202       
203       List<RunningEventStatus> running = Lists.newArrayList();
204       for (Map.Entry<Thread, Runnable> e :
205           threadPoolExecutor.getRunningTasks().entrySet()) {
206         Runnable r = e.getValue();
207         if (!(r instanceof EventHandler)) {
208           LOG.warn("Non-EventHandler " + r + " running in " + name);
209           continue;
210         }
211         running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
212       }
213       
214       return new ExecutorStatus(this, queuedEvents, running);
215     }
216   }
217  
218   /**
219    * A subclass of ThreadPoolExecutor that keeps track of the Runnables that
220    * are executing at any given point in time.
221    */
222   static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
223     private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap(); 
224       
225     public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
226         long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
227       super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
228     }
229 
230     @Override
231     protected void afterExecute(Runnable r, Throwable t) {
232       super.afterExecute(r, t);
233       running.remove(Thread.currentThread());
234     }
235 
236     @Override
237     protected void beforeExecute(Thread t, Runnable r) {
238       Runnable oldPut = running.put(t, r);
239       assert oldPut == null : "inconsistency for thread " + t;
240       super.beforeExecute(t, r);
241     }
242    
243     /**
244      * @return a map of the threads currently running tasks
245      * inside this executor. Each key is an active thread,
246      * and the value is the task that is currently running.
247      * Note that this is not a stable snapshot of the map.
248      */
249     public ConcurrentMap<Thread, Runnable> getRunningTasks() {
250       return running;
251     }
252   }
253 
254   /**
255    * A snapshot of the status of a particular executor. This includes
256    * the contents of the executor's pending queue, as well as the
257    * threads and events currently being processed.
258    *
259    * This is a consistent snapshot that is immutable once constructed.
260    */
261   public static class ExecutorStatus {
262     final Executor executor;
263     final List<EventHandler> queuedEvents;
264     final List<RunningEventStatus> running;
265 
266     ExecutorStatus(Executor executor,
267         List<EventHandler> queuedEvents,
268         List<RunningEventStatus> running) {
269       this.executor = executor;
270       this.queuedEvents = queuedEvents;
271       this.running = running;
272     }
273    
274     /**
275      * Dump a textual representation of the executor's status
276      * to the given writer.
277      *
278      * @param out the stream to write to
279      * @param indent a string prefix for each line, used for indentation
280      */
281     public void dumpTo(Writer out, String indent) throws IOException {
282       out.write(indent + "Status for executor: " + executor + "\n");
283       out.write(indent + "=======================================\n");
284       out.write(indent + queuedEvents.size() + " events queued, " +
285           running.size() + " running\n");
286       if (!queuedEvents.isEmpty()) {
287         out.write(indent + "Queued:\n");
288         for (EventHandler e : queuedEvents) {
289           out.write(indent + "  " + e + "\n");
290         }
291         out.write("\n");
292       }
293       if (!running.isEmpty()) {
294         out.write(indent + "Running:\n");
295         for (RunningEventStatus stat : running) {
296           out.write(indent + "  Running on thread '" +
297               stat.threadInfo.getThreadName() +
298               "': " + stat.event + "\n");
299           out.write(ThreadMonitoring.formatThreadInfo(
300               stat.threadInfo, indent + "  "));
301           out.write("\n");
302         }
303       }
304       out.flush();
305     }
306   }
307 
308   /**
309    * The status of a particular event that is in the middle of being
310    * handled by an executor.
311    */
312   public static class RunningEventStatus {
313     final ThreadInfo threadInfo;
314     final EventHandler event;
315 
316     public RunningEventStatus(Thread t, EventHandler event) {
317       this.threadInfo = ThreadMonitoring.getThreadInfo(t);
318       this.event = event;
319     }
320   }
321 }