1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.net.UnknownHostException;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.ClosedChannelException;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.SocketChannel;
39 import java.security.AccessController;
40 import java.security.PrivilegedActionException;
41 import java.security.PrivilegedExceptionAction;
42 import java.util.Queue;
43 import java.util.Set;
44 import java.util.concurrent.ConcurrentLinkedQueue;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.atomic.AtomicBoolean;
47
48 import org.apache.hc.core5.concurrent.FutureCallback;
49 import org.apache.hc.core5.function.Callback;
50 import org.apache.hc.core5.function.Decorator;
51 import org.apache.hc.core5.io.CloseMode;
52 import org.apache.hc.core5.io.Closer;
53 import org.apache.hc.core5.net.NamedEndpoint;
54 import org.apache.hc.core5.util.Args;
55 import org.apache.hc.core5.util.Asserts;
56 import org.apache.hc.core5.util.Timeout;
57
58 class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
59
60 private static final int MAX_CHANNEL_REQUESTS = 10000;
61
62 private final IOEventHandlerFactory eventHandlerFactory;
63 private final IOReactorConfig reactorConfig;
64 private final Decorator<IOSession> ioSessionDecorator;
65 private final IOSessionListener sessionListener;
66 private final Callback<IOSession> sessionShutdownCallback;
67 private final Queue<InternalDataChannel> closedSessions;
68 private final Queue<SocketChannel> channelQueue;
69 private final Queue<IOSessionRequest> requestQueue;
70 private final AtomicBoolean shutdownInitiated;
71 private final long selectTimeoutMillis;
72 private volatile long lastTimeoutCheckMillis;
73
/**
 * Creates a reactor with empty work queues.
 *
 * @param exceptionCallback forwarded unchanged to the superclass constructor.
 * @param eventHandlerFactory factory for per-channel event handlers; validated with
 *        {@code Args.notNull}.
 * @param reactorConfig reactor configuration; validated with {@code Args.notNull}.
 * @param ioSessionDecorator optional session decorator; stored as given.
 * @param sessionListener session listener; stored as given.
 * @param sessionShutdownCallback optional callback used when shutdown starts; stored as given.
 */
SingleCoreIOReactor(
        final Callback<Exception> exceptionCallback,
        final IOEventHandlerFactory eventHandlerFactory,
        final IOReactorConfig reactorConfig,
        final Decorator<IOSession> ioSessionDecorator,
        final IOSessionListener sessionListener,
        final Callback<IOSession> sessionShutdownCallback) {
    super(exceptionCallback);

    // Mandatory collaborators are validated in declaration order.
    this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
    this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");

    // Optional collaborators are stored without validation.
    this.ioSessionDecorator = ioSessionDecorator;
    this.sessionListener = sessionListener;
    this.sessionShutdownCallback = sessionShutdownCallback;

    // Work queues start out empty.
    this.channelQueue = new ConcurrentLinkedQueue<>();
    this.requestQueue = new ConcurrentLinkedQueue<>();
    this.closedSessions = new ConcurrentLinkedQueue<>();

    this.shutdownInitiated = new AtomicBoolean(false);
    this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
}
93
94 void enqueueChannel(final SocketChannel socketChannel) throws IOReactorShutdownException {
95 Args.notNull(socketChannel, "SocketChannel");
96 if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
97 throw new IOReactorShutdownException("I/O reactor has been shut down");
98 }
99 this.channelQueue.add(socketChannel);
100 this.selector.wakeup();
101 }
102
103 @Override
104 void doTerminate() {
105 closePendingChannels();
106 closePendingConnectionRequests();
107 processClosedSessions();
108 }
109
110 @Override
111 void doExecute() throws IOException {
112 while (!Thread.currentThread().isInterrupted()) {
113
114 final int readyCount = this.selector.select(this.selectTimeoutMillis);
115
116 if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
117 if (this.shutdownInitiated.compareAndSet(false, true)) {
118 initiateSessionShutdown();
119 }
120 closePendingChannels();
121 }
122 if (getStatus() == IOReactorStatus.SHUT_DOWN) {
123 break;
124 }
125
126
127 if (readyCount > 0) {
128 processEvents(this.selector.selectedKeys());
129 }
130
131 validateActiveChannels();
132
133
134 processClosedSessions();
135
136
137 if (getStatus() == IOReactorStatus.ACTIVE) {
138 processPendingChannels();
139 processPendingConnectionRequests();
140 }
141
142
143 if (getStatus() == IOReactorStatus.SHUTTING_DOWN && this.selector.keys().isEmpty()) {
144 break;
145 }
146 if (getStatus() == IOReactorStatus.SHUT_DOWN) {
147 break;
148 }
149 }
150 }
151
152 private void initiateSessionShutdown() {
153 if (this.sessionShutdownCallback != null) {
154 final Set<SelectionKey> keys = this.selector.keys();
155 for (final SelectionKey key : keys) {
156 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
157 if (channel instanceof InternalDataChannel) {
158 this.sessionShutdownCallback.execute((InternalDataChannel) channel);
159 }
160 }
161 }
162 }
163
164 private void validateActiveChannels() {
165 final long currentTimeMillis = System.currentTimeMillis();
166 if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.selectTimeoutMillis) {
167 this.lastTimeoutCheckMillis = currentTimeMillis;
168 for (final SelectionKey key : this.selector.keys()) {
169 checkTimeout(key, currentTimeMillis);
170 }
171 }
172 }
173
174 private void processEvents(final Set<SelectionKey> selectedKeys) {
175 for (final SelectionKey key : selectedKeys) {
176 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
177 if (channel != null) {
178 try {
179 channel.handleIOEvent(key.readyOps());
180 } catch (final CancelledKeyException ex) {
181 channel.close(CloseMode.GRACEFUL);
182 }
183 }
184 }
185 selectedKeys.clear();
186 }
187
188 private void processPendingChannels() throws IOException {
189 SocketChannel socketChannel;
190 for (int i = 0; i < MAX_CHANNEL_REQUESTS && (socketChannel = this.channelQueue.poll()) != null; i++) {
191 try {
192 prepareSocket(socketChannel.socket());
193 socketChannel.configureBlocking(false);
194 } catch (final IOException ex) {
195 logException(ex);
196 try {
197 socketChannel.close();
198 } catch (final IOException ex2) {
199 logException(ex2);
200 }
201 throw ex;
202 }
203 final SelectionKey key;
204 try {
205 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
206 } catch (final ClosedChannelException ex) {
207 return;
208 }
209 final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
210 final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(
211 ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession,
212 null,
213 sessionListener,
214 closedSessions);
215 dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, null));
216 dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
217 key.attach(dataChannel);
218 dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
219 }
220 }
221
222 private void processClosedSessions() {
223 for (;;) {
224 final InternalDataChannel dataChannel = this.closedSessions.poll();
225 if (dataChannel == null) {
226 break;
227 }
228 try {
229 dataChannel.disconnected();
230 } catch (final CancelledKeyException ex) {
231
232 }
233 }
234 }
235
236 private void checkTimeout(final SelectionKey key, final long nowMillis) {
237 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
238 if (channel != null) {
239 channel.checkTimeout(nowMillis);
240 }
241 }
242
243 @Override
244 public Future<IOSession> connect(
245 final NamedEndpoint remoteEndpoint,
246 final SocketAddress remoteAddress,
247 final SocketAddress localAddress,
248 final Timeout timeout,
249 final Object attachment,
250 final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
251 Args.notNull(remoteEndpoint, "Remote endpoint");
252 final IOSessionRequestt.html#IOSessionRequest">IOSessionRequest sessionRequest = new IOSessionRequest(
253 remoteEndpoint,
254 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
255 localAddress,
256 timeout,
257 attachment,
258 callback);
259
260 this.requestQueue.add(sessionRequest);
261 this.selector.wakeup();
262
263 return sessionRequest;
264 }
265
266 private void prepareSocket(final Socket socket) throws IOException {
267 socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
268 socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
269 if (this.reactorConfig.getSndBufSize() > 0) {
270 socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
271 }
272 if (this.reactorConfig.getRcvBufSize() > 0) {
273 socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
274 }
275 final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
276 if (linger >= 0) {
277 socket.setSoLinger(true, linger);
278 }
279 }
280
281 private void validateAddress(final SocketAddress address) throws UnknownHostException {
282 if (address instanceof InetSocketAddress) {
283 final InetSocketAddress endpoint = (InetSocketAddress) address;
284 if (endpoint.isUnresolved()) {
285 throw new UnknownHostException(endpoint.getHostName());
286 }
287 }
288 }
289
290 private void processPendingConnectionRequests() {
291 IOSessionRequest sessionRequest;
292 for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
293 if (!sessionRequest.isCancelled()) {
294 final SocketChannel socketChannel;
295 try {
296 socketChannel = SocketChannel.open();
297 } catch (final IOException ex) {
298 sessionRequest.failed(ex);
299 return;
300 }
301 try {
302 processConnectionRequest(socketChannel, sessionRequest);
303 } catch (final IOException | SecurityException ex) {
304 Closer.closeQuietly(socketChannel);
305 sessionRequest.failed(ex);
306 }
307 }
308 }
309 }
310
311 private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
312 validateAddress(sessionRequest.localAddress);
313 validateAddress(sessionRequest.remoteAddress);
314
315 socketChannel.configureBlocking(false);
316 prepareSocket(socketChannel.socket());
317
318 if (sessionRequest.localAddress != null) {
319 final Socket sock = socketChannel.socket();
320 sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
321 sock.bind(sessionRequest.localAddress);
322 }
323
324 final SocketAddress targetAddress;
325 final IOEventHandlerFactory eventHandlerFactory;
326 if (this.reactorConfig.getSocksProxyAddress() != null) {
327 targetAddress = this.reactorConfig.getSocksProxyAddress();
328 eventHandlerFactory = new SocksProxyProtocolHandlerFactory(
329 sessionRequest.remoteAddress,
330 this.reactorConfig.getSocksProxyUsername(),
331 this.reactorConfig.getSocksProxyPassword(),
332 this.eventHandlerFactory);
333 } else {
334 targetAddress = sessionRequest.remoteAddress;
335 eventHandlerFactory = this.eventHandlerFactory;
336 }
337
338
339
340 final boolean connected;
341 try {
342 connected = AccessController.doPrivileged(
343 new PrivilegedExceptionAction<Boolean>() {
344 @Override
345 public Boolean run() throws IOException {
346 return socketChannel.connect(targetAddress);
347 }
348 });
349 } catch (final PrivilegedActionException e) {
350 Asserts.check(e.getCause() instanceof IOException,
351 "method contract violation only checked exceptions are wrapped: " + e.getCause());
352
353 throw (IOException) e.getCause();
354 }
355
356
357 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
358 final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest, new InternalDataChannelFactory() {
359
360 @Override
361 public InternalDataChannel create(
362 final SelectionKey key,
363 final SocketChannel socketChannel,
364 final NamedEndpoint namedEndpoint,
365 final Object attachment) {
366 final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
367 final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(
368 ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession,
369 namedEndpoint,
370 sessionListener,
371 closedSessions);
372 dataChannel.upgrade(eventHandlerFactory.createHandler(dataChannel, attachment));
373 dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
374 return dataChannel;
375 }
376
377 });
378 if (connected) {
379 channel.handleIOEvent(SelectionKey.OP_CONNECT);
380 } else {
381 key.attach(channel);
382 sessionRequest.assign(channel);
383 }
384 }
385
386 private void closePendingChannels() {
387 SocketChannel socketChannel;
388 while ((socketChannel = this.channelQueue.poll()) != null) {
389 try {
390 socketChannel.close();
391 } catch (final IOException ex) {
392 logException(ex);
393 }
394 }
395 }
396
397 private void closePendingConnectionRequests() {
398 IOSessionRequest sessionRequest;
399 while ((sessionRequest = this.requestQueue.poll()) != null) {
400 sessionRequest.cancel();
401 }
402 }
403
404 }