1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.ConnectException;
23 import java.net.SocketAddress;
24 import java.nio.channels.ClosedSelectorException;
25 import java.util.Iterator;
26 import java.util.Queue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.atomic.AtomicReference;
31
32 import org.apache.mina.core.RuntimeIoException;
33 import org.apache.mina.core.filterchain.IoFilter;
34 import org.apache.mina.core.future.ConnectFuture;
35 import org.apache.mina.core.future.DefaultConnectFuture;
36 import org.apache.mina.core.service.AbstractIoConnector;
37 import org.apache.mina.core.service.AbstractIoService;
38 import org.apache.mina.core.service.IoConnector;
39 import org.apache.mina.core.service.IoHandler;
40 import org.apache.mina.core.service.IoProcessor;
41 import org.apache.mina.core.service.SimpleIoProcessorPool;
42 import org.apache.mina.core.session.AbstractIoSession;
43 import org.apache.mina.core.session.IoSession;
44 import org.apache.mina.core.session.IoSessionConfig;
45 import org.apache.mina.core.session.IoSessionInitializer;
46 import org.apache.mina.transport.socket.nio.NioSocketConnector;
47 import org.apache.mina.util.ExceptionMonitor;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public abstract class AbstractPollingIoConnector<S extends AbstractIoSession, H> extends AbstractIoConnector {
68
69 private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<>();
70
71 private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<>();
72
73 private final IoProcessor<S> processor;
74
75 private final boolean createdProcessor;
76
77 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
78
79 private volatile boolean selectable;
80
81
82 private final AtomicReference<Connector> connectorRef = new AtomicReference<>();
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
99 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true);
100 }
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
119 int processorCount) {
120 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true);
121 }
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
139 this(sessionConfig, null, processor, false);
140 }
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
162 this(sessionConfig, executor, processor, false);
163 }
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187 private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
188 boolean createdProcessor) {
189 super(sessionConfig, executor);
190
191 if (processor == null) {
192 throw new IllegalArgumentException("processor");
193 }
194
195 this.processor = processor;
196 this.createdProcessor = createdProcessor;
197
198 try {
199 init();
200 selectable = true;
201 } catch (RuntimeException e) {
202 throw e;
203 } catch (Exception e) {
204 throw new RuntimeIoException("Failed to initialize.", e);
205 } finally {
206 if (!selectable) {
207 try {
208 destroy();
209 } catch (Exception e) {
210 ExceptionMonitor.getInstance().exceptionCaught(e);
211 }
212 }
213 }
214 }
215
216
217
218
219
220
221
222 protected abstract void init() throws Exception;
223
224
225
226
227
228
229
230
231 protected abstract void destroy() throws Exception;
232
233
234
235
236
237
238
239
240
241
242 protected abstract H newHandle(SocketAddress localAddress) throws Exception;
243
244
245
246
247
248
249
250
251
252
253
254
255
256 protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
257
258
259
260
261
262
263
264
265
266
267
268
269 protected abstract boolean finishConnect(H handle) throws Exception;
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284 protected abstract S newSession(IoProcessor<S> processor, H handle) throws Exception;
285
286
287
288
289
290
291
292
293
294 protected abstract void close(H handle) throws Exception;
295
296
297
298
299
300 protected abstract void wakeup();
301
302
303
304
305
306
307
308
309
310
311 protected abstract int select(int timeout) throws Exception;
312
313
314
315
316
317
318
319 protected abstract Iterator<H> selectedHandles();
320
321
322
323
324
325
326 protected abstract Iterator<H> allHandles();
327
328
329
330
331
332
333
334
335
336
337
338 protected abstract void register(H handle, ConnectionRequest request) throws Exception;
339
340
341
342
343
344
345
346
347
348 protected abstract ConnectionRequest getConnectionRequest(H handle);
349
350
351
352
353 @Override
354 protected final void dispose0() throws Exception {
355 startupWorker();
356 wakeup();
357 }
358
359
360
361
362 @Override
363 @SuppressWarnings("unchecked")
364 protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
365 IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
366 H handle = null;
367 boolean success = false;
368 try {
369 handle = newHandle(localAddress);
370 if (connect(handle, remoteAddress)) {
371 ConnectFuture future = new DefaultConnectFuture();
372 S session = newSession(processor, handle);
373 initSession(session, future, sessionInitializer);
374
375 session.getProcessor().add(session);
376 success = true;
377 return future;
378 }
379
380 success = true;
381 } catch (Exception e) {
382 return DefaultConnectFuture.newFailedFuture(e);
383 } finally {
384 if (!success && handle != null) {
385 try {
386 close(handle);
387 } catch (Exception e) {
388 ExceptionMonitor.getInstance().exceptionCaught(e);
389 }
390 }
391 }
392
393 ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
394 connectQueue.add(request);
395 startupWorker();
396 wakeup();
397
398 return request;
399 }
400
401 private void startupWorker() {
402 if (!selectable) {
403 connectQueue.clear();
404 cancelQueue.clear();
405 }
406
407 Connector connector = connectorRef.get();
408
409 if (connector == null) {
410 connector = new Connector();
411
412 if (connectorRef.compareAndSet(null, connector)) {
413 executeWorker(connector);
414 }
415 }
416 }
417
418 private class Connector implements Runnable {
419
420
421
422 @Override
423 public void run() {
424 assert connectorRef.get() == this;
425
426 int nHandles = 0;
427
428 while (selectable) {
429 try {
430
431
432 int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L);
433 int selected = select(timeout);
434
435 nHandles += registerNew();
436
437
438
439 if (nHandles == 0) {
440 connectorRef.set(null);
441
442 if (connectQueue.isEmpty()) {
443 assert connectorRef.get() != this;
444 break;
445 }
446
447 if (!connectorRef.compareAndSet(null, this)) {
448 assert connectorRef.get() != this;
449 break;
450 }
451
452 assert connectorRef.get() == this;
453 }
454
455 if (selected > 0) {
456 nHandles -= processConnections(selectedHandles());
457 }
458
459 processTimedOutSessions(allHandles());
460
461 nHandles -= cancelKeys();
462 } catch (ClosedSelectorException cse) {
463
464 ExceptionMonitor.getInstance().exceptionCaught(cse);
465 break;
466 } catch (Exception e) {
467 ExceptionMonitor.getInstance().exceptionCaught(e);
468
469 try {
470 Thread.sleep(1000);
471 } catch (InterruptedException e1) {
472 ExceptionMonitor.getInstance().exceptionCaught(e1);
473 }
474 }
475 }
476
477 if (selectable && isDisposing()) {
478 selectable = false;
479 try {
480 if (createdProcessor) {
481 processor.dispose();
482 }
483 } finally {
484 try {
485 synchronized (disposalLock) {
486 if (isDisposing()) {
487 destroy();
488 }
489 }
490 } catch (Exception e) {
491 ExceptionMonitor.getInstance().exceptionCaught(e);
492 } finally {
493 disposalFuture.setDone();
494 }
495 }
496 }
497 }
498
499 private int registerNew() {
500 int nHandles = 0;
501 for (;;) {
502 ConnectionRequest req = connectQueue.poll();
503 if (req == null) {
504 break;
505 }
506
507 H handle = req.handle;
508 try {
509 register(handle, req);
510 nHandles++;
511 } catch (Exception e) {
512 req.setException(e);
513 try {
514 close(handle);
515 } catch (Exception e2) {
516 ExceptionMonitor.getInstance().exceptionCaught(e2);
517 }
518 }
519 }
520 return nHandles;
521 }
522
523 private int cancelKeys() {
524 int nHandles = 0;
525
526 for (;;) {
527 ConnectionRequest req = cancelQueue.poll();
528
529 if (req == null) {
530 break;
531 }
532
533 H handle = req.handle;
534
535 try {
536 close(handle);
537 } catch (Exception e) {
538 ExceptionMonitor.getInstance().exceptionCaught(e);
539 } finally {
540 nHandles++;
541 }
542 }
543
544 if (nHandles > 0) {
545 wakeup();
546 }
547
548 return nHandles;
549 }
550
551
552
553
554
555 private int processConnections(Iterator<H> handlers) {
556 int nHandles = 0;
557
558
559 while (handlers.hasNext()) {
560 H handle = handlers.next();
561 handlers.remove();
562
563 ConnectionRequest connectionRequest = getConnectionRequest(handle);
564
565 if (connectionRequest == null) {
566 continue;
567 }
568
569 boolean success = false;
570 try {
571 if (finishConnect(handle)) {
572 S session = newSession(processor, handle);
573 initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
574
575 session.getProcessor().add(session);
576 nHandles++;
577 }
578 success = true;
579 } catch (Exception e) {
580 connectionRequest.setException(e);
581 } finally {
582 if (!success) {
583
584 cancelQueue.offer(connectionRequest);
585 }
586 }
587 }
588 return nHandles;
589 }
590
591 private void processTimedOutSessions(Iterator<H> handles) {
592 long currentTime = System.currentTimeMillis();
593
594 while (handles.hasNext()) {
595 H handle = handles.next();
596 ConnectionRequest connectionRequest = getConnectionRequest(handle);
597
598 if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
599 connectionRequest.setException(new ConnectException("Connection timed out."));
600 cancelQueue.offer(connectionRequest);
601 }
602 }
603 }
604 }
605
606
607
608
609 public final class ConnectionRequest extends DefaultConnectFuture {
610
611 private final H handle;
612
613
614 private final long deadline;
615
616
617 private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
618
619
620
621
622
623
624
625 public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
626 this.handle = handle;
627 long timeout = getConnectTimeoutMillis();
628
629 if (timeout <= 0L) {
630 this.deadline = Long.MAX_VALUE;
631 } else {
632 this.deadline = System.currentTimeMillis() + timeout;
633 }
634
635 this.sessionInitializer = callback;
636 }
637
638
639
640
641 public H getHandle() {
642 return handle;
643 }
644
645
646
647
648 public long getDeadline() {
649 return deadline;
650 }
651
652
653
654
655 public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
656 return sessionInitializer;
657 }
658
659
660
661
662 @Override
663 public boolean cancel() {
664 if (!isDone()) {
665 boolean justCancelled = super.cancel();
666
667
668
669 if (justCancelled) {
670 cancelQueue.add(this);
671 startupWorker();
672 wakeup();
673 }
674 }
675
676 return true;
677 }
678 }
679 }