1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.net.UnknownHostException;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.SelectionKey;
37 import java.nio.channels.SocketChannel;
38 import java.security.AccessController;
39 import java.security.PrivilegedActionException;
40 import java.security.PrivilegedExceptionAction;
41 import java.util.Queue;
42 import java.util.Set;
43 import java.util.concurrent.ConcurrentLinkedQueue;
44 import java.util.concurrent.ThreadFactory;
45
46 import org.apache.http.nio.reactor.ConnectingIOReactor;
47 import org.apache.http.nio.reactor.IOReactorException;
48 import org.apache.http.nio.reactor.IOReactorStatus;
49 import org.apache.http.nio.reactor.SessionRequest;
50 import org.apache.http.nio.reactor.SessionRequestCallback;
51 import org.apache.http.params.HttpParams;
52 import org.apache.http.util.Asserts;
53
54
55
56
57
58
59
60
61 @SuppressWarnings("deprecation")
62 public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
63 implements ConnectingIOReactor {
64
65 private final Queue<SessionRequestImpl> requestQueue;
66
67 private long lastTimeoutCheck;
68
69
70
71
72
73
74
75
76
77
78
79 public DefaultConnectingIOReactor(
80 final IOReactorConfig config,
81 final ThreadFactory threadFactory) throws IOReactorException {
82 super(config, threadFactory);
83 this.requestQueue = new ConcurrentLinkedQueue<SessionRequestImpl>();
84 this.lastTimeoutCheck = System.currentTimeMillis();
85 }
86
87
88
89
90
91
92
93
94
95
96 public DefaultConnectingIOReactor(final IOReactorConfig config) throws IOReactorException {
97 this(config, null);
98 }
99
100
101
102
103
104
105
106
107 public DefaultConnectingIOReactor() throws IOReactorException {
108 this(null, null);
109 }
110
111
112
113
114 @Deprecated
115 public DefaultConnectingIOReactor(
116 final int workerCount,
117 final ThreadFactory threadFactory,
118 final HttpParams params) throws IOReactorException {
119 this(convert(workerCount, params), threadFactory);
120 }
121
122
123
124
125 @Deprecated
126 public DefaultConnectingIOReactor(
127 final int workerCount,
128 final HttpParams params) throws IOReactorException {
129 this(convert(workerCount, params), null);
130 }
131
132 @Override
133 protected void cancelRequests() throws IOReactorException {
134 SessionRequestImpl request;
135 while ((request = this.requestQueue.poll()) != null) {
136 request.cancel();
137 }
138 }
139
140 @Override
141 protected void processEvents(final int readyCount) throws IOReactorException {
142 processSessionRequests();
143
144 if (readyCount > 0) {
145 final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
146 for (final SelectionKey key : selectedKeys) {
147
148 processEvent(key);
149
150 }
151 selectedKeys.clear();
152 }
153
154 final long currentTime = System.currentTimeMillis();
155 if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
156 this.lastTimeoutCheck = currentTime;
157 final Set<SelectionKey> keys = this.selector.keys();
158 processTimeouts(keys);
159 }
160 }
161
162 private void processEvent(final SelectionKey key) {
163 try {
164
165 if (key.isConnectable()) {
166
167 final SocketChannel channel = (SocketChannel) key.channel();
168
169 final SessionRequestHandle/apache/http/impl/nio/reactor/SessionRequestHandle.html#SessionRequestHandle">SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
170 final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
171
172
173 try {
174 channel.finishConnect();
175 } catch (final IOException ex) {
176 sessionRequest.failed(ex);
177 }
178 key.cancel();
179 key.attach(null);
180 if (!sessionRequest.isCompleted()) {
181 addChannel(new ChannelEntry(channel, sessionRequest));
182 } else {
183 try {
184 channel.close();
185 } catch (final IOException ignore) {
186 }
187 }
188 }
189
190 } catch (final CancelledKeyException ex) {
191 final SessionRequestHandle/apache/http/impl/nio/reactor/SessionRequestHandle.html#SessionRequestHandle">SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
192 key.attach(null);
193 if (requestHandle != null) {
194 final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
195 if (sessionRequest != null) {
196 sessionRequest.cancel();
197 }
198 }
199 }
200 }
201
202 private void processTimeouts(final Set<SelectionKey> keys) {
203 final long now = System.currentTimeMillis();
204 for (final SelectionKey key : keys) {
205 final Object attachment = key.attachment();
206
207 if (attachment instanceof SessionRequestHandle) {
208 final SessionRequestHandle/../org/apache/http/impl/nio/reactor/SessionRequestHandle.html#SessionRequestHandle">SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
209 final SessionRequestImpl sessionRequest = handle.getSessionRequest();
210 final int timeout = sessionRequest.getConnectTimeout();
211 if (timeout > 0) {
212 if (handle.getRequestTime() + timeout < now) {
213 sessionRequest.timeout();
214 }
215 }
216 }
217
218 }
219 }
220
221 @Override
222 public SessionRequest connect(
223 final SocketAddress remoteAddress,
224 final SocketAddress localAddress,
225 final Object attachment,
226 final SessionRequestCallback callback) {
227 Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
228 "I/O reactor has been shut down");
229 final SessionRequestImplquestImpl.html#SessionRequestImpl">SessionRequestImpl sessionRequest = new SessionRequestImpl(
230 remoteAddress, localAddress, attachment, callback);
231 sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
232
233 this.requestQueue.add(sessionRequest);
234 this.selector.wakeup();
235
236 return sessionRequest;
237 }
238
239 private void validateAddress(final SocketAddress address) throws UnknownHostException {
240 if (address == null) {
241 return;
242 }
243 if (address instanceof InetSocketAddress) {
244 final InetSocketAddress endpoint = (InetSocketAddress) address;
245 if (endpoint.isUnresolved()) {
246 throw new UnknownHostException(endpoint.getHostName());
247 }
248 }
249 }
250
251 private void processSessionRequests() throws IOReactorException {
252 SessionRequestImpl request;
253 while ((request = this.requestQueue.poll()) != null) {
254 if (request.isCompleted()) {
255 continue;
256 }
257 final SocketChannel socketChannel;
258 try {
259 socketChannel = SocketChannel.open();
260 } catch (final IOException ex) {
261 request.failed(ex);
262 return;
263 }
264 try {
265 validateAddress(request.getLocalAddress());
266 validateAddress(request.getRemoteAddress());
267
268 socketChannel.configureBlocking(false);
269 prepareSocket(socketChannel.socket());
270
271 if (request.getLocalAddress() != null) {
272 final Socket sock = socketChannel.socket();
273 sock.setReuseAddress(this.config.isSoReuseAddress());
274 sock.bind(request.getLocalAddress());
275 }
276
277 final SocketAddress targetAddress = request.getRemoteAddress();
278
279
280 final boolean connected;
281 try {
282 connected = AccessController.doPrivileged(
283 new PrivilegedExceptionAction<Boolean>() {
284 @Override
285 public Boolean run() throws IOException {
286 return socketChannel.connect(targetAddress);
287 }
288 });
289 } catch (final PrivilegedActionException e) {
290 Asserts.check(e.getCause() instanceof IOException,
291 "method contract violation only checked exceptions are wrapped: " + e.getCause());
292
293 throw (IOException) e.getCause();
294 }
295 if (connected) {
296 final ChannelEntryactor/ChannelEntry.html#ChannelEntry">ChannelEntry entry = new ChannelEntry(socketChannel, request);
297 addChannel(entry);
298 continue;
299 }
300 } catch (final IOException ex) {
301 closeChannel(socketChannel);
302 request.failed(ex);
303 return;
304 } catch (final SecurityException ex) {
305 closeChannel(socketChannel);
306 request.failed(new IOException(ex));
307 return;
308 }
309
310 final SessionRequestHandleuestHandle.html#SessionRequestHandle">SessionRequestHandle requestHandle = new SessionRequestHandle(request);
311 try {
312 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
313 requestHandle);
314 request.setKey(key);
315 } catch (final IOException ex) {
316 closeChannel(socketChannel);
317 throw new IOReactorException("Failure registering channel " +
318 "with the selector", ex);
319 }
320 }
321 }
322
323 }