View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.UnknownHostException;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.SocketChannel;
38  import java.security.AccessController;
39  import java.security.PrivilegedActionException;
40  import java.security.PrivilegedExceptionAction;
41  import java.util.Queue;
42  import java.util.Set;
43  import java.util.concurrent.ConcurrentLinkedQueue;
44  import java.util.concurrent.ThreadFactory;
45  
46  import org.apache.http.nio.reactor.ConnectingIOReactor;
47  import org.apache.http.nio.reactor.IOReactorException;
48  import org.apache.http.nio.reactor.IOReactorStatus;
49  import org.apache.http.nio.reactor.SessionRequest;
50  import org.apache.http.nio.reactor.SessionRequestCallback;
51  import org.apache.http.params.HttpParams;
52  import org.apache.http.util.Asserts;
53  
54  /**
55   * Default implementation of {@link ConnectingIOReactor}. This class extends
56   * {@link AbstractMultiworkerIOReactor} with capability to connect to remote
57   * hosts.
58   *
59   * @since 4.0
60   */
61  @SuppressWarnings("deprecation")
62  public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
63          implements ConnectingIOReactor {
64  
65      private final Queue<SessionRequestImpl> requestQueue;
66  
67      private long lastTimeoutCheck;
68  
69      /**
70       * Creates an instance of DefaultConnectingIOReactor with the given configuration.
71       *
72       * @param config I/O reactor configuration.
73       * @param threadFactory the factory to create threads.
74       *   Can be {@code null}.
75       * @throws IOReactorException in case if a non-recoverable I/O error.
76       *
77       * @since 4.2
78       */
79      public DefaultConnectingIOReactor(
80              final IOReactorConfig config,
81              final ThreadFactory threadFactory) throws IOReactorException {
82          super(config, threadFactory);
83          this.requestQueue = new ConcurrentLinkedQueue<SessionRequestImpl>();
84          this.lastTimeoutCheck = System.currentTimeMillis();
85      }
86  
87      /**
88       * Creates an instance of DefaultConnectingIOReactor with the given configuration.
89       *
90       * @param config I/O reactor configuration.
91       *   Can be {@code null}.
92       * @throws IOReactorException in case if a non-recoverable I/O error.
93       *
94       * @since 4.2
95       */
96      public DefaultConnectingIOReactor(final IOReactorConfig config) throws IOReactorException {
97          this(config, null);
98      }
99  
100     /**
101      * Creates an instance of DefaultConnectingIOReactor with default configuration.
102      *
103      * @throws IOReactorException in case if a non-recoverable I/O error.
104      *
105      * @since 4.2
106      */
107     public DefaultConnectingIOReactor() throws IOReactorException {
108         this(null, null);
109     }
110 
111     /**
112      * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig, ThreadFactory)}
113      */
114     @Deprecated
115     public DefaultConnectingIOReactor(
116             final int workerCount,
117             final ThreadFactory threadFactory,
118             final HttpParams params) throws IOReactorException {
119         this(convert(workerCount, params), threadFactory);
120     }
121 
122     /**
123      * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig)}
124      */
125     @Deprecated
126     public DefaultConnectingIOReactor(
127             final int workerCount,
128             final HttpParams params) throws IOReactorException {
129         this(convert(workerCount, params), null);
130     }
131 
132     @Override
133     protected void cancelRequests() throws IOReactorException {
134         SessionRequestImpl request;
135         while ((request = this.requestQueue.poll()) != null) {
136             request.cancel();
137         }
138     }
139 
140     @Override
141     protected void processEvents(final int readyCount) throws IOReactorException {
142         processSessionRequests();
143 
144         if (readyCount > 0) {
145             final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
146             for (final SelectionKey key : selectedKeys) {
147 
148                 processEvent(key);
149 
150             }
151             selectedKeys.clear();
152         }
153 
154         final long currentTime = System.currentTimeMillis();
155         if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
156             this.lastTimeoutCheck = currentTime;
157             final Set<SelectionKey> keys = this.selector.keys();
158             processTimeouts(keys);
159         }
160     }
161 
162     private void processEvent(final SelectionKey key) {
163         try {
164 
165             if (key.isConnectable()) {
166 
167                 final SocketChannel channel = (SocketChannel) key.channel();
168                 // Get request handle
169                 final SessionRequestHandle/apache/http/impl/nio/reactor/SessionRequestHandle.html#SessionRequestHandle">SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
170                 final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
171 
172                 // Finish connection process
173                 try {
174                     channel.finishConnect();
175                 } catch (final IOException ex) {
176                     sessionRequest.failed(ex);
177                 }
178                 key.cancel();
179                 key.attach(null);
180                 if (!sessionRequest.isCompleted()) {
181                     addChannel(new ChannelEntry(channel, sessionRequest));
182                 } else {
183                     try {
184                         channel.close();
185                     } catch (final IOException ignore) {
186                     }
187                 }
188             }
189 
190         } catch (final CancelledKeyException ex) {
191             final SessionRequestHandle/apache/http/impl/nio/reactor/SessionRequestHandle.html#SessionRequestHandle">SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
192             key.attach(null);
193             if (requestHandle != null) {
194                 final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
195                 if (sessionRequest != null) {
196                     sessionRequest.cancel();
197                 }
198             }
199         }
200     }
201 
202     private void processTimeouts(final Set<SelectionKey> keys) {
203         final long now = System.currentTimeMillis();
204         for (final SelectionKey key : keys) {
205             final Object attachment = key.attachment();
206 
207             if (attachment instanceof SessionRequestHandle) {
208                 final SessionRequestHandle/../org/apache/http/impl/nio/reactor/SessionRequestHandle.html#SessionRequestHandle">SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
209                 final SessionRequestImpl sessionRequest = handle.getSessionRequest();
210                 final int timeout = sessionRequest.getConnectTimeout();
211                 if (timeout > 0) {
212                     if (handle.getRequestTime() + timeout < now) {
213                         sessionRequest.timeout();
214                     }
215                 }
216             }
217 
218         }
219     }
220 
221     @Override
222     public SessionRequest connect(
223             final SocketAddress remoteAddress,
224             final SocketAddress localAddress,
225             final Object attachment,
226             final SessionRequestCallback callback) {
227         Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
228             "I/O reactor has been shut down");
229         final SessionRequestImplquestImpl.html#SessionRequestImpl">SessionRequestImpl sessionRequest = new SessionRequestImpl(
230                 remoteAddress, localAddress, attachment, callback);
231         sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
232 
233         this.requestQueue.add(sessionRequest);
234         this.selector.wakeup();
235 
236         return sessionRequest;
237     }
238 
239     private void validateAddress(final SocketAddress address) throws UnknownHostException {
240         if (address == null) {
241             return;
242         }
243         if (address instanceof InetSocketAddress) {
244             final InetSocketAddress endpoint = (InetSocketAddress) address;
245             if (endpoint.isUnresolved()) {
246                 throw new UnknownHostException(endpoint.getHostName());
247             }
248         }
249     }
250 
251     private void processSessionRequests() throws IOReactorException {
252         SessionRequestImpl request;
253         while ((request = this.requestQueue.poll()) != null) {
254             if (request.isCompleted()) {
255                 continue;
256             }
257             final SocketChannel socketChannel;
258             try {
259                 socketChannel = SocketChannel.open();
260             } catch (final IOException ex) {
261                 request.failed(ex);
262                 return;
263             }
264             try {
265                 validateAddress(request.getLocalAddress());
266                 validateAddress(request.getRemoteAddress());
267 
268                 socketChannel.configureBlocking(false);
269                 prepareSocket(socketChannel.socket());
270 
271                 if (request.getLocalAddress() != null) {
272                     final Socket sock = socketChannel.socket();
273                     sock.setReuseAddress(this.config.isSoReuseAddress());
274                     sock.bind(request.getLocalAddress());
275                 }
276 
277                 final SocketAddress targetAddress = request.getRemoteAddress();
278                 // Run this under a doPrivileged to support lib users that run under a SecurityManager this allows granting connect
279                 // permissions only to this library
280                 final boolean connected;
281                 try {
282                     connected = AccessController.doPrivileged(
283                             new PrivilegedExceptionAction<Boolean>() {
284                                 @Override
285                                 public Boolean run() throws IOException {
286                                     return socketChannel.connect(targetAddress);
287                                 }
288                             });
289                 } catch (final PrivilegedActionException e) {
290                     Asserts.check(e.getCause() instanceof  IOException,
291                             "method contract violation only checked exceptions are wrapped: " + e.getCause());
292                     // only checked exceptions are wrapped - error and RTExceptions are rethrown by doPrivileged
293                     throw (IOException) e.getCause();
294                 }
295                 if (connected) {
296                     final ChannelEntryactor/ChannelEntry.html#ChannelEntry">ChannelEntry entry = new ChannelEntry(socketChannel, request);
297                     addChannel(entry);
298                     continue;
299                 }
300             } catch (final IOException ex) {
301                 closeChannel(socketChannel);
302                 request.failed(ex);
303                 return;
304             } catch (final SecurityException ex) {
305                 closeChannel(socketChannel);
306                 request.failed(new IOException(ex));
307                 return;
308             }
309 
310             final SessionRequestHandleuestHandle.html#SessionRequestHandle">SessionRequestHandle requestHandle = new SessionRequestHandle(request);
311             try {
312                 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
313                         requestHandle);
314                 request.setKey(key);
315             } catch (final IOException ex) {
316                 closeChannel(socketChannel);
317                 throw new IOReactorException("Failure registering channel " +
318                         "with the selector", ex);
319             }
320         }
321     }
322 
323 }