1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32 import java.net.Socket;
33 import java.nio.channels.Channel;
34 import java.nio.channels.ClosedChannelException;
35 import java.nio.channels.ClosedSelectorException;
36 import java.nio.channels.SelectableChannel;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.Selector;
39 import java.util.ArrayList;
40 import java.util.Date;
41 import java.util.List;
42 import java.util.concurrent.ThreadFactory;
43 import java.util.concurrent.atomic.AtomicLong;
44
45 import org.apache.http.nio.params.NIOReactorPNames;
46 import org.apache.http.nio.reactor.IOEventDispatch;
47 import org.apache.http.nio.reactor.IOReactor;
48 import org.apache.http.nio.reactor.IOReactorException;
49 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
50 import org.apache.http.nio.reactor.IOReactorStatus;
51 import org.apache.http.params.BasicHttpParams;
52 import org.apache.http.params.CoreConnectionPNames;
53 import org.apache.http.params.HttpParams;
54 import org.apache.http.util.Args;
55 import org.apache.http.util.Asserts;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 @SuppressWarnings("deprecation")
98 public abstract class AbstractMultiworkerIOReactor implements IOReactor {
99
100 protected volatile IOReactorStatus status;
101
102
103
104
105 @Deprecated
106 protected final HttpParams params;
107 protected final IOReactorConfig config;
108 protected final Selector selector;
109 protected final long selectTimeout;
110 protected final boolean interestOpsQueueing;
111
112 private final int workerCount;
113 private final ThreadFactory threadFactory;
114 private final BaseIOReactor[] dispatchers;
115 private final Worker[] workers;
116 private final Thread[] threads;
117 private final Object statusLock;
118
119
120 protected IOReactorExceptionHandler exceptionHandler;
121 protected List<ExceptionEvent> auditLog;
122
123 private int currentWorker = 0;
124
125
126
127
128
129
130
131
132
133
134
135 public AbstractMultiworkerIOReactor(
136 final IOReactorConfig config,
137 final ThreadFactory threadFactory) throws IOReactorException {
138 super();
139 this.config = config != null ? config : IOReactorConfig.DEFAULT;
140 this.params = new BasicHttpParams();
141 try {
142 this.selector = Selector.open();
143 } catch (final IOException ex) {
144 throw new IOReactorException("Failure opening selector", ex);
145 }
146 this.selectTimeout = this.config.getSelectInterval();
147 this.interestOpsQueueing = this.config.isInterestOpQueued();
148 this.statusLock = new Object();
149 if (threadFactory != null) {
150 this.threadFactory = threadFactory;
151 } else {
152 this.threadFactory = new DefaultThreadFactory();
153 }
154 this.auditLog = new ArrayList<ExceptionEvent>();
155 this.workerCount = this.config.getIoThreadCount();
156 this.dispatchers = new BaseIOReactor[workerCount];
157 this.workers = new Worker[workerCount];
158 this.threads = new Thread[workerCount];
159 this.status = IOReactorStatus.INACTIVE;
160 }
161
162
163
164
165
166
167
168
169 public AbstractMultiworkerIOReactor() throws IOReactorException {
170 this(null, null);
171 }
172
173
174
175
176 @Deprecated
177 static IOReactorConfig convert(final int workerCount, final HttpParams params) {
178 Args.notNull(params, "HTTP parameters");
179 return IOReactorConfig.custom()
180 .setSelectInterval(params.getLongParameter(NIOReactorPNames.SELECT_INTERVAL, 1000))
181 .setShutdownGracePeriod(params.getLongParameter(NIOReactorPNames.GRACE_PERIOD, 500))
182 .setInterestOpQueued(params.getBooleanParameter(NIOReactorPNames.SELECT_INTERVAL, false))
183 .setIoThreadCount(workerCount)
184 .setSoTimeout(params.getIntParameter(CoreConnectionPNames.SO_TIMEOUT, 0))
185 .setConnectTimeout(params.getIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 0))
186 .setSoTimeout(params.getIntParameter(CoreConnectionPNames.SO_TIMEOUT, 0))
187 .setSoReuseAddress(params.getBooleanParameter(CoreConnectionPNames.SO_REUSEADDR, false))
188 .setSoKeepAlive(params.getBooleanParameter(CoreConnectionPNames.SO_KEEPALIVE, false))
189 .setSoLinger(params.getIntParameter(CoreConnectionPNames.SO_LINGER, -1))
190 .setTcpNoDelay(params.getBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true))
191 .build();
192 }
193
194
195
196
197
198
199
200
201
202
203
204
205 @Deprecated
206 public AbstractMultiworkerIOReactor(
207 final int workerCount,
208 final ThreadFactory threadFactory,
209 final HttpParams params) throws IOReactorException {
210 this(convert(workerCount, params), threadFactory);
211 }
212
213 @Override
214 public IOReactorStatus getStatus() {
215 return this.status;
216 }
217
218
219
220
221
222
223
224 public List<ExceptionEvent> getAuditLog() {
225 synchronized (this.auditLog) {
226 return new ArrayList<ExceptionEvent>(this.auditLog);
227 }
228 }
229
230
231
232
233
234
235
236
237
238 protected synchronized void addExceptionEvent(final Throwable ex, final Date timestamp) {
239 if (ex == null) {
240 return;
241 }
242 synchronized (this.auditLog) {
243 this.auditLog.add(new ExceptionEvent(ex, timestamp != null ? timestamp : new Date()));
244 }
245 }
246
247
248
249
250
251
252 protected void addExceptionEvent(final Throwable ex) {
253 addExceptionEvent(ex, null);
254 }
255
256
257
258
259
260
261 public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
262 this.exceptionHandler = exceptionHandler;
263 }
264
265
266
267
268
269
270
271
272
273 protected abstract void processEvents(int count) throws IOReactorException;
274
275
276
277
278
279
280
281
282 protected abstract void cancelRequests() throws IOReactorException;
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306 @Override
307 public void execute(
308 final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
309 Args.notNull(eventDispatch, "Event dispatcher");
310 synchronized (this.statusLock) {
311 if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
312 this.status = IOReactorStatus.SHUT_DOWN;
313 this.statusLock.notifyAll();
314 return;
315 }
316 Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,
317 "Illegal state %s", this.status);
318 this.status = IOReactorStatus.ACTIVE;
319
320 for (int i = 0; i < this.dispatchers.length; i++) {
321 final BaseIOReactorBaseIOReactor.html#BaseIOReactor">BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
322 dispatcher.setExceptionHandler(exceptionHandler);
323 this.dispatchers[i] = dispatcher;
324 }
325 for (int i = 0; i < this.workerCount; i++) {
326 final BaseIOReactor dispatcher = this.dispatchers[i];
327 this.workers[i] = new Worker(dispatcher, eventDispatch);
328 this.threads[i] = this.threadFactory.newThread(this.workers[i]);
329 }
330 }
331 try {
332
333 for (int i = 0; i < this.workerCount; i++) {
334 if (this.status != IOReactorStatus.ACTIVE) {
335 return;
336 }
337 this.threads[i].start();
338 }
339
340 for (;;) {
341 final int readyCount;
342 try {
343 readyCount = this.selector.select(this.selectTimeout);
344 } catch (final InterruptedIOException ex) {
345 throw ex;
346 } catch (final IOException ex) {
347 throw new IOReactorException("Unexpected selector failure", ex);
348 }
349
350 if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
351 processEvents(readyCount);
352 }
353
354
355 for (int i = 0; i < this.workerCount; i++) {
356 final Worker worker = this.workers[i];
357 final Throwable ex = worker.getThrowable();
358 if (ex != null) {
359 throw new IOReactorException(
360 "I/O dispatch worker terminated abnormally", ex);
361 }
362 }
363
364 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
365 break;
366 }
367 }
368
369 } catch (final ClosedSelectorException ex) {
370 addExceptionEvent(ex);
371 } catch (final IOReactorException ex) {
372 if (ex.getCause() != null) {
373 addExceptionEvent(ex.getCause());
374 }
375 throw ex;
376 } finally {
377 doShutdown();
378 synchronized (this.statusLock) {
379 this.status = IOReactorStatus.SHUT_DOWN;
380 this.statusLock.notifyAll();
381 }
382 }
383 }
384
385
386
387
388
389
390
391
392
393
394
395 protected void doShutdown() throws InterruptedIOException {
396 synchronized (this.statusLock) {
397 if (this.status.compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
398 return;
399 }
400 this.status = IOReactorStatus.SHUTTING_DOWN;
401 }
402 try {
403 cancelRequests();
404 } catch (final IOReactorException ex) {
405 if (ex.getCause() != null) {
406 addExceptionEvent(ex.getCause());
407 }
408 }
409 this.selector.wakeup();
410
411
412 if (this.selector.isOpen()) {
413 for (final SelectionKey key : this.selector.keys()) {
414 try {
415 final Channel channel = key.channel();
416 if (channel != null) {
417 channel.close();
418 }
419 } catch (final IOException ex) {
420 addExceptionEvent(ex);
421 }
422 }
423
424 try {
425 this.selector.close();
426 } catch (final IOException ex) {
427 addExceptionEvent(ex);
428 }
429 }
430
431
432 for (int i = 0; i < this.workerCount; i++) {
433 final BaseIOReactor dispatcher = this.dispatchers[i];
434 dispatcher.gracefulShutdown();
435 }
436
437 final long gracePeriod = this.config.getShutdownGracePeriod();
438
439 try {
440
441
442 for (int i = 0; i < this.workerCount; i++) {
443 final BaseIOReactor dispatcher = this.dispatchers[i];
444 if (dispatcher.getStatus() != IOReactorStatus.INACTIVE) {
445 dispatcher.awaitShutdown(gracePeriod);
446 }
447 if (dispatcher.getStatus() != IOReactorStatus.SHUT_DOWN) {
448 try {
449 dispatcher.hardShutdown();
450 } catch (final IOReactorException ex) {
451 if (ex.getCause() != null) {
452 addExceptionEvent(ex.getCause());
453 }
454 }
455 }
456 }
457
458 for (int i = 0; i < this.workerCount; i++) {
459 final Thread t = this.threads[i];
460 if (t != null) {
461 t.join(gracePeriod);
462 }
463 }
464 } catch (final InterruptedException ex) {
465 throw new InterruptedIOException(ex.getMessage());
466 }
467 }
468
469
470
471
472
473
474 protected void addChannel(final ChannelEntry entry) {
475
476 final int i = Math.abs(this.currentWorker++ % this.workerCount);
477 this.dispatchers[i].addChannel(entry);
478 }
479
480
481
482
483
484
485
486
487
488 protected SelectionKey registerChannel(
489 final SelectableChannel channel, final int ops) throws ClosedChannelException {
490 return channel.register(this.selector, ops);
491 }
492
493
494
495
496
497
498
499 protected void prepareSocket(final Socket socket) throws IOException {
500 socket.setTcpNoDelay(this.config.isTcpNoDelay());
501 socket.setKeepAlive(this.config.isSoKeepalive());
502 if (this.config.getSoTimeout() > 0) {
503 socket.setSoTimeout(this.config.getSoTimeout());
504 }
505 if (this.config.getSndBufSize() > 0) {
506 socket.setSendBufferSize(this.config.getSndBufSize());
507 }
508 if (this.config.getRcvBufSize() > 0) {
509 socket.setReceiveBufferSize(this.config.getRcvBufSize());
510 }
511 final int linger = this.config.getSoLinger();
512 if (linger >= 0) {
513 socket.setSoLinger(true, linger);
514 }
515 }
516
517
518
519
520
521
522
523
524
525
526 protected void awaitShutdown(final long timeout) throws InterruptedException {
527 synchronized (this.statusLock) {
528 final long deadline = System.currentTimeMillis() + timeout;
529 long remaining = timeout;
530 while (this.status != IOReactorStatus.SHUT_DOWN) {
531 this.statusLock.wait(remaining);
532 if (timeout > 0) {
533 remaining = deadline - System.currentTimeMillis();
534 if (remaining <= 0) {
535 break;
536 }
537 }
538 }
539 }
540 }
541
542 @Override
543 public void shutdown() throws IOException {
544 shutdown(2000);
545 }
546
547 @Override
548 public void shutdown(final long waitMs) throws IOException {
549 synchronized (this.statusLock) {
550 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
551 return;
552 }
553 if (this.status.compareTo(IOReactorStatus.INACTIVE) == 0) {
554 this.status = IOReactorStatus.SHUT_DOWN;
555 cancelRequests();
556 this.selector.close();
557 return;
558 }
559 this.status = IOReactorStatus.SHUTDOWN_REQUEST;
560 }
561 this.selector.wakeup();
562 try {
563 awaitShutdown(waitMs);
564 } catch (final InterruptedException ignore) {
565 }
566 }
567
568 static void closeChannel(final Channel channel) {
569 try {
570 channel.close();
571 } catch (final IOException ignore) {
572 }
573 }
574
575 static class Worker implements Runnable {
576
577 final BaseIOReactor dispatcher;
578 final IOEventDispatch eventDispatch;
579
580 private volatile Throwable exception;
581
582 public Worker(final BaseIOReactor dispatcher, final IOEventDispatch eventDispatch) {
583 super();
584 this.dispatcher = dispatcher;
585 this.eventDispatch = eventDispatch;
586 }
587
588 @Override
589 public void run() {
590 try {
591 this.dispatcher.execute(this.eventDispatch);
592 } catch (final Error ex) {
593 this.exception = ex;
594 throw ex;
595 } catch (final Exception ex) {
596 this.exception = ex;
597 }
598 }
599
600 public Throwable getThrowable() {
601 return this.exception;
602 }
603
604 }
605
606 static class DefaultThreadFactory implements ThreadFactory {
607
608 private final static AtomicLong COUNT = new AtomicLong(1);
609
610 @Override
611 public Thread newThread(final Runnable r) {
612 return new Thread(r, "I/O dispatcher " + COUNT.getAndIncrement());
613 }
614
615 }
616
617 }