1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.SocketAddress;
23 import java.nio.channels.ClosedSelectorException;
24 import java.nio.channels.spi.SelectorProvider;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Semaphore;
38 import java.util.concurrent.atomic.AtomicReference;
39
40 import org.apache.mina.core.RuntimeIoException;
41 import org.apache.mina.core.filterchain.IoFilter;
42 import org.apache.mina.core.service.AbstractIoAcceptor;
43 import org.apache.mina.core.service.AbstractIoService;
44 import org.apache.mina.core.service.IoAcceptor;
45 import org.apache.mina.core.service.IoHandler;
46 import org.apache.mina.core.service.IoProcessor;
47 import org.apache.mina.core.service.SimpleIoProcessorPool;
48 import org.apache.mina.core.session.AbstractIoSession;
49 import org.apache.mina.core.session.IoSession;
50 import org.apache.mina.core.session.IoSessionConfig;
51 import org.apache.mina.transport.socket.SocketSessionConfig;
52 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
53 import org.apache.mina.util.ExceptionMonitor;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor {
72
/**
 * Single-permit semaphore used as a start-up handshake: {@code startupAcceptor()} acquires it
 * before submitting a new {@code Acceptor}, the worker releases it at the top of
 * {@code run()}, and {@code bindInternal} acquires it before waking the worker.
 */
private final Semaphore lock = new Semaphore(1);

/** Processor passed to {@code accept(...)} for every incoming connection. */
private final IoProcessor<S> processor;

/** {@code true} when this acceptor built {@link #processor} itself; only then does the worker dispose it. */
private final boolean createdProcessor;

/** Pending bind requests, queued by {@code bindInternal} and drained by {@code registerHandles()}. */
private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();

/** Pending unbind requests, queued by {@code unbind0} and drained by {@code unregisterHandles()}. */
private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();

/** Currently bound handles, keyed by the address reported by {@code localAddress(handle)}. */
private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());

/** Marked done by the worker once the disposal sequence at the end of {@code Acceptor.run()} has finished. */
private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();

/**
 * Set to {@code true} once {@code init(...)} has succeeded and reset to {@code false} by the
 * worker during disposal. The accept loop keeps running only while it is {@code true}.
 */
private volatile boolean selectable;

/** The worker currently owning the accept loop, or {@code null} when no worker is attached. */
private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();

/** Value exposed through {@code isReuseAddress()} / {@code setReuseAddress(boolean)}; {@code false} by default. */
protected boolean reuseAddress = false;

/** Value exposed through {@code getBacklog()} / {@code setBacklog(int)}; 50 by default. */
protected int backlog = 50;
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/**
 * Creates an acceptor backed by a new {@link SimpleIoProcessorPool} built from
 * {@code processorClass}. No executor and no selector provider are supplied ({@code null}
 * is passed for both) and the pool is flagged as created by this acceptor.
 *
 * @param sessionConfig the session configuration forwarded to the superclass
 * @param processorClass the {@link IoProcessor} type handed to the pool
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
    this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
}
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/**
 * Creates an acceptor backed by a new {@link SimpleIoProcessorPool} built from
 * {@code processorClass} and {@code processorCount}. No executor and no selector provider
 * are supplied ({@code null} is passed for both) and the pool is flagged as created by this
 * acceptor.
 *
 * @param sessionConfig the session configuration forwarded to the superclass
 * @param processorClass the {@link IoProcessor} type handed to the pool
 * @param processorCount the size argument handed to the pool
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
        int processorCount) {
    this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null);
}
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/**
 * Creates an acceptor backed by a new {@link SimpleIoProcessorPool} built from
 * {@code processorClass}, {@code processorCount} and {@code selectorProvider}. The same
 * provider is also forwarded to {@code init(SelectorProvider)}. No executor is supplied and
 * the pool is flagged as created by this acceptor.
 *
 * @param sessionConfig the session configuration forwarded to the superclass
 * @param processorClass the {@link IoProcessor} type handed to the pool
 * @param processorCount the size argument handed to the pool
 * @param selectorProvider the provider handed to both the pool and {@code init(SelectorProvider)}
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
        int processorCount, SelectorProvider selectorProvider ) {
    this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider);
}
156
157
158
159
160
161
162
163
164
165
166
167
168
/**
 * Creates an acceptor that uses a caller-supplied processor. No executor and no selector
 * provider are supplied ({@code null} is passed for both). The processor is flagged as not
 * created by this acceptor, so the worker will not dispose it.
 *
 * @param sessionConfig the session configuration forwarded to the superclass
 * @param processor the processor to use; must not be {@code null}
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
    this(sessionConfig, null, processor, false, null);
}
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/**
 * Creates an acceptor that uses a caller-supplied executor and processor. No selector
 * provider is supplied ({@code null} is passed). The processor is flagged as not created by
 * this acceptor, so the worker will not dispose it.
 *
 * @param sessionConfig the session configuration forwarded to the superclass
 * @param executor the executor forwarded to the superclass
 * @param processor the processor to use; must not be {@code null}
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
    this(sessionConfig, executor, processor, false, null);
}
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/**
 * Common constructor that every other constructor delegates to. Stores the processor, runs
 * {@code init(selectorProvider)} and marks the acceptor selectable on success.
 *
 * @param sessionConfig the session configuration forwarded to the superclass
 * @param executor the executor forwarded to the superclass (may be {@code null} as passed by
 *        the delegating constructors)
 * @param processor the processor used for accepted sessions
 * @param createdProcessor whether the processor was created by this acceptor and must
 *        therefore be disposed by it
 * @param selectorProvider the value forwarded to {@code init(SelectorProvider)}; the
 *        delegating constructors may pass {@code null}
 * @throws IllegalArgumentException if {@code processor} is {@code null}
 * @throws RuntimeIoException if {@code init(SelectorProvider)} throws a checked exception
 */
private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
        boolean createdProcessor, SelectorProvider selectorProvider) {
    super(sessionConfig, executor);

    if (processor == null) {
        throw new IllegalArgumentException("processor");
    }

    this.processor = processor;
    this.createdProcessor = createdProcessor;

    try {
        // Let the concrete transport set up its polling resources.
        init(selectorProvider);

        // Only reached when init(...) returned normally; the finally block below keys off
        // this flag to tell success from failure.
        selectable = true;
    } catch (RuntimeException e) {
        // Unchecked exceptions propagate unchanged.
        throw e;
    } catch (Exception e) {
        // Checked exceptions are wrapped, keeping the cause.
        throw new RuntimeIoException("Failed to initialize.", e);
    } finally {
        if (!selectable) {
            // Initialization failed: let the subclass release whatever init(...) managed to
            // allocate. A failure here is reported but must not mask the original exception.
            try {
                destroy();
            } catch (Exception e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            }
        }
    }
}
250
251
252
253
254
/**
 * Initialization hook without a selector provider. It is not called anywhere in this
 * class; the constructor uses {@link #init(SelectorProvider)} instead.
 *
 * @throws Exception if initialization fails
 */
protected abstract void init() throws Exception;
256
257
258
259
260
261
262
/**
 * Initialization hook invoked once from the constructor, before the acceptor is marked
 * selectable.
 *
 * @param selectorProvider the provider given to the constructor; {@code null} when the
 *        constructor used did not take one
 * @throws Exception if initialization fails; the constructor then calls {@link #destroy()}
 *         and rethrows (checked exceptions wrapped in a {@code RuntimeIoException})
 */
protected abstract void init(SelectorProvider selectorProvider) throws Exception;
264
265
266
267
268
269
/**
 * Clean-up hook. It is called from the constructor when initialization failed, and from the
 * worker thread during disposal while holding {@code disposalLock}.
 *
 * @throws Exception if clean-up fails; callers in this class report it to the
 *         {@code ExceptionMonitor}
 */
protected abstract void destroy() throws Exception;
271
272
273
274
275
276
277
/**
 * Polling hook called at the start of every iteration of the accept loop.
 *
 * @return a count that the worker compares against zero: {@code selectedHandles()} is only
 *         processed when the value is positive
 * @throws Exception if polling fails; a {@code ClosedSelectorException} ends the accept
 *         loop, any other exception is reported and the loop retries after one second
 */
protected abstract int select() throws Exception;
279
280
281
282
/**
 * Wake-up hook. This class calls it after queueing bind/unbind requests, after closing
 * handles and during disposal, so that the worker leaves {@link #select()} and notices the
 * change (presumably by interrupting a blocking select; the mechanism is up to the subclass).
 */
protected abstract void wakeup();
284
285
286
287
288
289
/**
 * Returns the handles to process after {@link #select()} reported a positive count.
 *
 * @return an iterator over the selected handles; it must support {@code remove()} because
 *         the worker removes every handle it visits
 */
protected abstract Iterator<H> selectedHandles();
291
292
293
294
295
296
297
/**
 * Opens a handle for one requested address. Called by the worker for every address of a
 * bind request.
 *
 * @param localAddress the address to bind, as supplied in the bind request
 * @return the handle representing the bound address
 * @throws Exception if the address cannot be opened; the whole bind request then fails
 */
protected abstract H open(SocketAddress localAddress) throws Exception;
299
300
301
302
303
304
305
/**
 * Returns the address associated with a handle. The result is used as the key under which
 * the handle is stored and as the address reported back from {@code bindInternal}.
 *
 * @param handle a handle previously returned by {@link #open(SocketAddress)}
 * @return the handle's local address
 * @throws Exception if the address cannot be determined
 */
protected abstract SocketAddress localAddress(H handle) throws Exception;
307
308
309
310
311
312
313
314
315
/**
 * Accepts a connection on a selected handle and wraps it in a session.
 *
 * @param processor the processor of this acceptor
 * @param handle the selected handle
 * @return the new session, or {@code null} if there is nothing to accept (the worker skips
 *         {@code null} results)
 * @throws Exception if accepting fails
 */
protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception;
317
318
319
320
321
322
/**
 * Closes a handle. Called when an address is unbound and when a bind request fails part-way
 * and its already opened handles have to be rolled back.
 *
 * @param handle the handle to close
 * @throws Exception if closing fails; callers in this class report it to the
 *         {@code ExceptionMonitor}
 */
protected abstract void close(H handle) throws Exception;
324
325
326
327
/**
 * Disposal hook: calls the inherited no-argument {@code unbind()}, then makes sure a worker
 * is running and wakes it up.
 *
 * @throws Exception if unbinding fails or the calling thread is interrupted while starting
 *         the worker
 */
@Override
protected void dispose0() throws Exception {
    unbind();

    // The clean-up branch at the end of Acceptor.run() only executes on a worker thread,
    // so one must be started (if none is attached) and woken up.
    startupAcceptor();
    wakeup();
}
335
336
337
338
339 @Override
340 protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
341
342
343 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
344
345
346
347 registerQueue.add(request);
348
349
350
351 startupAcceptor();
352
353
354
355
356 try {
357 lock.acquire();
358
359
360 Thread.sleep(10);
361 wakeup();
362 } finally {
363 lock.release();
364 }
365
366
367 request.awaitUninterruptibly();
368
369 if (request.getException() != null) {
370 throw request.getException();
371 }
372
373
374
375
376 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
377
378 for (H handle : boundHandles.values()) {
379 newLocalAddresses.add(localAddress(handle));
380 }
381
382 return newLocalAddresses;
383 }
384
385
386
387
388
389
390
391
392
393 private void startupAcceptor() throws InterruptedException {
394
395
396 if (!selectable) {
397 registerQueue.clear();
398 cancelQueue.clear();
399 }
400
401
402 Acceptor acceptor = acceptorRef.get();
403
404 if (acceptor == null) {
405 lock.acquire();
406 acceptor = new Acceptor();
407
408 if (acceptorRef.compareAndSet(null, acceptor)) {
409 executeWorker(acceptor);
410 } else {
411 lock.release();
412 }
413 }
414 }
415
416
417
418
419 @Override
420 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
421 AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
422
423 cancelQueue.add(future);
424 startupAcceptor();
425 wakeup();
426
427 future.awaitUninterruptibly();
428 if (future.getException() != null) {
429 throw future.getException();
430 }
431 }
432
433
434
435
436
437
438
439 private class Acceptor implements Runnable {
/**
 * Accept loop executed on the worker thread. Each iteration polls, processes queued bind
 * requests, accepts pending connections and processes queued unbind requests. The loop ends
 * when the acceptor stops being selectable, when no handle is bound and no request is
 * pending, when another worker has taken over, or when the selector is closed. If the
 * service is being disposed, the worker finally releases the processor (when owned) and the
 * polling resources.
 */
public void run() {
    assert (acceptorRef.get() == this);

    // Number of handles currently bound through this worker: bind results are added,
    // unbind results are subtracted.
    int nHandles = 0;

    // Hand back the permit taken by startupAcceptor(); threads waiting in bindInternal()
    // may proceed from here on.
    lock.release();

    while (selectable) {
        try {
            // Poll first; the count is used further down, after bind requests have been
            // handled.
            int selected = select();

            // Process queued bind requests and account for the newly bound handles.
            nHandles += registerHandles();

            // Nothing is bound: detach this worker, then decide whether it may stop.
            if (nHandles == 0) {
                acceptorRef.set(null);

                // No pending work at all, so the worker can end.
                if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                    assert (acceptorRef.get() != this);
                    break;
                }

                // A request arrived in the meantime. Try to re-attach; if another worker
                // has already been installed, leave the work to it.
                if (!acceptorRef.compareAndSet(null, this)) {
                    assert (acceptorRef.get() != this);
                    break;
                }

                assert (acceptorRef.get() == this);
            }

            if (selected > 0) {
                // Accept the pending connections on the selected handles.
                processHandles(selectedHandles());
            }

            // Process queued unbind requests and account for the closed handles.
            nHandles -= unregisterHandles();
        } catch (ClosedSelectorException cse) {
            // The selector is gone; report it and stop looping.
            ExceptionMonitor.getInstance().exceptionCaught(cse);
            break;
        } catch (Exception e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);

            // Back off for a second before retrying so that a persistent failure does not
            // turn into a busy loop.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                ExceptionMonitor.getInstance().exceptionCaught(e1);
            }
        }
    }

    // Clean-up, performed only when the loop ended while the service is being disposed.
    if (selectable && isDisposing()) {
        selectable = false;
        try {
            // Only dispose a processor this acceptor created itself.
            if (createdProcessor) {
                processor.dispose();
            }
        } finally {
            try {
                synchronized (disposalLock) {
                    if (isDisposing()) {
                        destroy();
                    }
                }
            } catch (Exception e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            } finally {
                // Signal completion of the disposal even if destroy() failed.
                disposalFuture.setDone();
            }
        }
    }
}
525
526
527
528
529
530
531
532
533
534
535 @SuppressWarnings("unchecked")
536 private void processHandles(Iterator<H> handles) throws Exception {
537 while (handles.hasNext()) {
538 H handle = handles.next();
539 handles.remove();
540
541
542
543 S session = accept(processor, handle);
544
545 if (session == null) {
546 continue;
547 }
548
549 initSession(session, null, null);
550
551
552 session.getProcessor().add(session);
553 }
554 }
555 }
556
557
558
559
560
561
562
563
564
565
566 private int registerHandles() {
567 for (;;) {
568
569
570 AcceptorOperationFuture future = registerQueue.poll();
571
572 if (future == null) {
573 return 0;
574 }
575
576
577
578
579 Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
580 List<SocketAddress> localAddresses = future.getLocalAddresses();
581
582 try {
583
584 for (SocketAddress a : localAddresses) {
585 H handle = open(a);
586 newHandles.put(localAddress(handle), handle);
587 }
588
589
590
591 boundHandles.putAll(newHandles);
592
593
594 future.setDone();
595 return newHandles.size();
596 } catch (Exception e) {
597
598 future.setException(e);
599 } finally {
600
601 if (future.getException() != null) {
602 for (H handle : newHandles.values()) {
603 try {
604 close(handle);
605 } catch (Exception e) {
606 ExceptionMonitor.getInstance().exceptionCaught(e);
607 }
608 }
609
610
611 wakeup();
612 }
613 }
614 }
615 }
616
617
618
619
620
621
622
623 private int unregisterHandles() {
624 int cancelledHandles = 0;
625 for (;;) {
626 AcceptorOperationFuture future = cancelQueue.poll();
627 if (future == null) {
628 break;
629 }
630
631
632 for (SocketAddress a : future.getLocalAddresses()) {
633 H handle = boundHandles.remove(a);
634
635 if (handle == null) {
636 continue;
637 }
638
639 try {
640 close(handle);
641 wakeup();
642 } catch (Exception e) {
643 ExceptionMonitor.getInstance().exceptionCaught(e);
644 } finally {
645 cancelledHandles++;
646 }
647 }
648
649 future.setDone();
650 }
651
652 return cancelledHandles;
653 }
654
655
656
657
/**
 * Not supported by this acceptor.
 *
 * @param remoteAddress ignored
 * @param localAddress ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
    throw new UnsupportedOperationException();
}
661
662
663
664
/**
 * Returns the configured backlog value.
 *
 * @return the current value of the {@code backlog} field (50 unless changed)
 */
public int getBacklog() {
    return backlog;
}
668
669
670
671
672
673
674
675 public void setBacklog(int backlog) {
676 synchronized (bindLock) {
677 if (isActive()) {
678 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
679 }
680
681 this.backlog = backlog;
682 }
683 }
684
685
686
687
/**
 * Returns the configured address-reuse flag.
 *
 * @return the current value of the {@code reuseAddress} field ({@code false} unless changed)
 */
public boolean isReuseAddress() {
    return reuseAddress;
}
691
692
693
694
695
696
697
698 public void setReuseAddress(boolean reuseAddress) {
699 synchronized (bindLock) {
700 if (isActive()) {
701 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
702 }
703
704 this.reuseAddress = reuseAddress;
705 }
706 }
707
708
709
710
/**
 * Returns the session configuration as a {@link SocketSessionConfig}.
 *
 * @return the inherited {@code sessionConfig} field, cast to {@link SocketSessionConfig}
 * @throws ClassCastException if the configured session configuration is not a
 *         {@link SocketSessionConfig}
 */
public SocketSessionConfig getSessionConfig() {
    return (SocketSessionConfig)sessionConfig;
}
714 }