View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.InterruptedIOException;
31  import java.nio.channels.CancelledKeyException;
32  import java.nio.channels.SelectionKey;
33  import java.util.HashSet;
34  import java.util.Iterator;
35  import java.util.Set;
36  
37  import org.apache.http.nio.reactor.EventMask;
38  import org.apache.http.nio.reactor.IOEventDispatch;
39  import org.apache.http.nio.reactor.IOReactorException;
40  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
41  import org.apache.http.nio.reactor.IOSession;
42  import org.apache.http.util.Args;
43  
44  /**
45   * Default implementation of {@link AbstractIOReactor} that serves as a base
46   * for more advanced {@link org.apache.http.nio.reactor.IOReactor}
47   * implementations. This class adds support for the I/O event dispatching
48   * using {@link IOEventDispatch}, management of buffering sessions, and
49   * session timeout handling.
50   *
51   * @since 4.0
52   */
53  public class BaseIOReactor extends AbstractIOReactor {
54  
55      private final long timeoutCheckInterval;
56      private final Set<IOSession> bufferingSessions;
57  
58      private long lastTimeoutCheck;
59  
60      private IOReactorExceptionHandler exceptionHandler = null;
61      private IOEventDispatch eventDispatch = null;
62  
63      /**
64       * Creates new BaseIOReactor instance.
65       *
66       * @param selectTimeout the select timeout.
67       * @throws IOReactorException in case if a non-recoverable I/O error.
68       */
69      public BaseIOReactor(final long selectTimeout) throws IOReactorException {
70          this(selectTimeout, false);
71      }
72  
73      /**
74       * Creates new BaseIOReactor instance.
75       *
76       * @param selectTimeout the select timeout.
77       * @param interestOpsQueueing Ops queueing flag.
78       *
79       * @throws IOReactorException in case if a non-recoverable I/O error.
80       *
81       * @since 4.1
82       */
83      public BaseIOReactor(
84              final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
85          super(selectTimeout, interestOpsQueueing);
86          this.bufferingSessions = new HashSet<IOSession>();
87          this.timeoutCheckInterval = selectTimeout;
88          this.lastTimeoutCheck = System.currentTimeMillis();
89      }
90  
91      /**
92       * Activates the I/O reactor. The I/O reactor will start reacting to I/O
93       * events and dispatch I/O event notifications to the given
94       * {@link IOEventDispatch}.
95       *
96       * @throws InterruptedIOException if the dispatch thread is interrupted.
97       * @throws IOReactorException in case if a non-recoverable I/O error.
98       */
99      @Override
100     public void execute(
101             final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
102         Args.notNull(eventDispatch, "Event dispatcher");
103         this.eventDispatch = eventDispatch;
104         execute();
105     }
106 
107     /**
108      * Sets exception handler for this I/O reactor.
109      *
110      * @param exceptionHandler the exception handler.
111      */
112     public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
113         this.exceptionHandler = exceptionHandler;
114     }
115 
116     /**
117      * Handles the given {@link RuntimeException}. This method delegates
118      * handling of the exception to the {@link IOReactorExceptionHandler},
119      * if available.
120      *
121      * @param ex the runtime exception.
122      */
123     protected void handleRuntimeException(final RuntimeException ex) {
124         if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
125             throw ex;
126         }
127     }
128 
129     /**
130      * This I/O reactor implementation does not react to the
131      * {@link SelectionKey#OP_ACCEPT} event.
132      * <p>
133      * Super-classes can override this method to react to the event.
134      */
135     @Override
136     protected void acceptable(final SelectionKey key) {
137     }
138 
139     /**
140      * This I/O reactor implementation does not react to the
141      * {@link SelectionKey#OP_CONNECT} event.
142      * <p>
143      * Super-classes can override this method to react to the event.
144      */
145     @Override
146     protected void connectable(final SelectionKey key) {
147     }
148 
149     /**
150      * Processes {@link SelectionKey#OP_READ} event on the given selection key.
151      * This method dispatches the event notification to the
152      * {@link IOEventDispatch#inputReady(IOSession)} method.
153      */
154     @Override
155     protected void readable(final SelectionKey key) {
156         final IOSession session = getSession(key);
157         try {
158             // Try to gently feed more data to the event dispatcher
159             // if the session input buffer has not been fully exhausted
160             // (the choice of 5 iterations is purely arbitrary)
161             for (int i = 0; i < 5; i++) {
162                 this.eventDispatch.inputReady(session);
163                 if (!session.hasBufferedInput()
164                         || (session.getEventMask() & SelectionKey.OP_READ) == 0) {
165                     break;
166                 }
167             }
168             if (session.hasBufferedInput()) {
169                 this.bufferingSessions.add(session);
170             }
171         } catch (final CancelledKeyException ex) {
172             throw ex;
173         } catch (final RuntimeException ex) {
174             handleRuntimeException(ex);
175         }
176     }
177 
178     /**
179      * Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
180      * This method dispatches the event notification to the
181      * {@link IOEventDispatch#outputReady(IOSession)} method.
182      */
183     @Override
184     protected void writable(final SelectionKey key) {
185         final IOSession session = getSession(key);
186         try {
187             this.eventDispatch.outputReady(session);
188         } catch (final CancelledKeyException ex) {
189             throw ex;
190         } catch (final RuntimeException ex) {
191             handleRuntimeException(ex);
192         }
193     }
194 
195     /**
196      * Verifies whether any of the sessions associated with the given selection
197      * keys timed out by invoking the {@link #timeoutCheck(SelectionKey, long)}
198      * method.
199      * <p>
200      * This method will also invoke the
201      * {@link IOEventDispatch#inputReady(IOSession)} method on all sessions
202      * that have buffered input data.
203      */
204     @Override
205     protected void validate(final Set<SelectionKey> keys) {
206         final long currentTime = System.currentTimeMillis();
207         if( (currentTime - this.lastTimeoutCheck) >= this.timeoutCheckInterval) {
208             this.lastTimeoutCheck = currentTime;
209             if (keys != null) {
210                 for (final SelectionKey key : keys) {
211                     timeoutCheck(key, currentTime);
212                 }
213             }
214         }
215         if (!this.bufferingSessions.isEmpty()) {
216             for (final Iterator<IOSession> it = this.bufferingSessions.iterator(); it.hasNext(); ) {
217                 final IOSession session = it.next();
218                 if (!session.hasBufferedInput()) {
219                     it.remove();
220                     continue;
221                 }
222                 try {
223                     if ((session.getEventMask() & EventMask.READ) > 0) {
224                         this.eventDispatch.inputReady(session);
225                         if (!session.hasBufferedInput()) {
226                             it.remove();
227                         }
228                     }
229                 } catch (final CancelledKeyException ex) {
230                     it.remove();
231                     session.close();
232                 } catch (final RuntimeException ex) {
233                     handleRuntimeException(ex);
234                 }
235             }
236         }
237     }
238 
239     /**
240      * Processes newly created I/O session. This method dispatches the event
241      * notification to the {@link IOEventDispatch#connected(IOSession)} method.
242      */
243     @Override
244     protected void sessionCreated(final SelectionKey key, final IOSession session) {
245         try {
246             this.eventDispatch.connected(session);
247         } catch (final CancelledKeyException ex) {
248             throw ex;
249         } catch (final RuntimeException ex) {
250             handleRuntimeException(ex);
251         }
252     }
253 
254     /**
255      * Processes timed out I/O session. This method dispatches the event
256      * notification to the {@link IOEventDispatch#timeout(IOSession)} method.
257      */
258     @Override
259     protected void sessionTimedOut(final IOSession session) {
260         try {
261             this.eventDispatch.timeout(session);
262         } catch (final CancelledKeyException ex) {
263             throw ex;
264         } catch (final RuntimeException ex) {
265             handleRuntimeException(ex);
266         }
267     }
268 
269     /**
270      * Processes closed I/O session. This method dispatches the event
271      * notification to the {@link IOEventDispatch#disconnected(IOSession)}
272      * method.
273      */
274     @Override
275     protected void sessionClosed(final IOSession session) {
276         try {
277             this.eventDispatch.disconnected(session);
278         } catch (final CancelledKeyException ex) {
279             // ignore
280         } catch (final RuntimeException ex) {
281             handleRuntimeException(ex);
282         }
283     }
284 
285 }