View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.UnknownHostException;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.ClosedChannelException;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.SocketChannel;
39  import java.security.AccessController;
40  import java.security.PrivilegedActionException;
41  import java.security.PrivilegedExceptionAction;
42  import java.util.Queue;
43  import java.util.Set;
44  import java.util.concurrent.ConcurrentLinkedQueue;
45  import java.util.concurrent.Future;
46  import java.util.concurrent.atomic.AtomicBoolean;
47  
48  import org.apache.hc.core5.concurrent.FutureCallback;
49  import org.apache.hc.core5.function.Callback;
50  import org.apache.hc.core5.function.Decorator;
51  import org.apache.hc.core5.io.CloseMode;
52  import org.apache.hc.core5.io.Closer;
53  import org.apache.hc.core5.net.NamedEndpoint;
54  import org.apache.hc.core5.util.Args;
55  import org.apache.hc.core5.util.Asserts;
56  import org.apache.hc.core5.util.Timeout;
57  
58  class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
59  
60      private static final int MAX_CHANNEL_REQUESTS = 10000;
61  
62      private final IOEventHandlerFactory eventHandlerFactory;
63      private final IOReactorConfig reactorConfig;
64      private final Decorator<IOSession> ioSessionDecorator;
65      private final IOSessionListener sessionListener;
66      private final Callback<IOSession> sessionShutdownCallback;
67      private final Queue<InternalDataChannel> closedSessions;
68      private final Queue<SocketChannel> channelQueue;
69      private final Queue<IOSessionRequest> requestQueue;
70      private final AtomicBoolean shutdownInitiated;
71      private final long selectTimeoutMillis;
72      private volatile long lastTimeoutCheckMillis;
73  
74      SingleCoreIOReactor(
75              final Callback<Exception> exceptionCallback,
76              final IOEventHandlerFactory eventHandlerFactory,
77              final IOReactorConfig reactorConfig,
78              final Decorator<IOSession> ioSessionDecorator,
79              final IOSessionListener sessionListener,
80              final Callback<IOSession> sessionShutdownCallback) {
81          super(exceptionCallback);
82          this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
83          this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
84          this.ioSessionDecorator = ioSessionDecorator;
85          this.sessionListener = sessionListener;
86          this.sessionShutdownCallback = sessionShutdownCallback;
87          this.shutdownInitiated = new AtomicBoolean(false);
88          this.closedSessions = new ConcurrentLinkedQueue<>();
89          this.channelQueue = new ConcurrentLinkedQueue<>();
90          this.requestQueue = new ConcurrentLinkedQueue<>();
91          this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
92      }
93  
94      void enqueueChannel(final SocketChannel socketChannel) throws IOReactorShutdownException {
95          Args.notNull(socketChannel, "SocketChannel");
96          if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
97              throw new IOReactorShutdownException("I/O reactor has been shut down");
98          }
99          this.channelQueue.add(socketChannel);
100         this.selector.wakeup();
101     }
102 
103     @Override
104     void doTerminate() {
105         closePendingChannels();
106         closePendingConnectionRequests();
107         processClosedSessions();
108     }
109 
110     @Override
111     void doExecute() throws IOException {
112         while (!Thread.currentThread().isInterrupted()) {
113 
114             final int readyCount = this.selector.select(this.selectTimeoutMillis);
115 
116             if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
117                 if (this.shutdownInitiated.compareAndSet(false, true)) {
118                     initiateSessionShutdown();
119                 }
120                 closePendingChannels();
121             }
122             if (getStatus() == IOReactorStatus.SHUT_DOWN) {
123                 break;
124             }
125 
126             // Process selected I/O events
127             if (readyCount > 0) {
128                 processEvents(this.selector.selectedKeys());
129             }
130 
131             validateActiveChannels();
132 
133             // Process closed sessions
134             processClosedSessions();
135 
136             // If active process new channels
137             if (getStatus() == IOReactorStatus.ACTIVE) {
138                 processPendingChannels();
139                 processPendingConnectionRequests();
140             }
141 
142             // Exit select loop if graceful shutdown has been completed
143             if (getStatus() == IOReactorStatus.SHUTTING_DOWN && this.selector.keys().isEmpty()) {
144                 break;
145             }
146             if (getStatus() == IOReactorStatus.SHUT_DOWN) {
147                 break;
148             }
149         }
150     }
151 
152     private void initiateSessionShutdown() {
153         if (this.sessionShutdownCallback != null) {
154             final Set<SelectionKey> keys = this.selector.keys();
155             for (final SelectionKey key : keys) {
156                 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
157                 if (channel instanceof InternalDataChannel) {
158                     this.sessionShutdownCallback.execute((InternalDataChannel) channel);
159                 }
160             }
161         }
162     }
163 
164     private void validateActiveChannels() {
165         final long currentTimeMillis = System.currentTimeMillis();
166         if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.selectTimeoutMillis) {
167             this.lastTimeoutCheckMillis = currentTimeMillis;
168             for (final SelectionKey key : this.selector.keys()) {
169                 checkTimeout(key, currentTimeMillis);
170             }
171         }
172     }
173 
174     private void processEvents(final Set<SelectionKey> selectedKeys) {
175         for (final SelectionKey key : selectedKeys) {
176             final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
177             if (channel != null) {
178                 try {
179                     channel.handleIOEvent(key.readyOps());
180                 } catch (final CancelledKeyException ex) {
181                     channel.close(CloseMode.GRACEFUL);
182                 }
183             }
184         }
185         selectedKeys.clear();
186     }
187 
188     private void processPendingChannels() throws IOException {
189         SocketChannel socketChannel;
190         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (socketChannel = this.channelQueue.poll()) != null; i++) {
191             try {
192                 prepareSocket(socketChannel.socket());
193                 socketChannel.configureBlocking(false);
194             } catch (final IOException ex) {
195                 logException(ex);
196                 try {
197                     socketChannel.close();
198                 } catch (final IOException ex2) {
199                     logException(ex2);
200                 }
201                 throw ex;
202             }
203             final SelectionKey key;
204             try {
205                 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
206             } catch (final ClosedChannelException ex) {
207                 return;
208             }
209             final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
210             final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(
211                     ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession,
212                     null,
213                     sessionListener,
214                     closedSessions);
215             dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, null));
216             dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
217             key.attach(dataChannel);
218             dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
219         }
220     }
221 
222     private void processClosedSessions() {
223         for (;;) {
224             final InternalDataChannel dataChannel = this.closedSessions.poll();
225             if (dataChannel == null) {
226                 break;
227             }
228             try {
229                 dataChannel.disconnected();
230             } catch (final CancelledKeyException ex) {
231                 // ignore and move on
232             }
233         }
234     }
235 
236     private void checkTimeout(final SelectionKey key, final long nowMillis) {
237         final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
238         if (channel != null) {
239             channel.checkTimeout(nowMillis);
240         }
241     }
242 
243     @Override
244     public Future<IOSession> connect(
245             final NamedEndpoint remoteEndpoint,
246             final SocketAddress remoteAddress,
247             final SocketAddress localAddress,
248             final Timeout timeout,
249             final Object attachment,
250             final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
251         Args.notNull(remoteEndpoint, "Remote endpoint");
252         final IOSessionRequestt.html#IOSessionRequest">IOSessionRequest sessionRequest = new IOSessionRequest(
253                 remoteEndpoint,
254                 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
255                 localAddress,
256                 timeout,
257                 attachment,
258                 callback);
259 
260         this.requestQueue.add(sessionRequest);
261         this.selector.wakeup();
262 
263         return sessionRequest;
264     }
265 
266     private void prepareSocket(final Socket socket) throws IOException {
267         socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
268         socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
269         if (this.reactorConfig.getSndBufSize() > 0) {
270             socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
271         }
272         if (this.reactorConfig.getRcvBufSize() > 0) {
273             socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
274         }
275         final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
276         if (linger >= 0) {
277             socket.setSoLinger(true, linger);
278         }
279     }
280 
281     private void validateAddress(final SocketAddress address) throws UnknownHostException {
282         if (address instanceof InetSocketAddress) {
283             final InetSocketAddress endpoint = (InetSocketAddress) address;
284             if (endpoint.isUnresolved()) {
285                 throw new UnknownHostException(endpoint.getHostName());
286             }
287         }
288     }
289 
290     private void processPendingConnectionRequests() {
291         IOSessionRequest sessionRequest;
292         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
293             if (!sessionRequest.isCancelled()) {
294                 final SocketChannel socketChannel;
295                 try {
296                     socketChannel = SocketChannel.open();
297                 } catch (final IOException ex) {
298                     sessionRequest.failed(ex);
299                     return;
300                 }
301                 try {
302                     processConnectionRequest(socketChannel, sessionRequest);
303                 } catch (final IOException | SecurityException ex) {
304                     Closer.closeQuietly(socketChannel);
305                     sessionRequest.failed(ex);
306                 }
307             }
308         }
309     }
310 
311     private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
312         validateAddress(sessionRequest.localAddress);
313         validateAddress(sessionRequest.remoteAddress);
314 
315         socketChannel.configureBlocking(false);
316         prepareSocket(socketChannel.socket());
317 
318         if (sessionRequest.localAddress != null) {
319             final Socket sock = socketChannel.socket();
320             sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
321             sock.bind(sessionRequest.localAddress);
322         }
323 
324         final SocketAddress targetAddress;
325         final IOEventHandlerFactory eventHandlerFactory;
326         if (this.reactorConfig.getSocksProxyAddress() != null) {
327             targetAddress = this.reactorConfig.getSocksProxyAddress();
328             eventHandlerFactory = new SocksProxyProtocolHandlerFactory(
329                     sessionRequest.remoteAddress,
330                     this.reactorConfig.getSocksProxyUsername(),
331                     this.reactorConfig.getSocksProxyPassword(),
332                     this.eventHandlerFactory);
333         } else {
334             targetAddress = sessionRequest.remoteAddress;
335             eventHandlerFactory = this.eventHandlerFactory;
336         }
337 
338         // Run this under a doPrivileged to support lib users that run under a SecurityManager this allows granting connect permissions
339         // only to this library
340         final boolean connected;
341         try {
342             connected = AccessController.doPrivileged(
343                         new PrivilegedExceptionAction<Boolean>() {
344                             @Override
345                             public Boolean run() throws IOException {
346                                 return socketChannel.connect(targetAddress);
347                             }
348                         });
349         } catch (final PrivilegedActionException e) {
350             Asserts.check(e.getCause() instanceof  IOException,
351                     "method contract violation only checked exceptions are wrapped: " + e.getCause());
352             // only checked exceptions are wrapped - error and RTExceptions are rethrown by doPrivileged
353             throw (IOException) e.getCause();
354         }
355 
356 
357         final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
358         final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest, new InternalDataChannelFactory() {
359 
360             @Override
361             public InternalDataChannel create(
362                     final SelectionKey key,
363                     final SocketChannel socketChannel,
364                     final NamedEndpoint namedEndpoint,
365                     final Object attachment) {
366                 final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
367                 final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(
368                         ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession,
369                         namedEndpoint,
370                         sessionListener,
371                         closedSessions);
372                 dataChannel.upgrade(eventHandlerFactory.createHandler(dataChannel, attachment));
373                 dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
374                 return dataChannel;
375             }
376 
377         });
378         if (connected) {
379             channel.handleIOEvent(SelectionKey.OP_CONNECT);
380         } else {
381             key.attach(channel);
382             sessionRequest.assign(channel);
383         }
384     }
385 
386     private void closePendingChannels() {
387         SocketChannel socketChannel;
388         while ((socketChannel = this.channelQueue.poll()) != null) {
389             try {
390                 socketChannel.close();
391             } catch (final IOException ex) {
392                 logException(ex);
393             }
394         }
395     }
396 
397     private void closePendingConnectionRequests() {
398         IOSessionRequest sessionRequest;
399         while ((sessionRequest = this.requestQueue.poll()) != null) {
400             sessionRequest.cancel();
401         }
402     }
403 
404 }