1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.SocketAddress;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.Executors;
34
35 import org.apache.mina.core.RuntimeIoException;
36 import org.apache.mina.core.filterchain.IoFilter;
37 import org.apache.mina.core.future.IoFuture;
38 import org.apache.mina.core.service.AbstractIoAcceptor;
39 import org.apache.mina.core.service.IoAcceptor;
40 import org.apache.mina.core.service.IoHandler;
41 import org.apache.mina.core.service.IoProcessor;
42 import org.apache.mina.core.service.SimpleIoProcessorPool;
43 import org.apache.mina.core.session.AbstractIoSession;
44 import org.apache.mina.core.session.IoSession;
45 import org.apache.mina.core.session.IoSessionConfig;
46 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
47 import org.apache.mina.util.ExceptionMonitor;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H>
68 extends AbstractIoAcceptor {
69
70 private final IoProcessor<T> processor;
71
72 private final boolean createdProcessor;
73
74 private final Object lock = new Object();
75
76 private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
77
78 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
79
80 private final Map<SocketAddress, H> boundHandles = Collections
81 .synchronizedMap(new HashMap<SocketAddress, H>());
82
83 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
84
85 private volatile boolean selectable;
86
87 private Worker worker;
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
103 Class<? extends IoProcessor<T>> processorClass) {
104 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass),
105 true);
106 }
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
123 Class<? extends IoProcessor<T>> processorClass, int processorCount) {
124 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass,
125 processorCount), true);
126 }
127
128
129
130
131
132
133
134
135
136
137
138
139
140 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
141 IoProcessor<T> processor) {
142 this(sessionConfig, null, processor, false);
143 }
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
162 Executor executor, IoProcessor<T> processor) {
163 this(sessionConfig, executor, processor, false);
164 }
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183 private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
184 Executor executor, IoProcessor<T> processor,
185 boolean createdProcessor) {
186 super(sessionConfig, executor);
187
188 if (processor == null) {
189 throw new NullPointerException("processor");
190 }
191
192 this.processor = processor;
193 this.createdProcessor = createdProcessor;
194
195 try {
196 init();
197 selectable = true;
198 } catch (RuntimeException e) {
199 throw e;
200 } catch (Exception e) {
201 throw new RuntimeIoException("Failed to initialize.", e);
202 } finally {
203 if (!selectable) {
204 try {
205 destroy();
206 } catch (Exception e) {
207 ExceptionMonitor.getInstance().exceptionCaught(e);
208 }
209 }
210 }
211 }
212
213
214
215
216
217 protected abstract void init() throws Exception;
218
219
220
221
222
223
224 protected abstract void destroy() throws Exception;
225
226
227
228
229
230
231
232 protected abstract boolean select() throws Exception;
233
234
235
236
237 protected abstract void wakeup();
238
239
240
241
242
243
244 protected abstract Iterator<H> selectedHandles();
245
246
247
248
249
250
251
252 protected abstract H open(SocketAddress localAddress) throws Exception;
253
254
255
256
257
258
259
260 protected abstract SocketAddress localAddress(H handle) throws Exception;
261
262
263
264
265
266
267
268
269
270 protected abstract T accept(IoProcessor<T> processor, H handle)
271 throws Exception;
272
273
274
275
276
277
278 protected abstract void close(H handle) throws Exception;
279
280
281
282
283 @Override
284 protected IoFuture dispose0() throws Exception {
285 unbind();
286 if (!disposalFuture.isDone()) {
287 startupWorker();
288 wakeup();
289 }
290 return disposalFuture;
291 }
292
293
294
295
296 @Override
297 protected final Set<SocketAddress> bind0(
298 List<? extends SocketAddress> localAddresses) throws Exception {
299 AcceptorOperationFuture request = new AcceptorOperationFuture(
300 localAddresses);
301
302
303
304 registerQueue.add(request);
305
306
307
308 startupWorker();
309 wakeup();
310 request.awaitUninterruptibly();
311
312 if (request.getException() != null) {
313 throw request.getException();
314 }
315
316
317
318
319 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
320 for (H handle : boundHandles.values()) {
321 newLocalAddresses.add(localAddress(handle));
322 }
323
324 return newLocalAddresses;
325 }
326
327
328
329
330
331
332
333
334
335
336 private void startupWorker() {
337 if (!selectable) {
338 registerQueue.clear();
339 cancelQueue.clear();
340 }
341
342 synchronized (lock) {
343 if (worker == null) {
344 worker = new Worker();
345 executeWorker(worker);
346 }
347 }
348 }
349
350
351
352
353 @Override
354 protected final void unbind0(List<? extends SocketAddress> localAddresses)
355 throws Exception {
356 AcceptorOperationFuture future = new AcceptorOperationFuture(
357 localAddresses);
358
359 cancelQueue.add(future);
360 startupWorker();
361 wakeup();
362
363 future.awaitUninterruptibly();
364 if (future.getException() != null) {
365 throw future.getException();
366 }
367 }
368
369
370
371
372
373 private class Worker implements Runnable {
374 public void run() {
375 int nHandles = 0;
376
377 while (selectable) {
378 try {
379
380 boolean selected = select();
381
382
383
384
385 nHandles += registerHandles();
386
387 if (selected) {
388 processHandles(selectedHandles());
389 }
390
391
392 nHandles -= unregisterHandles();
393
394 if (nHandles == 0) {
395 synchronized (lock) {
396 if (registerQueue.isEmpty()
397 && cancelQueue.isEmpty()) {
398 worker = null;
399 break;
400 }
401 }
402 }
403 } catch (Throwable e) {
404 ExceptionMonitor.getInstance().exceptionCaught(e);
405
406 try {
407 Thread.sleep(1000);
408 } catch (InterruptedException e1) {
409 ExceptionMonitor.getInstance().exceptionCaught(e1);
410 }
411 }
412 }
413
414 if (selectable && isDisposing()) {
415 selectable = false;
416 try {
417 if (createdProcessor) {
418 processor.dispose();
419 }
420 } finally {
421 try {
422 synchronized (disposalLock) {
423 if (isDisposing()) {
424 destroy();
425 }
426 }
427 } catch (Exception e) {
428 ExceptionMonitor.getInstance().exceptionCaught(e);
429 } finally {
430 disposalFuture.setDone();
431 }
432 }
433 }
434 }
435
436
437
438
439
440
441
442
443
444
445 @SuppressWarnings("unchecked")
446 private void processHandles(Iterator<H> handles) throws Exception {
447 while (handles.hasNext()) {
448 H handle = handles.next();
449 handles.remove();
450
451 T session = accept(processor, handle);
452 if (session == null) {
453 break;
454 }
455
456 finishSessionInitialization(session, null, null);
457
458
459 session.getProcessor().add(session);
460 }
461 }
462 }
463
464
465
466
467
468
469
470
471
472
473 private int registerHandles() {
474 for (;;) {
475 AcceptorOperationFuture future = registerQueue.poll();
476 if (future == null) {
477 return 0;
478 }
479
480 Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
481 List<SocketAddress> localAddresses = future.getLocalAddresses();
482
483 try {
484 for (SocketAddress a : localAddresses) {
485 H handle = open(a);
486 newHandles.put(localAddress(handle), handle);
487 }
488
489 boundHandles.putAll(newHandles);
490
491
492 future.setDone();
493 return newHandles.size();
494 } catch (Exception e) {
495 future.setException(e);
496 } finally {
497
498 if (future.getException() != null) {
499 for (H handle : newHandles.values()) {
500 try {
501 close(handle);
502 } catch (Exception e) {
503 ExceptionMonitor.getInstance().exceptionCaught(e);
504 }
505 }
506 wakeup();
507 }
508 }
509 }
510 }
511
512
513
514
515
516
517
518 private int unregisterHandles() {
519 int cancelledHandles = 0;
520 for (;;) {
521 AcceptorOperationFuture future = cancelQueue.poll();
522 if (future == null) {
523 break;
524 }
525
526
527 for (SocketAddress a : future.getLocalAddresses()) {
528 H handle = boundHandles.remove(a);
529 if (handle == null) {
530 continue;
531 }
532
533 try {
534 close(handle);
535 wakeup();
536 } catch (Throwable e) {
537 ExceptionMonitor.getInstance().exceptionCaught(e);
538 } finally {
539 cancelledHandles++;
540 }
541 }
542
543 future.setDone();
544 }
545
546 return cancelledHandles;
547 }
548
549
550
551
552 public final IoSession newSession(SocketAddress remoteAddress,
553 SocketAddress localAddress) {
554 throw new UnsupportedOperationException();
555 }
556 }