1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.ConnectException;
23 import java.net.SocketAddress;
24 import java.util.Iterator;
25 import java.util.Queue;
26 import java.util.concurrent.ConcurrentLinkedQueue;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.Executors;
29
30 import org.apache.mina.core.RuntimeIoException;
31 import org.apache.mina.core.filterchain.IoFilter;
32 import org.apache.mina.core.future.ConnectFuture;
33 import org.apache.mina.core.future.DefaultConnectFuture;
34 import org.apache.mina.core.future.IoFuture;
35 import org.apache.mina.core.service.AbstractIoConnector;
36 import org.apache.mina.core.service.IoConnector;
37 import org.apache.mina.core.service.IoHandler;
38 import org.apache.mina.core.service.IoProcessor;
39 import org.apache.mina.core.service.SimpleIoProcessorPool;
40 import org.apache.mina.core.session.AbstractIoSession;
41 import org.apache.mina.core.session.IoSession;
42 import org.apache.mina.core.session.IoSessionConfig;
43 import org.apache.mina.core.session.IoSessionInitializer;
44 import org.apache.mina.util.ExceptionMonitor;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
65 extends AbstractIoConnector {
66
67 private final Object lock = new Object();
68 private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
69 private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
70 private final IoProcessor<T> processor;
71 private final boolean createdProcessor;
72
73 private final ServiceOperationFuture disposalFuture =
74 new ServiceOperationFuture();
75 private volatile boolean selectable;
76 private Worker worker;
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
92 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass, int processorCount) {
110 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
111 }
112
113
114
115
116
117
118
119
120
121
122
123
124
125 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
126 this(sessionConfig, null, processor, false);
127 }
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
146 this(sessionConfig, executor, processor, false);
147 }
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
167 super(sessionConfig, executor);
168
169 if (processor == null) {
170 throw new NullPointerException("processor");
171 }
172
173 this.processor = processor;
174 this.createdProcessor = createdProcessor;
175
176 try {
177 init();
178 selectable = true;
179 } catch (RuntimeException e){
180 throw e;
181 } catch (Exception e) {
182 throw new RuntimeIoException("Failed to initialize.", e);
183 } finally {
184 if (!selectable) {
185 try {
186 destroy();
187 } catch (Exception e) {
188 ExceptionMonitor.getInstance().exceptionCaught(e);
189 }
190 }
191 }
192 }
193
194
195
196
197
198 protected abstract void init() throws Exception;
199
200
201
202
203
204
205 protected abstract void destroy() throws Exception;
206
207
208
209
210
211
212
213 protected abstract H newHandle(SocketAddress localAddress) throws Exception;
214
215
216
217
218
219
220
221
222
223
224
225 protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
226
227
228
229
230
231
232
233
234
235 protected abstract boolean finishConnect(H handle) throws Exception;
236
237
238
239
240
241
242
243
244
245
246 protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
247
248
249
250
251
252
253 protected abstract void close(H handle) throws Exception;
254
255
256
257
258 protected abstract void wakeup();
259
260
261
262
263
264
265
266
267 protected abstract boolean select(int timeout) throws Exception;
268
269
270
271
272
273
274 protected abstract Iterator<H> selectedHandles();
275
276
277
278
279
280 protected abstract Iterator<H> allHandles();
281
282
283
284
285
286
287
288 protected abstract void register(H handle, ConnectionRequest request) throws Exception;
289
290
291
292
293
294
295 protected abstract ConnectionRequest connectionRequest(H handle);
296
297
298
299
300 @Override
301 protected final IoFuture dispose0() throws Exception {
302 if (!disposalFuture.isDone()) {
303 startupWorker();
304 wakeup();
305 }
306 return disposalFuture;
307 }
308
309
310
311
312 @Override
313 @SuppressWarnings("unchecked")
314 protected final ConnectFuture connect0(
315 SocketAddress remoteAddress, SocketAddress localAddress,
316 IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
317 H handle = null;
318 boolean success = false;
319 try {
320 handle = newHandle(localAddress);
321 if (connect(handle, remoteAddress)) {
322 ConnectFuture future = new DefaultConnectFuture();
323 T session = newSession(processor, handle);
324 finishSessionInitialization(session, future, sessionInitializer);
325
326 session.getProcessor().add(session);
327 success = true;
328 return future;
329 }
330
331 success = true;
332 } catch (Exception e) {
333 return DefaultConnectFuture.newFailedFuture(e);
334 } finally {
335 if (!success && handle != null) {
336 try {
337 close(handle);
338 } catch (Exception e) {
339 ExceptionMonitor.getInstance().exceptionCaught(e);
340 }
341 }
342 }
343
344 ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
345 connectQueue.add(request);
346 startupWorker();
347 wakeup();
348
349 return request;
350 }
351
352 private void startupWorker() {
353 if (!selectable) {
354 connectQueue.clear();
355 cancelQueue.clear();
356 }
357
358 synchronized (lock) {
359 if (worker == null) {
360 worker = new Worker();
361 executeWorker(worker);
362 }
363 }
364 }
365
366 private int registerNew() {
367 int nHandles = 0;
368 for (; ;) {
369 ConnectionRequest req = connectQueue.poll();
370 if (req == null) {
371 break;
372 }
373
374 H handle = req.handle;
375 try {
376 register(handle, req);
377 nHandles ++;
378 } catch (Exception e) {
379 req.setException(e);
380 try {
381 close(handle);
382 } catch (Exception e2) {
383 ExceptionMonitor.getInstance().exceptionCaught(e2);
384 }
385 }
386 }
387 return nHandles;
388 }
389
390 private int cancelKeys() {
391 int nHandles = 0;
392 for (; ;) {
393 ConnectionRequest req = cancelQueue.poll();
394 if (req == null) {
395 break;
396 }
397
398 H handle = req.handle;
399 try {
400 close(handle);
401 } catch (Exception e) {
402 ExceptionMonitor.getInstance().exceptionCaught(e);
403 } finally {
404 nHandles ++;
405 }
406 }
407 return nHandles;
408 }
409
410 @SuppressWarnings("unchecked")
411 private int processSessions(Iterator<H> handlers) {
412 int nHandles = 0;
413 while (handlers.hasNext()) {
414 H handle = handlers.next();
415 handlers.remove();
416
417 ConnectionRequest entry = connectionRequest(handle);
418 boolean success = false;
419 try {
420 if (finishConnect(handle)) {
421 T session = newSession(processor, handle);
422 finishSessionInitialization(session, entry, entry.getSessionInitializer());
423
424 session.getProcessor().add(session);
425 nHandles ++;
426 }
427 success = true;
428 } catch (Throwable e) {
429 entry.setException(e);
430 } finally {
431 if (!success) {
432 cancelQueue.offer(entry);
433 }
434 }
435 }
436 return nHandles;
437 }
438
439 private void processTimedOutSessions(Iterator<H> handles) {
440 long currentTime = System.currentTimeMillis();
441
442 while (handles.hasNext()) {
443 H handle = handles.next();
444 ConnectionRequest entry = connectionRequest(handle);
445
446 if (currentTime >= entry.deadline) {
447 entry.setException(
448 new ConnectException("Connection timed out."));
449 cancelQueue.offer(entry);
450 }
451 }
452 }
453
454 private class Worker implements Runnable {
455
456 public void run() {
457 int nHandles = 0;
458 while (selectable) {
459 try {
460
461
462 int timeout = (int)Math.min(getConnectTimeoutMillis(), 1000L);
463 boolean selected = select(timeout);
464
465 nHandles += registerNew();
466
467 if (selected) {
468 nHandles -= processSessions(selectedHandles());
469 }
470
471 processTimedOutSessions(allHandles());
472
473 nHandles -= cancelKeys();
474
475 if (nHandles == 0) {
476 synchronized (lock) {
477 if (connectQueue.isEmpty()) {
478 worker = null;
479 break;
480 }
481 }
482 }
483 } catch (Throwable e) {
484 ExceptionMonitor.getInstance().exceptionCaught(e);
485
486 try {
487 Thread.sleep(1000);
488 } catch (InterruptedException e1) {
489 ExceptionMonitor.getInstance().exceptionCaught(e1);
490 }
491 }
492 }
493
494 if (selectable && isDisposing()) {
495 selectable = false;
496 try {
497 if (createdProcessor) {
498 processor.dispose();
499 }
500 } finally {
501 try {
502 synchronized (disposalLock) {
503 if (isDisposing()) {
504 destroy();
505 }
506 }
507 } catch (Exception e) {
508 ExceptionMonitor.getInstance().exceptionCaught(e);
509 } finally {
510 disposalFuture.setDone();
511 }
512 }
513 }
514 }
515 }
516
517 public final class ConnectionRequest extends DefaultConnectFuture {
518 private final H handle;
519 private final long deadline;
520 private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
521
522 public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
523 this.handle = handle;
524 long timeout = getConnectTimeoutMillis();
525 if (timeout <= 0L) {
526 this.deadline = Long.MAX_VALUE;
527 } else {
528 this.deadline = System.currentTimeMillis() + timeout;
529 }
530 this.sessionInitializer = callback;
531 }
532
533 public H getHandle() {
534 return handle;
535 }
536
537 public long getDeadline() {
538 return deadline;
539 }
540
541 public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
542 return sessionInitializer;
543 }
544
545 @Override
546 public void cancel() {
547 super.cancel();
548 cancelQueue.add(this);
549 startupWorker();
550 wakeup();
551 }
552 }
553 }