/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.net.Socket;
33  import java.nio.channels.Channel;
34  import java.nio.channels.ClosedChannelException;
35  import java.nio.channels.ClosedSelectorException;
36  import java.nio.channels.SelectableChannel;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.Selector;
39  import java.util.ArrayList;
40  import java.util.Date;
41  import java.util.List;
42  import java.util.concurrent.ThreadFactory;
43  import java.util.concurrent.atomic.AtomicLong;
44  
45  import org.apache.http.nio.params.NIOReactorPNames;
46  import org.apache.http.nio.reactor.IOEventDispatch;
47  import org.apache.http.nio.reactor.IOReactor;
48  import org.apache.http.nio.reactor.IOReactorException;
49  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
50  import org.apache.http.nio.reactor.IOReactorStatus;
51  import org.apache.http.params.BasicHttpParams;
52  import org.apache.http.params.CoreConnectionPNames;
53  import org.apache.http.params.HttpParams;
54  import org.apache.http.util.Args;
55  import org.apache.http.util.Asserts;
56  
57  /**
58   * Generic implementation of {@link IOReactor} that can run multiple
59   * {@link BaseIOReactor} instances in separate worker threads and distribute
60   * newly created I/O sessions equally across those I/O reactors for more
61   * optimal resource utilization and better I/O performance. Usually it is
62   * recommended to have one worker I/O reactor per physical CPU core.
63   * <p>
64   * <strong>Important note about exception handling</strong>
65   * <p>
66   * Protocol specific exceptions as well as those I/O exceptions thrown in the
67   * course of interaction with the session's channel are to be expected are to be
68   * dealt with by specific protocol handlers. These exceptions may result in
69   * termination of an individual session but should not affect the I/O reactor
70   * and all other active sessions. There are situations, however, when the I/O
71   * reactor itself encounters an internal problem such as an I/O exception in
72   * the underlying NIO classes or an unhandled runtime exception. Those types of
73   * exceptions are usually fatal and will cause the I/O reactor to shut down
74   * automatically.
75   * <p>
76   * There is a possibility to override this behavior and prevent I/O reactors
77   * from shutting down automatically in case of a runtime exception or an I/O
78   * exception in internal classes. This can be accomplished by providing a custom
79   * implementation of the {@link IOReactorExceptionHandler} interface.
80   * <p>
81   * If an I/O reactor is unable to automatically recover from an I/O or a runtime
82   * exception it will enter the shutdown mode. First off, it cancel all pending
83   * new session requests. Then it will attempt to close all active I/O sessions
84   * gracefully giving them some time to flush pending output data and terminate
85   * cleanly. Lastly, it will forcibly shut down those I/O sessions that still
86   * remain active after the grace period. This is a fairly complex process, where
87   * many things can fail at the same time and many different exceptions can be
88   * thrown in the course of the shutdown process. The I/O reactor will record all
89   * exceptions thrown during the shutdown process, including the original one
90   * that actually caused the shutdown in the first place, in an audit log. One
91   * can obtain the audit log using {@link #getAuditLog()}, examine exceptions
92   * thrown by the I/O reactor prior and in the course of the reactor shutdown
93   * and decide whether it is safe to restart the I/O reactor.
94   *
95   * @since 4.0
96   */
97  @SuppressWarnings("deprecation")
98  public abstract class AbstractMultiworkerIOReactor implements IOReactor {
99  
100     protected volatile IOReactorStatus status;
101 
102     /**
103      * @deprecated (4.2)
104      */
105     @Deprecated
106     protected final HttpParams params;
107     protected final IOReactorConfig config;
108     protected final Selector selector;
109     protected final long selectTimeout;
110     protected final boolean interestOpsQueueing;
111 
112     private final int workerCount;
113     private final ThreadFactory threadFactory;
114     private final BaseIOReactor[] dispatchers;
115     private final Worker[] workers;
116     private final Thread[] threads;
117     private final Object statusLock;
118 
119     //TODO: make final
120     protected IOReactorExceptionHandler exceptionHandler;
121     protected List<ExceptionEvent> auditLog;
122 
123     private int currentWorker = 0;
124 
125     /**
126      * Creates an instance of AbstractMultiworkerIOReactor with the given configuration.
127      *
128      * @param config I/O reactor configuration.
129      * @param threadFactory the factory to create threads.
130      *   Can be {@code null}.
131      * @throws IOReactorException in case if a non-recoverable I/O error.
132      *
133      * @since 4.2
134      */
135     public AbstractMultiworkerIOReactor(
136             final IOReactorConfig config,
137             final ThreadFactory threadFactory) throws IOReactorException {
138         super();
139         this.config = config != null ? config : IOReactorConfig.DEFAULT;
140         this.params = new BasicHttpParams();
141         try {
142             this.selector = Selector.open();
143         } catch (final IOException ex) {
144             throw new IOReactorException("Failure opening selector", ex);
145         }
146         this.selectTimeout = this.config.getSelectInterval();
147         this.interestOpsQueueing = this.config.isInterestOpQueued();
148         this.statusLock = new Object();
149         if (threadFactory != null) {
150             this.threadFactory = threadFactory;
151         } else {
152             this.threadFactory = new DefaultThreadFactory();
153         }
154         this.auditLog = new ArrayList<ExceptionEvent>();
155         this.workerCount = this.config.getIoThreadCount();
156         this.dispatchers = new BaseIOReactor[workerCount];
157         this.workers = new Worker[workerCount];
158         this.threads = new Thread[workerCount];
159         this.status = IOReactorStatus.INACTIVE;
160     }
161 
162     /**
163      * Creates an instance of AbstractMultiworkerIOReactor with default configuration.
164      *
165      * @throws IOReactorException in case if a non-recoverable I/O error.
166      *
167      * @since 4.2
168      */
169     public AbstractMultiworkerIOReactor() throws IOReactorException {
170         this(null, null);
171     }
172 
173     /**
174      * @deprecated Do not use.
175      */
176     @Deprecated
177     static IOReactorConfig convert(final int workerCount, final HttpParams params) {
178         Args.notNull(params, "HTTP parameters");
179         return IOReactorConfig.custom()
180             .setSelectInterval(params.getLongParameter(NIOReactorPNames.SELECT_INTERVAL, 1000))
181             .setShutdownGracePeriod(params.getLongParameter(NIOReactorPNames.GRACE_PERIOD, 500))
182             .setInterestOpQueued(params.getBooleanParameter(NIOReactorPNames.SELECT_INTERVAL, false))
183             .setIoThreadCount(workerCount)
184             .setSoTimeout(params.getIntParameter(CoreConnectionPNames.SO_TIMEOUT, 0))
185             .setConnectTimeout(params.getIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 0))
186             .setSoTimeout(params.getIntParameter(CoreConnectionPNames.SO_TIMEOUT, 0))
187             .setSoReuseAddress(params.getBooleanParameter(CoreConnectionPNames.SO_REUSEADDR, false))
188             .setSoKeepAlive(params.getBooleanParameter(CoreConnectionPNames.SO_KEEPALIVE, false))
189             .setSoLinger(params.getIntParameter(CoreConnectionPNames.SO_LINGER, -1))
190             .setTcpNoDelay(params.getBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true))
191             .build();
192     }
193 
194     /**
195      * Creates an instance of AbstractMultiworkerIOReactor.
196      *
197      * @param workerCount number of worker I/O reactors.
198      * @param threadFactory the factory to create threads.
199      *   Can be {@code null}.
200      * @param params HTTP parameters.
201      * @throws IOReactorException in case if a non-recoverable I/O error.
202      *
203      * @deprecated (4.2) use {@link AbstractMultiworkerIOReactor#AbstractMultiworkerIOReactor(IOReactorConfig, ThreadFactory)}
204      */
205     @Deprecated
206     public AbstractMultiworkerIOReactor(
207             final int workerCount,
208             final ThreadFactory threadFactory,
209             final HttpParams params) throws IOReactorException {
210         this(convert(workerCount, params), threadFactory);
211     }
212 
213     @Override
214     public IOReactorStatus getStatus() {
215         return this.status;
216     }
217 
218     /**
219      * Returns the audit log containing exceptions thrown by the I/O reactor
220      * prior and in the course of the reactor shutdown.
221      *
222      * @return audit log.
223      */
224     public List<ExceptionEvent> getAuditLog() {
225         synchronized (this.auditLog) {
226             return new ArrayList<ExceptionEvent>(this.auditLog);
227         }
228     }
229 
230     /**
231      * Adds the given {@link Throwable} object with the given time stamp
232      * to the audit log.
233      *
234      * @param ex the exception thrown by the I/O reactor.
235      * @param timestamp the time stamp of the exception. Can be
236      * {@code null} in which case the current date / time will be used.
237      */
238     protected synchronized void addExceptionEvent(final Throwable ex, final Date timestamp) {
239         if (ex == null) {
240             return;
241         }
242         synchronized (this.auditLog) {
243             this.auditLog.add(new ExceptionEvent(ex, timestamp != null ? timestamp : new Date()));
244         }
245     }
246 
247     /**
248      * Adds the given {@link Throwable} object to the audit log.
249      *
250      * @param ex the exception thrown by the I/O reactor.
251      */
252     protected void addExceptionEvent(final Throwable ex) {
253         addExceptionEvent(ex, null);
254     }
255 
256     /**
257      * Sets exception handler for this I/O reactor.
258      *
259      * @param exceptionHandler the exception handler.
260      */
261     public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
262         this.exceptionHandler = exceptionHandler;
263     }
264 
265     /**
266      * Triggered to process I/O events registered by the main {@link Selector}.
267      * <p>
268      * Super-classes can implement this method to react to the event.
269      *
270      * @param count event count.
271      * @throws IOReactorException in case if a non-recoverable I/O error.
272      */
273     protected abstract void processEvents(int count) throws IOReactorException;
274 
275     /**
276      * Triggered to cancel pending session requests.
277      * <p>
278      * Super-classes can implement this method to react to the event.
279      *
280      * @throws IOReactorException in case if a non-recoverable I/O error.
281      */
282     protected abstract void cancelRequests() throws IOReactorException;
283 
284     /**
285      * Activates the main I/O reactor as well as all worker I/O reactors.
286      * The I/O main reactor will start reacting to I/O events and triggering
287      * notification methods. The worker I/O reactor in their turn will start
288      * reacting to I/O events and dispatch I/O event notifications to the given
289      * {@link IOEventDispatch} interface.
290      * <p>
291      * This method will enter the infinite I/O select loop on
292      * the {@link Selector} instance associated with this I/O reactor and used
293      * to manage creation of new I/O channels. Once a new I/O channel has been
294      * created the processing of I/O events on that channel will be delegated
295      * to one of the worker I/O reactors.
296      * <p>
297      * The method will remain blocked unto the I/O reactor is shut down or the
298      * execution thread is interrupted.
299      *
300      * @see #processEvents(int)
301      * @see #cancelRequests()
302      *
303      * @throws InterruptedIOException if the dispatch thread is interrupted.
304      * @throws IOReactorException in case if a non-recoverable I/O error.
305      */
306     @Override
307     public void execute(
308             final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
309         Args.notNull(eventDispatch, "Event dispatcher");
310         synchronized (this.statusLock) {
311             if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
312                 this.status = IOReactorStatus.SHUT_DOWN;
313                 this.statusLock.notifyAll();
314                 return;
315             }
316             Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,
317                     "Illegal state %s", this.status);
318             this.status = IOReactorStatus.ACTIVE;
319             // Start I/O dispatchers
320             for (int i = 0; i < this.dispatchers.length; i++) {
321                 final BaseIOReactorBaseIOReactor.html#BaseIOReactor">BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
322                 dispatcher.setExceptionHandler(exceptionHandler);
323                 this.dispatchers[i] = dispatcher;
324             }
325             for (int i = 0; i < this.workerCount; i++) {
326                 final BaseIOReactor dispatcher = this.dispatchers[i];
327                 this.workers[i] = new Worker(dispatcher, eventDispatch);
328                 this.threads[i] = this.threadFactory.newThread(this.workers[i]);
329             }
330         }
331         try {
332 
333             for (int i = 0; i < this.workerCount; i++) {
334                 if (this.status != IOReactorStatus.ACTIVE) {
335                     return;
336                 }
337                 this.threads[i].start();
338             }
339 
340             for (;;) {
341                 final int readyCount;
342                 try {
343                     readyCount = this.selector.select(this.selectTimeout);
344                 } catch (final InterruptedIOException ex) {
345                     throw ex;
346                 } catch (final IOException ex) {
347                     throw new IOReactorException("Unexpected selector failure", ex);
348                 }
349 
350                 if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
351                     processEvents(readyCount);
352                 }
353 
354                 // Verify I/O dispatchers
355                 for (int i = 0; i < this.workerCount; i++) {
356                     final Worker worker = this.workers[i];
357                     final Throwable ex = worker.getThrowable();
358                     if (ex != null) {
359                         throw new IOReactorException(
360                                 "I/O dispatch worker terminated abnormally", ex);
361                     }
362                 }
363 
364                 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
365                     break;
366                 }
367             }
368 
369         } catch (final ClosedSelectorException ex) {
370             addExceptionEvent(ex);
371         } catch (final IOReactorException ex) {
372             if (ex.getCause() != null) {
373                 addExceptionEvent(ex.getCause());
374             }
375             throw ex;
376         } finally {
377             doShutdown();
378             synchronized (this.statusLock) {
379                 this.status = IOReactorStatus.SHUT_DOWN;
380                 this.statusLock.notifyAll();
381             }
382         }
383     }
384 
385     /**
386      * Activates the shutdown sequence for this reactor. This method will cancel
387      * all pending session requests, close out all active I/O channels,
388      * make an attempt to terminate all worker I/O reactors gracefully,
389      * and finally force-terminate those I/O reactors that failed to
390      * terminate after the specified grace period.
391      *
392      * @throws InterruptedIOException if the shutdown sequence has been
393      *   interrupted.
394      */
395     protected void doShutdown() throws InterruptedIOException {
396         synchronized (this.statusLock) {
397             if (this.status.compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
398                 return;
399             }
400             this.status = IOReactorStatus.SHUTTING_DOWN;
401         }
402         try {
403             cancelRequests();
404         } catch (final IOReactorException ex) {
405             if (ex.getCause() != null) {
406                 addExceptionEvent(ex.getCause());
407             }
408         }
409         this.selector.wakeup();
410 
411         // Close out all channels
412         if (this.selector.isOpen()) {
413             for (final SelectionKey key : this.selector.keys()) {
414                 try {
415                     final Channel channel = key.channel();
416                     if (channel != null) {
417                         channel.close();
418                     }
419                 } catch (final IOException ex) {
420                     addExceptionEvent(ex);
421                 }
422             }
423             // Stop dispatching I/O events
424             try {
425                 this.selector.close();
426             } catch (final IOException ex) {
427                 addExceptionEvent(ex);
428             }
429         }
430 
431         // Attempt to shut down I/O dispatchers gracefully
432         for (int i = 0; i < this.workerCount; i++) {
433             final BaseIOReactor dispatcher = this.dispatchers[i];
434             dispatcher.gracefulShutdown();
435         }
436 
437         final long gracePeriod = this.config.getShutdownGracePeriod();
438 
439         try {
440             // Force shut down I/O dispatchers if they fail to terminate
441             // in time
442             for (int i = 0; i < this.workerCount; i++) {
443                 final BaseIOReactor dispatcher = this.dispatchers[i];
444                 if (dispatcher.getStatus() != IOReactorStatus.INACTIVE) {
445                     dispatcher.awaitShutdown(gracePeriod);
446                 }
447                 if (dispatcher.getStatus() != IOReactorStatus.SHUT_DOWN) {
448                     try {
449                         dispatcher.hardShutdown();
450                     } catch (final IOReactorException ex) {
451                         if (ex.getCause() != null) {
452                             addExceptionEvent(ex.getCause());
453                         }
454                     }
455                 }
456             }
457             // Join worker threads
458             for (int i = 0; i < this.workerCount; i++) {
459                 final Thread t = this.threads[i];
460                 if (t != null) {
461                     t.join(gracePeriod);
462                 }
463             }
464         } catch (final InterruptedException ex) {
465             throw new InterruptedIOException(ex.getMessage());
466         }
467     }
468 
469     /**
470      * Assigns the given channel entry to one of the worker I/O reactors.
471      *
472      * @param entry the channel entry.
473      */
474     protected void addChannel(final ChannelEntry entry) {
475         // Distribute new channels among the workers
476         final int i = Math.abs(this.currentWorker++ % this.workerCount);
477         this.dispatchers[i].addChannel(entry);
478     }
479 
480     /**
481      * Registers the given channel with the main {@link Selector}.
482      *
483      * @param channel the channel.
484      * @param ops interest ops.
485      * @return  selection key.
486      * @throws ClosedChannelException if the channel has been already closed.
487      */
488     protected SelectionKey registerChannel(
489             final SelectableChannel channel, final int ops) throws ClosedChannelException {
490         return channel.register(this.selector, ops);
491     }
492 
493     /**
494      * Prepares the given {@link Socket} by resetting some of its properties.
495      *
496      * @param socket the socket
497      * @throws IOException in case of an I/O error.
498      */
499     protected void prepareSocket(final Socket socket) throws IOException {
500         socket.setTcpNoDelay(this.config.isTcpNoDelay());
501         socket.setKeepAlive(this.config.isSoKeepalive());
502         if (this.config.getSoTimeout() > 0) {
503             socket.setSoTimeout(this.config.getSoTimeout());
504         }
505         if (this.config.getSndBufSize() > 0) {
506             socket.setSendBufferSize(this.config.getSndBufSize());
507         }
508         if (this.config.getRcvBufSize() > 0) {
509             socket.setReceiveBufferSize(this.config.getRcvBufSize());
510         }
511         final int linger = this.config.getSoLinger();
512         if (linger >= 0) {
513             socket.setSoLinger(true, linger);
514         }
515     }
516 
517     /**
518      * Blocks for the given period of time in milliseconds awaiting
519      * the completion of the reactor shutdown. If the value of
520      * {@code timeout} is set to {@code 0} this method blocks
521      * indefinitely.
522      *
523      * @param timeout the maximum wait time.
524      * @throws InterruptedException if interrupted.
525      */
526     protected void awaitShutdown(final long timeout) throws InterruptedException {
527         synchronized (this.statusLock) {
528             final long deadline = System.currentTimeMillis() + timeout;
529             long remaining = timeout;
530             while (this.status != IOReactorStatus.SHUT_DOWN) {
531                 this.statusLock.wait(remaining);
532                 if (timeout > 0) {
533                     remaining = deadline - System.currentTimeMillis();
534                     if (remaining <= 0) {
535                         break;
536                     }
537                 }
538             }
539         }
540     }
541 
542     @Override
543     public void shutdown() throws IOException {
544         shutdown(2000);
545     }
546 
547     @Override
548     public void shutdown(final long waitMs) throws IOException {
549         synchronized (this.statusLock) {
550             if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
551                 return;
552             }
553             if (this.status.compareTo(IOReactorStatus.INACTIVE) == 0) {
554                 this.status = IOReactorStatus.SHUT_DOWN;
555                 cancelRequests();
556                 this.selector.close();
557                 return;
558             }
559             this.status = IOReactorStatus.SHUTDOWN_REQUEST;
560         }
561         this.selector.wakeup();
562         try {
563             awaitShutdown(waitMs);
564         } catch (final InterruptedException ignore) {
565         }
566     }
567 
568     static void closeChannel(final Channel channel) {
569         try {
570             channel.close();
571         } catch (final IOException ignore) {
572         }
573     }
574 
575     static class Worker implements Runnable {
576 
577         final BaseIOReactor dispatcher;
578         final IOEventDispatch eventDispatch;
579 
580         private volatile Throwable exception;
581 
582         public Worker(final BaseIOReactor dispatcher, final IOEventDispatch eventDispatch) {
583             super();
584             this.dispatcher = dispatcher;
585             this.eventDispatch = eventDispatch;
586         }
587 
588         @Override
589         public void run() {
590             try {
591                 this.dispatcher.execute(this.eventDispatch);
592             } catch (final Error ex) {
593                 this.exception = ex;
594                 throw ex;
595             } catch (final Exception ex) {
596                 this.exception = ex;
597             }
598         }
599 
600         public Throwable getThrowable() {
601             return this.exception;
602         }
603 
604     }
605 
606     static class DefaultThreadFactory implements ThreadFactory {
607 
608         private final static AtomicLong COUNT = new AtomicLong(1);
609 
610         @Override
611         public Thread newThread(final Runnable r) {
612             return new Thread(r, "I/O dispatcher " + COUNT.getAndIncrement());
613         }
614 
615     }
616 
617 }