1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.io.IOException;
23 import java.nio.channels.SelectionKey;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import org.apache.mina.core.buffer.IoBuffer;
35 import org.apache.mina.core.file.FileRegion;
36 import org.apache.mina.core.future.DefaultIoFuture;
37 import org.apache.mina.core.service.AbstractIoService;
38 import org.apache.mina.core.service.IoProcessor;
39 import org.apache.mina.core.session.AbstractIoSession;
40 import org.apache.mina.core.session.IdleStatusChecker;
41 import org.apache.mina.core.session.IoSession;
42 import org.apache.mina.core.session.IoSessionConfig;
43 import org.apache.mina.core.write.WriteRequest;
44 import org.apache.mina.core.write.WriteRequestQueue;
45 import org.apache.mina.core.write.WriteToClosedSessionException;
46 import org.apache.mina.util.ExceptionMonitor;
47 import org.apache.mina.util.NamePreservingRunnable;
48
49
50
51
52
53
54
55
56
57
58 public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> {
59
60
61
62
63
64
/**
 * Upper bound on the number of consecutive write attempts made for one buffer
 * while the transport keeps reporting that zero bytes were written.
 */
private static final int WRITE_SPIN_COUNT = 256;

/**
 * Per-class counters used to number processor threads. The map is a plain
 * HashMap; the code that reads and updates it synchronizes on the map itself.
 */
private static final Map<Class<?>, AtomicInteger> threadIds =
        new HashMap<Class<?>, AtomicInteger>();

/** Monitor guarding creation and clearing of {@link #worker}. */
private final Object lock = new Object();
/** Name given to the worker runnable, computed once at construction time. */
private final String threadName;
/** Executor on which the worker is started. */
private final Executor executor;

/** Sessions waiting to be initialised by the worker. */
private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();
/** Sessions waiting to be destroyed by the worker. */
private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
/** Sessions that have pending writes to flush. */
private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
/** Sessions whose read/write interest must be re-evaluated. */
private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();

/** The currently running worker, or null when none is running; accessed under {@link #lock}. */
private Worker worker;
/** Timestamp (System.currentTimeMillis) of the most recent idle check. */
private long lastIdleCheckTime;

/** Monitor guarding the transition into the disposing state and the call to dispose0(). */
private final Object disposalLock = new Object();
/** Set once disposal has been requested. */
private volatile boolean disposing;
/** Set once dispose() has finished waiting on the disposal future. */
private volatile boolean disposed;
/** Future that dispose() blocks on; its value is set by the worker. */
private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
87
88
89
90
91
92
93
/**
 * Creates a processor whose worker will be run on the given executor.
 *
 * @param executor the executor used to start the worker; must not be null
 * @throws NullPointerException if {@code executor} is null
 */
protected AbstractPollingIoProcessor(Executor executor) {
    if (null == executor) {
        throw new NullPointerException("executor");
    }

    // The two assignments are independent of each other.
    this.executor = executor;
    this.threadName = nextThreadName();
}
102
103
104
105
106
107
108
109
110
111 private String nextThreadName() {
112 Class<?> cls = getClass();
113 int newThreadId;
114
115
116
117
118 synchronized( threadIds ) {
119
120 AtomicInteger threadId = threadIds.get(cls);
121
122 if (threadId == null) {
123
124
125
126 newThreadId = 1;
127 threadIds.put(cls, new AtomicInteger(newThreadId));
128 } else {
129
130 newThreadId = threadId.incrementAndGet();
131 }
132 }
133
134
135 return cls.getSimpleName() + '-' + newThreadId;
136 }
137
138
139
140
/**
 * Tells whether disposal of this processor has been requested.
 *
 * @return the current value of the {@code disposing} flag
 */
public final boolean isDisposing() {
    return disposing;
}
144
145
146
147
/**
 * Tells whether a call to {@code dispose()} has run to completion.
 *
 * @return the current value of the {@code disposed} flag
 */
public final boolean isDisposed() {
    return disposed;
}
151
152
153
154
155 public final void dispose() {
156 if (disposed) {
157 return;
158 }
159
160 synchronized (disposalLock) {
161 if (!disposing) {
162 disposing = true;
163 startupWorker();
164 }
165 }
166
167 disposalFuture.awaitUninterruptibly();
168 disposed = true;
169 }
170
171
172
173
174
175
/**
 * Releases the resources held by the concrete processor. The worker calls
 * this after its loop has ended, while holding the disposal lock, and only
 * when disposal has been requested; anything thrown is reported to the
 * {@link ExceptionMonitor}.
 *
 * @throws Exception if the resources cannot be released
 */
protected abstract void dispose0() throws Exception;







/**
 * Waits for sessions to become ready for I/O. The worker calls this once per
 * loop iteration with a timeout of 1000 (presumably milliseconds; confirm
 * against the implementations).
 *
 * @param timeout upper bound on the wait
 * @return {@code true} if the worker should go on to process {@link #selectedSessions()}
 * @throws Exception if the wait fails
 */
protected abstract boolean select(int timeout) throws Exception;






/**
 * Consulted by the worker, together with the new-session queue, before it
 * terminates: the worker only stops when this returns {@code true}.
 *
 * @return {@code true} if the underlying selector has nothing registered (as reported by the implementation)
 */
protected abstract boolean isSelectorEmpty();




/**
 * Called after work has been queued for the worker (new sessions, flushes,
 * traffic-mask updates, removals). Presumably interrupts a blocking
 * {@link #select(int)}; confirm against the implementations.
 */
protected abstract void wakeup();






/**
 * Supplies the sessions the worker walks over when notifying idleness and
 * when scheduling every session for removal during disposal.
 *
 * @return an iterator over those sessions
 */
protected abstract Iterator<T> allSessions();






/**
 * Supplies the sessions to process after a successful {@link #select(int)}.
 * The worker calls {@code remove()} on the iterator after handling each
 * element, so the iterator must support removal.
 *
 * @return an iterator over the selected sessions
 */
protected abstract Iterator<T> selectedSessions();






/**
 * Reports the lifecycle state of a session. The worker uses it to decide
 * whether a queued operation is executed now (OPEN), dropped (CLOSED) or
 * re-queued for a later iteration (PREPARING).
 *
 * @param session the session to inspect
 * @return the session's current state
 */
protected abstract SessionState state(T session);
218
219
220
221
222
223
/**
 * Tells whether the session can currently be written to. When this is true
 * and the session's traffic mask allows writing, the worker schedules the
 * session for flushing.
 *
 * @param session the session to test
 * @return {@code true} if the session is ready for writing
 */
protected abstract boolean isWritable(T session);






/**
 * Tells whether the session currently has data to read. When this is true
 * and the session's traffic mask allows reading, the worker reads from it.
 *
 * @param session the session to test
 * @return {@code true} if the session is ready for reading
 */
protected abstract boolean isReadable(T session);






/**
 * Turns write interest for the session on or off. The flush logic switches
 * it off before writing and back on when a write could not be completed.
 *
 * @param session the session to update
 * @param interested the new write-interest value
 * @throws Exception if the interest cannot be changed
 */
protected abstract void setInterestedInWrite(T session, boolean interested)
        throws Exception;






/**
 * Turns read interest for the session on or off. Called when the session's
 * traffic mask is applied.
 *
 * @param session the session to update
 * @param interested the new read-interest value
 * @throws Exception if the interest cannot be changed
 */
protected abstract void setInterestedInRead(T session, boolean interested)
        throws Exception;






/**
 * Tells whether read interest is currently set for the session. Not invoked
 * by this class's own logic as far as this file shows.
 *
 * @param session the session to test
 * @return {@code true} if read interest is set
 */
protected abstract boolean isInterestedInRead(T session);






/**
 * Tells whether write interest is currently set for the session. Not invoked
 * by this class's own logic as far as this file shows.
 *
 * @param session the session to test
 * @return {@code true} if write interest is set
 */
protected abstract boolean isInterestedInWrite(T session);
262
263
264
265
266
267
/**
 * Prepares a newly added session; called by the worker before the session's
 * filter chain is built.
 *
 * @param session the session to initialise
 * @throws Exception if initialisation fails
 */
protected abstract void init(T session) throws Exception;






/**
 * Tears down a session. Called when a session is removed and also when
 * adding a session failed.
 *
 * @param session the session to destroy
 * @throws Exception if the teardown fails
 */
protected abstract void destroy(T session) throws Exception;









/**
 * Reads from the session into the buffer.
 *
 * @param session the session to read from
 * @param buf the buffer that receives the data
 * @return the number of bytes read; the caller treats a positive value as
 *         data, and a negative value causes the session to be scheduled for removal
 * @throws Exception if the read fails
 */
protected abstract int read(T session, IoBuffer buf) throws Exception;











/**
 * Writes from the buffer to the session.
 *
 * @param session the session to write to
 * @param buf the buffer holding the data
 * @param length the number of bytes the caller asks to be written
 * @return the number of bytes written; the caller retries a bounded number
 *         of times while this is zero
 * @throws Exception if the write fails
 */
protected abstract int write(T session, IoBuffer buf, int length) throws Exception;











/**
 * Transfers part of a file region to the session.
 *
 * @param session the session to write to
 * @param region the file region to send
 * @param length the number of bytes the caller asks to be transferred
 * @return the number of bytes transferred; the caller passes it to {@code region.update(...)}
 * @throws Exception if the transfer fails
 */
protected abstract int transferFile(T session, FileRegion region, int length) throws Exception;
310
311
312
313
314 public final void add(T session) {
315 if (isDisposing()) {
316 throw new IllegalStateException("Already disposed.");
317 }
318
319 newSessions.add(session);
320 startupWorker();
321 }
322
323
324
325
326 public final void remove(T session) {
327 scheduleRemove(session);
328 startupWorker();
329 }
330
331 private void scheduleRemove(T session) {
332 removingSessions.add(session);
333 }
334
335
336
337
338 public final void flush(T session) {
339
340
341
342
343
344
345
346
347 boolean needsWakeup = flushingSessions.isEmpty();
348 if (scheduleFlush(session) && needsWakeup) {
349 wakeup();
350 }
351 }
352
353 private boolean scheduleFlush(T session) {
354 if (session.setScheduledForFlush(true)) {
355 flushingSessions.add(session);
356 return true;
357 }
358 return false;
359 }
360
361
362
363
364 public final void updateTrafficMask(T session) {
365 scheduleTrafficControl(session);
366 wakeup();
367 }
368
369 private void scheduleTrafficControl(T session) {
370 trafficControllingSessions.add(session);
371 }
372
373 private void startupWorker() {
374 synchronized (lock) {
375 if (worker == null) {
376 worker = new Worker();
377 executor.execute(new NamePreservingRunnable(worker, threadName));
378 }
379 }
380 wakeup();
381 }
382
383 private int add() {
384 int addedSessions = 0;
385
386
387
388 for (;;) {
389 T session = newSessions.poll();
390
391 if (session == null) {
392
393 break;
394 }
395
396
397 if (addNow(session)) {
398
399 addedSessions ++;
400 }
401 }
402
403 return addedSessions;
404 }
405
406 private boolean addNow(T session) {
407
408 boolean registered = false;
409 boolean notified = false;
410 try {
411 init(session);
412 registered = true;
413
414
415 session.getService().getFilterChainBuilder().buildFilterChain(
416 session.getFilterChain());
417
418
419
420 ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
421 notified = true;
422 } catch (Throwable e) {
423 if (notified) {
424
425
426 scheduleRemove(session);
427 session.getFilterChain().fireExceptionCaught(e);
428 wakeup();
429 } else {
430 ExceptionMonitor.getInstance().exceptionCaught(e);
431 try {
432 destroy(session);
433 } catch (Exception e1) {
434 ExceptionMonitor.getInstance().exceptionCaught(e1);
435 } finally {
436 registered = false;
437 }
438 }
439 }
440 return registered;
441 }
442
443 private int remove() {
444 int removedSessions = 0;
445 for (; ;) {
446 T session = removingSessions.poll();
447
448 if (session == null) {
449 break;
450 }
451
452 SessionState state = state(session);
453 switch (state) {
454 case OPEN:
455 if (removeNow(session)) {
456 removedSessions ++;
457 }
458 break;
459 case CLOSED:
460
461 break;
462 case PREPARING:
463
464
465 scheduleRemove(session);
466 return removedSessions;
467 default:
468 throw new IllegalStateException(String.valueOf(state));
469 }
470 }
471
472 return removedSessions;
473 }
474
475 private boolean removeNow(T session) {
476 clearWriteRequestQueue(session);
477
478 try {
479 destroy(session);
480 return true;
481 } catch (Exception e) {
482 session.getFilterChain().fireExceptionCaught(e);
483 } finally {
484 clearWriteRequestQueue(session);
485 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
486 }
487 return false;
488 }
489
490 private void clearWriteRequestQueue(T session) {
491 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
492 WriteRequest req;
493
494 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
495
496 if ((req = writeRequestQueue.poll(session)) != null) {
497 Object m = req.getMessage();
498 if (m instanceof IoBuffer) {
499 IoBuffer buf = (IoBuffer) req.getMessage();
500
501
502
503 if (buf.hasRemaining()) {
504 buf.reset();
505 failedRequests.add(req);
506 } else {
507 session.getFilterChain().fireMessageSent(req);
508 }
509 } else {
510 failedRequests.add(req);
511 }
512
513
514 while ((req = writeRequestQueue.poll(session)) != null) {
515 failedRequests.add(req);
516 }
517 }
518
519
520 if (!failedRequests.isEmpty()) {
521 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
522 for (WriteRequest r: failedRequests) {
523 session.decreaseScheduledBytesAndMessages(r);
524 r.getFuture().setException(cause);
525 }
526 session.getFilterChain().fireExceptionCaught(cause);
527 }
528 }
529
530 private void process() throws Exception {
531 for (Iterator<T> i = selectedSessions(); i.hasNext();) {
532 process(i.next());
533 i.remove();
534 }
535 }
536
537 private void process(T session) {
538
539 if (isReadable(session) && session.getTrafficMask().isReadable()) {
540 read(session);
541 }
542
543 if (isWritable(session) && session.getTrafficMask().isWritable()) {
544 scheduleFlush(session);
545 }
546 }
547
548 private void read(T session) {
549 IoSessionConfig config = session.getConfig();
550 IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
551
552 final boolean hasFragmentation =
553 session.getTransportMetadata().hasFragmentation();
554
555 try {
556 int readBytes = 0;
557 int ret;
558
559 try {
560 if (hasFragmentation) {
561 while ((ret = read(session, buf)) > 0) {
562 readBytes += ret;
563 if (!buf.hasRemaining()) {
564 break;
565 }
566 }
567 } else {
568 ret = read(session, buf);
569 if (ret > 0) {
570 readBytes = ret;
571 }
572 }
573 } finally {
574 buf.flip();
575 }
576
577 if (readBytes > 0) {
578 session.getFilterChain().fireMessageReceived(buf);
579 buf = null;
580
581 if (hasFragmentation) {
582 if (readBytes << 1 < config.getReadBufferSize()) {
583 session.decreaseReadBufferSize();
584 } else if (readBytes == config.getReadBufferSize()) {
585 session.increaseReadBufferSize();
586 }
587 }
588 }
589 if (ret < 0) {
590 scheduleRemove(session);
591 }
592 } catch (Throwable e) {
593 if (e instanceof IOException) {
594 scheduleRemove(session);
595 }
596 session.getFilterChain().fireExceptionCaught(e);
597 }
598 }
599
600 private void notifyIdleSessions(long currentTime) throws Exception {
601
602 if (currentTime - lastIdleCheckTime >= 1000) {
603 lastIdleCheckTime = currentTime;
604 IdleStatusChecker.notifyIdleness(allSessions(), currentTime);
605 }
606 }
607
608 private void flush(long currentTime) {
609 final T firstSession = flushingSessions.peek();
610 if (firstSession == null) {
611 return;
612 }
613
614 T session = flushingSessions.poll();
615 for (; ;) {
616 session.setScheduledForFlush(false);
617 SessionState state = state(session);
618 switch (state) {
619 case OPEN:
620 try {
621 boolean flushedAll = flushNow(session, currentTime);
622 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
623 !session.isScheduledForFlush()) {
624 scheduleFlush(session);
625 }
626 } catch (Exception e) {
627 scheduleRemove(session);
628 session.getFilterChain().fireExceptionCaught(e);
629 }
630 break;
631 case CLOSED:
632
633 break;
634 case PREPARING:
635
636
637 scheduleFlush(session);
638 return;
639 default:
640 throw new IllegalStateException(String.valueOf(state));
641 }
642
643 session = flushingSessions.peek();
644 if (session == null || session == firstSession) {
645 break;
646 }
647 session = flushingSessions.poll();
648 }
649 }
650
651 private boolean flushNow(T session, long currentTime) {
652 if (!session.isConnected()) {
653 scheduleRemove(session);
654 return false;
655 }
656
657 final boolean hasFragmentation =
658 session.getTransportMetadata().hasFragmentation();
659
660 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
661
662
663
664
665 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
666 (session.getConfig().getMaxReadBufferSize() >>> 1);
667 int writtenBytes = 0;
668 try {
669
670 setInterestedInWrite(session, false);
671 do {
672
673 WriteRequest req = session.getCurrentWriteRequest();
674 if (req == null) {
675 req = writeRequestQueue.poll(session);
676 if (req == null) {
677 break;
678 }
679 session.setCurrentWriteRequest(req);
680 }
681
682 int localWrittenBytes = 0;
683 Object message = req.getMessage();
684 if (message instanceof IoBuffer) {
685 localWrittenBytes = writeBuffer(
686 session, req, hasFragmentation,
687 maxWrittenBytes - writtenBytes,
688 currentTime);
689 if (localWrittenBytes > 0 && ((IoBuffer)message).hasRemaining() ) {
690
691 writtenBytes += localWrittenBytes;
692 setInterestedInWrite(session, true);
693 return false;
694 }
695 } else if (message instanceof FileRegion) {
696 localWrittenBytes = writeFile(
697 session, req, hasFragmentation,
698 maxWrittenBytes - writtenBytes,
699 currentTime);
700
701
702
703
704 if (localWrittenBytes > 0 && ((FileRegion) message).getRemainingBytes() > 0) {
705 writtenBytes += localWrittenBytes;
706 setInterestedInWrite(session, true);
707 return false;
708 }
709 } else {
710 throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'. Are you missing a protocol encoder?");
711 }
712
713 if (localWrittenBytes == 0) {
714
715 setInterestedInWrite(session, true);
716 return false;
717 }
718
719 writtenBytes += localWrittenBytes;
720
721 if (writtenBytes >= maxWrittenBytes) {
722
723 scheduleFlush(session);
724 return false;
725 }
726 } while (writtenBytes < maxWrittenBytes);
727 } catch (Exception e) {
728 session.getFilterChain().fireExceptionCaught(e);
729 return false;
730 }
731
732 return true;
733 }
734
735 private int writeBuffer(T session, WriteRequest req,
736 boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
737 IoBuffer buf = (IoBuffer) req.getMessage();
738 int localWrittenBytes = 0;
739 if (buf.hasRemaining()) {
740 int length;
741 if (hasFragmentation) {
742 length = Math.min(buf.remaining(), maxLength);
743 } else {
744 length = buf.remaining();
745 }
746 for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
747 localWrittenBytes = write(session, buf, length);
748 if (localWrittenBytes != 0) {
749 break;
750 }
751 }
752 }
753
754 session.increaseWrittenBytes(localWrittenBytes, currentTime);
755
756 if (!buf.hasRemaining() ||
757 !hasFragmentation && localWrittenBytes != 0) {
758
759 buf.reset();
760 fireMessageSent(session, req);
761 }
762 return localWrittenBytes;
763 }
764
765 private int writeFile(T session, WriteRequest req,
766 boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
767 int localWrittenBytes;
768 FileRegion region = (FileRegion) req.getMessage();
769 if (region.getRemainingBytes() > 0) {
770 int length;
771 if (hasFragmentation) {
772 length = (int) Math.min(region.getRemainingBytes(), maxLength);
773 } else {
774 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
775 }
776 localWrittenBytes = transferFile(session, region, length);
777 region.update(localWrittenBytes);
778 } else {
779 localWrittenBytes = 0;
780 }
781
782 session.increaseWrittenBytes(localWrittenBytes, currentTime);
783
784 if (region.getRemainingBytes() <= 0 ||
785 !hasFragmentation && localWrittenBytes != 0) {
786 fireMessageSent(session, req);
787 }
788
789 return localWrittenBytes;
790 }
791
/**
 * Marks the request as no longer in progress and fires the message-sent
 * event for it on the session's filter chain.
 *
 * @param session the session the request was written to
 * @param req the request that has been written
 */
private void fireMessageSent(T session, WriteRequest req) {
    // Clear the in-progress request first so the next flush picks up a new one.
    session.setCurrentWriteRequest(null);
    session.getFilterChain().fireMessageSent(req);
}
796
797 private void updateTrafficMask() {
798 for (; ;) {
799 T session = trafficControllingSessions.poll();
800
801 if (session == null) {
802 break;
803 }
804
805 SessionState state = state(session);
806 switch (state) {
807 case OPEN:
808 updateTrafficMaskNow(session);
809 break;
810 case CLOSED:
811 break;
812 case PREPARING:
813
814
815
816 scheduleTrafficControl(session);
817 return;
818 default:
819 throw new IllegalStateException(String.valueOf(state));
820 }
821 }
822 }
823
824 private void updateTrafficMaskNow(T session) {
825
826
827 int mask = session.getTrafficMask().getInterestOps();
828 try {
829 setInterestedInRead(session, (mask & SelectionKey.OP_READ) != 0);
830 } catch (Exception e) {
831 session.getFilterChain().fireExceptionCaught(e);
832 }
833 try {
834 setInterestedInWrite(
835 session,
836 !session.getWriteRequestQueue().isEmpty(session) &&
837 (mask & SelectionKey.OP_WRITE) != 0);
838 } catch (Exception e) {
839 session.getFilterChain().fireExceptionCaught(e);
840 }
841 }
842
843
844 private class Worker implements Runnable {
845 public void run() {
846 int nSessions = 0;
847 lastIdleCheckTime = System.currentTimeMillis();
848
849 for (;;) {
850 try {
851 boolean selected = select(1000);
852
853 nSessions += add();
854 updateTrafficMask();
855
856 if (selected) {
857 process();
858 }
859
860 long currentTime = System.currentTimeMillis();
861 flush(currentTime);
862 nSessions -= remove();
863 notifyIdleSessions(currentTime);
864
865 if (nSessions == 0) {
866 synchronized (lock) {
867 if (newSessions.isEmpty() && isSelectorEmpty()) {
868 worker = null;
869 break;
870 }
871 }
872 }
873
874
875
876 if (isDisposing()) {
877 for (Iterator<T> i = allSessions(); i.hasNext(); ) {
878 scheduleRemove(i.next());
879 }
880 wakeup();
881 }
882 } catch (Throwable t) {
883 ExceptionMonitor.getInstance().exceptionCaught(t);
884
885 try {
886 Thread.sleep(1000);
887 } catch (InterruptedException e1) {
888 ExceptionMonitor.getInstance().exceptionCaught(e1);
889 }
890 }
891 }
892
893 try {
894 synchronized (disposalLock) {
895 if (isDisposing()) {
896 dispose0();
897 }
898 }
899 } catch (Throwable t) {
900 ExceptionMonitor.getInstance().exceptionCaught(t);
901 } finally {
902 disposalFuture.setValue(true);
903 }
904 }
905 }
906
/**
 * Lifecycle state of a session as reported by {@code state(T)}. The
 * processor uses it to decide what to do with a queued operation.
 */
protected static enum SessionState {
    /** Queued operations are carried out on the session. */
    OPEN,
    /** Queued operations for the session are dropped. */
    CLOSED,
    /** Queued operations are put back on their queue and retried on a later pass. */
    PREPARING,
}
912 }