View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.nio.channels.CancelledKeyException;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.ClosedSelectorException;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.Selector;
37  import java.nio.channels.SocketChannel;
38  import java.util.Collections;
39  import java.util.HashSet;
40  import java.util.Queue;
41  import java.util.Set;
42  import java.util.concurrent.ConcurrentLinkedQueue;
43  
44  import org.apache.http.nio.reactor.IOReactor;
45  import org.apache.http.nio.reactor.IOReactorException;
46  import org.apache.http.nio.reactor.IOReactorStatus;
47  import org.apache.http.nio.reactor.IOSession;
48  import org.apache.http.util.Args;
49  import org.apache.http.util.Asserts;
50  
51  /**
52   * Generic implementation of {@link IOReactor} that can used as a subclass
53   * for more specialized I/O reactors. It is based on a single {@link Selector}
54   * instance.
55   *
56   * @since 4.0
57   */
58  public abstract class AbstractIOReactor implements IOReactor {
59  
60      private volatile IOReactorStatus status;
61  
62      private final Object statusMutex;
63      private final long selectTimeout;
64      private final boolean interestOpsQueueing;
65      private final Selector selector;
66      private final Set<IOSession> sessions;
67      private final Queue<InterestOpEntry> interestOpsQueue;
68      private final Queue<IOSession> closedSessions;
69      private final Queue<ChannelEntry> newChannels;
70  
71      /**
72       * Creates new AbstractIOReactor instance.
73       *
74       * @param selectTimeout the select timeout.
75       * @throws IOReactorException in case if a non-recoverable I/O error.
76       */
77      public AbstractIOReactor(final long selectTimeout) throws IOReactorException {
78          this(selectTimeout, false);
79      }
80  
81      /**
82       * Creates new AbstractIOReactor instance.
83       *
84       * @param selectTimeout the select timeout.
85       * @param interestOpsQueueing Ops queueing flag.
86       *
87       * @throws IOReactorException in case if a non-recoverable I/O error.
88       *
89       * @since 4.1
90       */
91      public AbstractIOReactor(final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
92          super();
93          Args.positive(selectTimeout, "Select timeout");
94          this.selectTimeout = selectTimeout;
95          this.interestOpsQueueing = interestOpsQueueing;
96          this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
97          this.interestOpsQueue = new ConcurrentLinkedQueue<InterestOpEntry>();
98          this.closedSessions = new ConcurrentLinkedQueue<IOSession>();
99          this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
100         try {
101             this.selector = Selector.open();
102         } catch (final IOException ex) {
103             throw new IOReactorException("Failure opening selector", ex);
104         }
105         this.statusMutex = new Object();
106         this.status = IOReactorStatus.INACTIVE;
107     }
108 
109     /**
110      * Triggered when the key signals {@link SelectionKey#OP_ACCEPT} readiness.
111      * <p>
112      * Super-classes can implement this method to react to the event.
113      *
114      * @param key the selection key.
115      */
116     protected abstract void acceptable(SelectionKey key);
117 
118     /**
119      * Triggered when the key signals {@link SelectionKey#OP_CONNECT} readiness.
120      * <p>
121      * Super-classes can implement this method to react to the event.
122      *
123      * @param key the selection key.
124      */
125     protected abstract void connectable(SelectionKey key);
126 
127     /**
128      * Triggered when the key signals {@link SelectionKey#OP_READ} readiness.
129      * <p>
130      * Super-classes can implement this method to react to the event.
131      *
132      * @param key the selection key.
133      */
134     protected abstract void readable(SelectionKey key);
135 
136     /**
137      * Triggered when the key signals {@link SelectionKey#OP_WRITE} readiness.
138      * <p>
139      * Super-classes can implement this method to react to the event.
140      *
141      * @param key the selection key.
142      */
143     protected abstract void writable(SelectionKey key);
144 
145     /**
146      * Triggered to validate keys currently registered with the selector. This
147      * method is called after each I/O select loop.
148      * <p>
149      * Super-classes can implement this method to run validity checks on
150      * active sessions and include additional processing that needs to be
151      * executed after each I/O select loop.
152      *
153      * @param keys all selection keys registered with the selector.
154      */
155     protected abstract void validate(Set<SelectionKey> keys);
156 
157     /**
158      * Triggered when new session has been created.
159      * <p>
160      * Super-classes can implement this method to react to the event.
161      *
162      * @param key the selection key.
163      * @param session new I/O session.
164      */
165     protected void sessionCreated(final SelectionKey key, final IOSession session) {
166     }
167 
168     /**
169      * Triggered when a session has been closed.
170      * <p>
171      * Super-classes can implement this method to react to the event.
172      *
173      * @param session closed I/O session.
174      */
175     protected void sessionClosed(final IOSession session) {
176     }
177 
178     /**
179      * Triggered when a session has timed out.
180      * <p>
181      * Super-classes can implement this method to react to the event.
182      *
183      * @param session timed out I/O session.
184      */
185     protected void sessionTimedOut(final IOSession session) {
186     }
187 
188     /**
189      * Obtains {@link IOSession} instance associated with the given selection
190      * key.
191      *
192      * @param key the selection key.
193      * @return I/O session.
194      */
195     protected IOSession getSession(final SelectionKey key) {
196         return (IOSession) key.attachment();
197     }
198 
199     @Override
200     public IOReactorStatus getStatus() {
201         return this.status;
202     }
203 
204     /**
205      * Returns {@code true} if interest Ops queueing is enabled, {@code false} otherwise.
206      *
207      * @since 4.1
208      */
209     public boolean getInterestOpsQueueing() {
210         return this.interestOpsQueueing;
211     }
212 
213     /**
214      * Adds new channel entry. The channel will be asynchronously registered
215      * with the selector.
216      *
217      * @param channelEntry the channel entry.
218      */
219     public void addChannel(final ChannelEntry channelEntry) {
220         Args.notNull(channelEntry, "Channel entry");
221         this.newChannels.add(channelEntry);
222         this.selector.wakeup();
223     }
224 
225     /**
226      * Activates the I/O reactor. The I/O reactor will start reacting to
227      * I/O events and triggering notification methods.
228      * <p>
229      * This method will enter the infinite I/O select loop on
230      * the {@link Selector} instance associated with this I/O reactor.
231      * <p>
232      * The method will remain blocked unto the I/O reactor is shut down or the
233      * execution thread is interrupted.
234      *
235      * @see #acceptable(SelectionKey)
236      * @see #connectable(SelectionKey)
237      * @see #readable(SelectionKey)
238      * @see #writable(SelectionKey)
239      * @see #timeoutCheck(SelectionKey, long)
240      * @see #validate(Set)
241      * @see #sessionCreated(SelectionKey, IOSession)
242      * @see #sessionClosed(IOSession)
243      *
244      * @throws InterruptedIOException if the dispatch thread is interrupted.
245      * @throws IOReactorException in case if a non-recoverable I/O error.
246      */
247     protected void execute() throws InterruptedIOException, IOReactorException {
248         this.status = IOReactorStatus.ACTIVE;
249 
250         try {
251             for (;;) {
252 
253                 final int readyCount;
254                 try {
255                     readyCount = this.selector.select(this.selectTimeout);
256                 } catch (final InterruptedIOException ex) {
257                     throw ex;
258                 } catch (final IOException ex) {
259                     throw new IOReactorException("Unexpected selector failure", ex);
260                 }
261 
262                 if (this.status == IOReactorStatus.SHUT_DOWN) {
263                     // Hard shut down. Exit select loop immediately
264                     break;
265                 }
266 
267                 if (this.status == IOReactorStatus.SHUTTING_DOWN) {
268                     // Graceful shutdown in process
269                     // Try to close things out nicely
270                     closeSessions();
271                     closeNewChannels();
272                 }
273 
274                 // Process selected I/O events
275                 if (readyCount > 0) {
276                     processEvents(this.selector.selectedKeys());
277                 }
278 
279                 // Validate active channels
280                 validate(this.selector.keys());
281 
282                 // Process closed sessions
283                 processClosedSessions();
284 
285                 // If active process new channels
286                 if (this.status == IOReactorStatus.ACTIVE) {
287                     processNewChannels();
288                 }
289 
290                 // Exit select loop if graceful shutdown has been completed
291                 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
292                         && this.sessions.isEmpty()) {
293                     break;
294                 }
295 
296                 if (this.interestOpsQueueing) {
297                     // process all pending interestOps() operations
298                     processPendingInterestOps();
299                 }
300 
301             }
302 
303         } catch (final ClosedSelectorException ignore) {
304         } finally {
305             hardShutdown();
306             synchronized (this.statusMutex) {
307                 this.statusMutex.notifyAll();
308             }
309         }
310     }
311 
312     private void processEvents(final Set<SelectionKey> selectedKeys) {
313         for (final SelectionKey key : selectedKeys) {
314 
315             processEvent(key);
316 
317         }
318         selectedKeys.clear();
319     }
320 
321     /**
322      * Processes new event on the given selection key.
323      *
324      * @param key the selection key that triggered an event.
325      */
326     protected void processEvent(final SelectionKey key) {
327         final IOSessionImpl/../../../org/apache/http/impl/nio/reactor/IOSessionImpl.html#IOSessionImpl">IOSessionImpl session = (IOSessionImpl) key.attachment();
328         try {
329             if (key.isAcceptable()) {
330                 acceptable(key);
331             }
332             if (key.isConnectable()) {
333                 connectable(key);
334             }
335             if (key.isReadable()) {
336                 session.resetLastRead();
337                 readable(key);
338             }
339             if (key.isWritable()) {
340                 session.resetLastWrite();
341                 writable(key);
342             }
343         } catch (final CancelledKeyException ex) {
344             queueClosedSession(session);
345             key.attach(null);
346         }
347     }
348 
349     /**
350      * Queues the given I/O session to be processed asynchronously as closed.
351      *
352      * @param session the closed I/O session.
353      */
354     protected void queueClosedSession(final IOSession session) {
355         if (session != null) {
356             this.closedSessions.add(session);
357         }
358     }
359 
360     private void processNewChannels() throws IOReactorException {
361         ChannelEntry entry;
362         while ((entry = this.newChannels.poll()) != null) {
363 
364             final SocketChannel channel;
365             final SelectionKey key;
366             try {
367                 channel = entry.getChannel();
368                 channel.configureBlocking(false);
369                 key = channel.register(this.selector, SelectionKey.OP_READ);
370             } catch (final ClosedChannelException ex) {
371                 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
372                 if (sessionRequest != null) {
373                     sessionRequest.failed(ex);
374                 }
375                 return;
376 
377             } catch (final IOException ex) {
378                 throw new IOReactorException("Failure registering channel " +
379                         "with the selector", ex);
380             }
381 
382             final SessionClosedCallbackck.html#SessionClosedCallback">SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
383 
384                 @Override
385                 public void sessionClosed(final IOSession session) {
386                     queueClosedSession(session);
387                 }
388 
389             };
390 
391             InterestOpsCallback interestOpsCallback = null;
392             if (this.interestOpsQueueing) {
393                 interestOpsCallback = new InterestOpsCallback() {
394 
395                     @Override
396                     public void addInterestOps(final InterestOpEntry entry) {
397                         queueInterestOps(entry);
398                     }
399 
400                 };
401             }
402 
403             final IOSession session;
404             try {
405                 session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
406                 int timeout = 0;
407                 try {
408                     timeout = channel.socket().getSoTimeout();
409                 } catch (final IOException ex) {
410                     // Very unlikely to happen and is not fatal
411                     // as the protocol layer is expected to overwrite
412                     // this value anyways
413                 }
414 
415                 session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
416                 session.setSocketTimeout(timeout);
417             } catch (final CancelledKeyException ex) {
418                 continue;
419             }
420             try {
421                 this.sessions.add(session);
422                 key.attach(session);
423                 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
424                 if (sessionRequest != null) {
425                     if (!sessionRequest.isTerminated()) {
426                         sessionRequest.completed(session);
427                     }
428                     if (!sessionRequest.isTerminated() && !session.isClosed()) {
429                         sessionCreated(key, session);
430                     }
431                     if (sessionRequest.isTerminated()) {
432                         throw new CancelledKeyException();
433                     }
434                 } else {
435                     sessionCreated(key, session);
436                 }
437             } catch (final CancelledKeyException ex) {
438                 session.close();
439                 key.attach(null);
440             }
441         }
442     }
443 
444     private void processClosedSessions() {
445         IOSession session;
446         while ((session = this.closedSessions.poll()) != null) {
447             if (this.sessions.remove(session)) {
448                 try {
449                     sessionClosed(session);
450                 } catch (final CancelledKeyException ex) {
451                     // ignore and move on
452                 }
453             }
454         }
455     }
456 
457     private void processPendingInterestOps() {
458         // validity check
459         if (!this.interestOpsQueueing) {
460             return;
461         }
462         InterestOpEntry entry;
463         while ((entry = this.interestOpsQueue.poll()) != null) {
464             // obtain the operation's details
465             final SelectionKey key = entry.getSelectionKey();
466             final int eventMask = entry.getEventMask();
467             if (key.isValid()) {
468                 key.interestOps(eventMask);
469             }
470         }
471     }
472 
473     private boolean queueInterestOps(final InterestOpEntry entry) {
474         // validity checks
475         Asserts.check(this.interestOpsQueueing, "Interest ops queueing not enabled");
476         if (entry == null) {
477             return false;
478         }
479 
480         // add this operation to the interestOps() queue
481         this.interestOpsQueue.add(entry);
482 
483         return true;
484     }
485 
486     /**
487      * Triggered to verify whether the I/O session associated with the
488      * given selection key has not timed out.
489      * <p>
490      * Super-classes can implement this method to react to the event.
491      *
492      * @param key the selection key.
493      * @param now current time as long value.
494      */
495     protected void timeoutCheck(final SelectionKey key, final long now) {
496         final IOSessionImpl/../../../org/apache/http/impl/nio/reactor/IOSessionImpl.html#IOSessionImpl">IOSessionImpl session = (IOSessionImpl) key.attachment();
497         if (session != null) {
498             final int timeout = session.getSocketTimeout();
499             if (timeout > 0) {
500                 if (session.getLastAccessTime() + timeout < now) {
501                     try {
502                         sessionTimedOut(session);
503                     } catch (final CancelledKeyException ex) {
504                         session.close();
505                         key.attach(null);
506                     }
507                 }
508             }
509         }
510     }
511 
512     /**
513      * Closes out all I/O sessions maintained by this I/O reactor.
514      */
515     protected void closeSessions() {
516         synchronized (this.sessions) {
517             for (final IOSession session : this.sessions) {
518                 session.close();
519             }
520         }
521     }
522 
523     /**
524      * Closes out all new channels pending registration with the selector of
525      * this I/O reactor.
526      * @throws IOReactorException - not thrown currently
527      */
528     protected void closeNewChannels() throws IOReactorException {
529         ChannelEntry entry;
530         while ((entry = this.newChannels.poll()) != null) {
531             final SessionRequestImpl sessionRequest = entry.getSessionRequest();
532             if (sessionRequest != null) {
533                 sessionRequest.cancel();
534             }
535             final SocketChannel channel = entry.getChannel();
536             try {
537                 channel.close();
538             } catch (final IOException ignore) {
539             }
540         }
541     }
542 
543     /**
544      * Closes out all active channels registered with the selector of
545      * this I/O reactor.
546      * @throws IOReactorException - not thrown currently
547      */
548     protected void closeActiveChannels() throws IOReactorException {
549         try {
550             final Set<SelectionKey> keys = this.selector.keys();
551             for (final SelectionKey key : keys) {
552                 final IOSession session = getSession(key);
553                 if (session != null) {
554                     session.close();
555                 }
556             }
557             this.selector.close();
558         } catch (final IOException ignore) {
559         }
560     }
561 
562     /**
563      * Attempts graceful shutdown of this I/O reactor.
564      */
565     public void gracefulShutdown() {
566         synchronized (this.statusMutex) {
567             if (this.status != IOReactorStatus.ACTIVE) {
568                 // Already shutting down
569                 return;
570             }
571             this.status = IOReactorStatus.SHUTTING_DOWN;
572         }
573         this.selector.wakeup();
574     }
575 
576     /**
577      * Attempts force-shutdown of this I/O reactor.
578      */
579     public void hardShutdown() throws IOReactorException {
580         synchronized (this.statusMutex) {
581             if (this.status == IOReactorStatus.SHUT_DOWN) {
582                 // Already shut down
583                 return;
584             }
585             this.status = IOReactorStatus.SHUT_DOWN;
586         }
587 
588         closeNewChannels();
589         closeActiveChannels();
590         processClosedSessions();
591     }
592 
593     /**
594      * Blocks for the given period of time in milliseconds awaiting
595      * the completion of the reactor shutdown.
596      *
597      * @param timeout the maximum wait time.
598      * @throws InterruptedException if interrupted.
599      */
600     public void awaitShutdown(final long timeout) throws InterruptedException {
601         synchronized (this.statusMutex) {
602             final long deadline = System.currentTimeMillis() + timeout;
603             long remaining = timeout;
604             while (this.status != IOReactorStatus.SHUT_DOWN) {
605                 this.statusMutex.wait(remaining);
606                 if (timeout > 0) {
607                     remaining = deadline - System.currentTimeMillis();
608                     if (remaining <= 0) {
609                         break;
610                     }
611                 }
612             }
613         }
614     }
615 
616     @Override
617     public void shutdown(final long gracePeriod) throws IOReactorException {
618         if (this.status != IOReactorStatus.INACTIVE) {
619             gracefulShutdown();
620             try {
621                 awaitShutdown(gracePeriod);
622             } catch (final InterruptedException ignore) {
623             }
624         }
625         if (this.status != IOReactorStatus.SHUT_DOWN) {
626             hardShutdown();
627         }
628     }
629 
630     @Override
631     public void shutdown() throws IOReactorException {
632         shutdown(1000);
633     }
634 
635 }