1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.executor;
20
21 import java.io.IOException;
22 import java.io.Writer;
23 import java.lang.management.ThreadInfo;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
39
40 import com.google.common.collect.Lists;
41 import com.google.common.collect.Maps;
42 import com.google.common.util.concurrent.ThreadFactoryBuilder;
43
44
45
46
47
48
49
50
51
52
53
54
55
56 @InterfaceAudience.Private
57 public class ExecutorService {
58 private static final Log LOG = LogFactory.getLog(ExecutorService.class);
59
60
61 private final ConcurrentHashMap<String, Executor> executorMap =
62 new ConcurrentHashMap<String, Executor>();
63
64
65 private final String servername;
66
67
68
69
70
/**
 * Builds an empty executor registry bound to one server.
 *
 * @param servername name of the owning server; it is later handed to
 *          {@code ExecutorType.getExecutorName(String)} to derive executor names
 */
public ExecutorService(final String servername) {
  // The implicit no-arg superclass constructor call is all that is needed here.
  this.servername = servername;
}
75
76
77
78
79
80
81 void startExecutorService(String name, int maxThreads) {
82 if (this.executorMap.get(name) != null) {
83 throw new RuntimeException("An executor service with the name " + name +
84 " is already running!");
85 }
86 Executor hbes = new Executor(name, maxThreads);
87 if (this.executorMap.putIfAbsent(name, hbes) != null) {
88 throw new RuntimeException("An executor service with the name " + name +
89 " is already running (2)!");
90 }
91 LOG.debug("Starting executor service name=" + name +
92 ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
93 ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
94 }
95
96 boolean isExecutorServiceRunning(String name) {
97 return this.executorMap.containsKey(name);
98 }
99
100 public void shutdown() {
101 for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
102 List<Runnable> wasRunning =
103 entry.getValue().threadPoolExecutor.shutdownNow();
104 if (!wasRunning.isEmpty()) {
105 LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
106 }
107 }
108 this.executorMap.clear();
109 }
110
111 Executor getExecutor(final ExecutorType type) {
112 return getExecutor(type.getExecutorName(this.servername));
113 }
114
115 Executor getExecutor(String name) {
116 Executor executor = this.executorMap.get(name);
117 return executor;
118 }
119
120
121 public void startExecutorService(final ExecutorType type, final int maxThreads) {
122 String name = type.getExecutorName(this.servername);
123 if (isExecutorServiceRunning(name)) {
124 LOG.debug("Executor service " + toString() + " already running on " +
125 this.servername);
126 return;
127 }
128 startExecutorService(name, maxThreads);
129 }
130
131 public void submit(final EventHandler eh) {
132 Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
133 if (executor == null) {
134
135
136
137 LOG.error("Cannot submit [" + eh + "] because the executor is missing." +
138 " Is this process shutting down?");
139 } else {
140 executor.submit(eh);
141 }
142 }
143
144 public Map<String, ExecutorStatus> getAllExecutorStatuses() {
145 Map<String, ExecutorStatus> ret = Maps.newHashMap();
146 for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
147 ret.put(e.getKey(), e.getValue().getStatus());
148 }
149 return ret;
150 }
151
152
153
154
155 static class Executor {
156
157 static final long keepAliveTimeInMillis = 1000;
158
159 final TrackingThreadPoolExecutor threadPoolExecutor;
160
161 final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
162 private final String name;
163 private static final AtomicLong seqids = new AtomicLong(0);
164 private final long id;
165
166 protected Executor(String name, int maxThreads) {
167 this.id = seqids.incrementAndGet();
168 this.name = name;
169
170 this.threadPoolExecutor = new TrackingThreadPoolExecutor(
171 maxThreads, maxThreads,
172 keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
173
174 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
175 tfb.setNameFormat(this.name + "-%d");
176 this.threadPoolExecutor.setThreadFactory(tfb.build());
177 }
178
179
180
181
182
183 void submit(final EventHandler event) {
184
185
186 this.threadPoolExecutor.execute(event);
187 }
188
189 public String toString() {
190 return getClass().getSimpleName() + "-" + id + "-" + name;
191 }
192
193 public ExecutorStatus getStatus() {
194 List<EventHandler> queuedEvents = Lists.newArrayList();
195 for (Runnable r : q) {
196 if (!(r instanceof EventHandler)) {
197 LOG.warn("Non-EventHandler " + r + " queued in " + name);
198 continue;
199 }
200 queuedEvents.add((EventHandler)r);
201 }
202
203 List<RunningEventStatus> running = Lists.newArrayList();
204 for (Map.Entry<Thread, Runnable> e :
205 threadPoolExecutor.getRunningTasks().entrySet()) {
206 Runnable r = e.getValue();
207 if (!(r instanceof EventHandler)) {
208 LOG.warn("Non-EventHandler " + r + " running in " + name);
209 continue;
210 }
211 running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
212 }
213
214 return new ExecutorStatus(this, queuedEvents, running);
215 }
216 }
217
218
219
220
221
222 static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
223 private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
224
225 public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
226 long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
227 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
228 }
229
230 @Override
231 protected void afterExecute(Runnable r, Throwable t) {
232 super.afterExecute(r, t);
233 running.remove(Thread.currentThread());
234 }
235
236 @Override
237 protected void beforeExecute(Thread t, Runnable r) {
238 Runnable oldPut = running.put(t, r);
239 assert oldPut == null : "inconsistency for thread " + t;
240 super.beforeExecute(t, r);
241 }
242
243
244
245
246
247
248
249 public ConcurrentMap<Thread, Runnable> getRunningTasks() {
250 return running;
251 }
252 }
253
254
255
256
257
258
259
260
261 public static class ExecutorStatus {
262 final Executor executor;
263 final List<EventHandler> queuedEvents;
264 final List<RunningEventStatus> running;
265
266 ExecutorStatus(Executor executor,
267 List<EventHandler> queuedEvents,
268 List<RunningEventStatus> running) {
269 this.executor = executor;
270 this.queuedEvents = queuedEvents;
271 this.running = running;
272 }
273
274
275
276
277
278
279
280
281 public void dumpTo(Writer out, String indent) throws IOException {
282 out.write(indent + "Status for executor: " + executor + "\n");
283 out.write(indent + "=======================================\n");
284 out.write(indent + queuedEvents.size() + " events queued, " +
285 running.size() + " running\n");
286 if (!queuedEvents.isEmpty()) {
287 out.write(indent + "Queued:\n");
288 for (EventHandler e : queuedEvents) {
289 out.write(indent + " " + e + "\n");
290 }
291 out.write("\n");
292 }
293 if (!running.isEmpty()) {
294 out.write(indent + "Running:\n");
295 for (RunningEventStatus stat : running) {
296 out.write(indent + " Running on thread '" +
297 stat.threadInfo.getThreadName() +
298 "': " + stat.event + "\n");
299 out.write(ThreadMonitoring.formatThreadInfo(
300 stat.threadInfo, indent + " "));
301 out.write("\n");
302 }
303 }
304 out.flush();
305 }
306 }
307
308
309
310
311
312 public static class RunningEventStatus {
313 final ThreadInfo threadInfo;
314 final EventHandler event;
315
316 public RunningEventStatus(Thread t, EventHandler event) {
317 this.threadInfo = ThreadMonitoring.getThreadInfo(t);
318 this.event = event;
319 }
320 }
321 }