1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.mina.core.buffer.IoBuffer;
34 import org.apache.mina.core.file.FileRegion;
35 import org.apache.mina.core.filterchain.IoFilterChain;
36 import org.apache.mina.core.future.DefaultIoFuture;
37 import org.apache.mina.core.service.AbstractIoService;
38 import org.apache.mina.core.service.IoProcessor;
39 import org.apache.mina.core.session.AbstractIoSession;
40 import org.apache.mina.core.session.IoSession;
41 import org.apache.mina.core.session.IoSessionConfig;
42 import org.apache.mina.core.write.WriteRequest;
43 import org.apache.mina.core.write.WriteRequestQueue;
44 import org.apache.mina.core.write.WriteToClosedSessionException;
45 import org.apache.mina.util.ExceptionMonitor;
46 import org.apache.mina.util.NamePreservingRunnable;
47
48
49
50
51
52
53
54
55
56 public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> {
57
58
59
60
61
62
63 private static final int WRITE_SPIN_COUNT = 256;
64
65
66 private static final long SELECT_TIMEOUT = 1000L;
67
68
69 private static final Map<Class<?>, AtomicInteger> threadIds =
70 new HashMap<Class<?>, AtomicInteger>();
71
72 private final Object lock = new Object();
73 private final String threadName;
74 private final Executor executor;
75
76
77 private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();
78 private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
79 private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
80 private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();
81
82
83 private Processor processor;
84
85 private long lastIdleCheckTime;
86
87 private final Object disposalLock = new Object();
88 private volatile boolean disposing;
89 private volatile boolean disposed;
90 private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
91
92
93
94
95
96
97
98 protected AbstractPollingIoProcessor(Executor executor) {
99 if (executor == null) {
100 throw new NullPointerException("executor");
101 }
102
103 this.threadName = nextThreadName();
104 this.executor = executor;
105 }
106
107
108
109
110
111
112
113
114
115 private String nextThreadName() {
116 Class<?> cls = getClass();
117 int newThreadId;
118
119
120
121
122 synchronized( threadIds ) {
123
124 AtomicInteger threadId = threadIds.get(cls);
125
126 if (threadId == null) {
127
128
129
130 newThreadId = 1;
131 threadIds.put(cls, new AtomicInteger(newThreadId));
132 } else {
133
134 newThreadId = threadId.incrementAndGet();
135 }
136 }
137
138
139 return cls.getSimpleName() + '-' + newThreadId;
140 }
141
142
143
144
145 public final boolean isDisposing() {
146 return disposing;
147 }
148
149
150
151
152 public final boolean isDisposed() {
153 return disposed;
154 }
155
156
157
158
159 public final void dispose() {
160 if (disposed) {
161 return;
162 }
163
164 synchronized (disposalLock) {
165 if (!disposing) {
166 disposing = true;
167 startupProcessor();
168 }
169 }
170
171 disposalFuture.awaitUninterruptibly();
172 disposed = true;
173 }
174
175
176
177
178
179
180 protected abstract void dispose0() throws Exception;
181
182
183
184
185
186
187
188 protected abstract int select(long timeout) throws Exception;
189
190
191
192
193
194
195 protected abstract int select() throws Exception;
196
197
198
199
200
201
202 protected abstract boolean isSelectorEmpty();
203
204
205
206
207 protected abstract void wakeup();
208
209
210
211
212
213
214 protected abstract Iterator<T> allSessions();
215
216
217
218
219
220
221 protected abstract Iterator<T> selectedSessions();
222
223
224
225
226
227
228 protected abstract SessionState state(T session);
229
230
231
232
233
234
235 protected abstract boolean isWritable(T session);
236
237
238
239
240
241
242 protected abstract boolean isReadable(T session);
243
244
245
246
247
248
249 protected abstract void setInterestedInWrite(T session, boolean interested)
250 throws Exception;
251
252
253
254
255
256
257 protected abstract void setInterestedInRead(T session, boolean interested)
258 throws Exception;
259
260
261
262
263
264
265 protected abstract boolean isInterestedInRead(T session);
266
267
268
269
270
271
272 protected abstract boolean isInterestedInWrite(T session);
273
274
275
276
277
278
279 protected abstract void init(T session) throws Exception;
280
281
282
283
284
285
286 protected abstract void destroy(T session) throws Exception;
287
288
289
290
291
292
293
294
295
296 protected abstract int read(T session, IoBuffer buf) throws Exception;
297
298
299
300
301
302
303
304
305
306
307
308 protected abstract int write(T session, IoBuffer buf, int length) throws Exception;
309
310
311
312
313
314
315
316
317
318
319
320 protected abstract int transferFile(T session, FileRegion region, int length) throws Exception;
321
322
323
324
325 public final void add(T session) {
326 if (isDisposing()) {
327 throw new IllegalStateException("Already disposed.");
328 }
329
330
331 newSessions.add(session);
332 startupProcessor();
333 }
334
335
336
337
338 public final void remove(T session) {
339 scheduleRemove(session);
340 startupProcessor();
341 }
342
343 private void scheduleRemove(T session) {
344 removingSessions.add(session);
345 }
346
347
348
349
350 public final void flush(T session) {
351 boolean needsWakeup = flushingSessions.isEmpty();
352 if (scheduleFlush(session) && needsWakeup) {
353 wakeup();
354 }
355 }
356
357 private boolean scheduleFlush(T session) {
358 if (session.setScheduledForFlush(true)) {
359 flushingSessions.add(session);
360 return true;
361 }
362 return false;
363 }
364
365
366
367
368 public final void updateTrafficMask(T session) {
369 scheduleTrafficControl(session);
370 wakeup();
371 }
372
373 private void scheduleTrafficControl(T session) {
374 trafficControllingSessions.add(session);
375 }
376
377
378
379
380
381 private void startupProcessor() {
382 synchronized (lock) {
383 if (processor == null) {
384 processor = new Processor();
385 executor.execute(new NamePreservingRunnable(processor, threadName));
386 }
387 }
388
389
390
391 wakeup();
392 }
393
394
395
396
397
398 private int handleNewSessions() {
399 int addedSessions = 0;
400
401
402
403 for (;;) {
404 T session = newSessions.poll();
405
406 if (session == null) {
407
408 break;
409 }
410
411
412 if (addNow(session)) {
413
414 addedSessions ++;
415 }
416 }
417
418 return addedSessions;
419 }
420
421 private boolean addNow(T session) {
422
423 boolean registered = false;
424 boolean notified = false;
425 try {
426 init(session);
427 registered = true;
428
429
430 session.getService().getFilterChainBuilder().buildFilterChain(
431 session.getFilterChain());
432
433
434
435 ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
436 notified = true;
437 } catch (Throwable e) {
438 if (notified) {
439
440
441 scheduleRemove(session);
442 IoFilterChain filterChain = session.getFilterChain();
443 filterChain.fireExceptionCaught(e);
444 wakeup();
445 } else {
446 ExceptionMonitor.getInstance().exceptionCaught(e);
447 try {
448 destroy(session);
449 } catch (Exception e1) {
450 ExceptionMonitor.getInstance().exceptionCaught(e1);
451 } finally {
452 registered = false;
453 }
454 }
455 }
456 return registered;
457 }
458
459 private int remove() {
460 int removedSessions = 0;
461 for (; ;) {
462 T session = removingSessions.poll();
463
464 if (session == null) {
465 break;
466 }
467
468 SessionState state = state(session);
469 switch (state) {
470 case OPEN:
471 if (removeNow(session)) {
472 removedSessions ++;
473 }
474 break;
475 case CLOSED:
476
477 break;
478 case PREPARING:
479
480
481 scheduleRemove(session);
482 return removedSessions;
483 default:
484 throw new IllegalStateException(String.valueOf(state));
485 }
486 }
487
488 return removedSessions;
489 }
490
491 private boolean removeNow(T session) {
492 clearWriteRequestQueue(session);
493
494 try {
495 destroy(session);
496 return true;
497 } catch (Exception e) {
498 IoFilterChain filterChain = session.getFilterChain();
499 filterChain.fireExceptionCaught(e);
500 } finally {
501 clearWriteRequestQueue(session);
502 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
503 }
504 return false;
505 }
506
507 private void clearWriteRequestQueue(T session) {
508 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
509 WriteRequest req;
510
511 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
512
513 if ((req = writeRequestQueue.poll(session)) != null) {
514 Object m = req.getMessage();
515 if (m instanceof IoBuffer) {
516 IoBuffer buf = (IoBuffer) req.getMessage();
517
518
519
520 if (buf.hasRemaining()) {
521 buf.reset();
522 failedRequests.add(req);
523 } else {
524 IoFilterChain filterChain = session.getFilterChain();
525 filterChain.fireMessageSent(req);
526 }
527 } else {
528 failedRequests.add(req);
529 }
530
531
532 while ((req = writeRequestQueue.poll(session)) != null) {
533 failedRequests.add(req);
534 }
535 }
536
537
538 if (!failedRequests.isEmpty()) {
539 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
540 for (WriteRequest r: failedRequests) {
541 session.decreaseScheduledBytesAndMessages(r);
542 r.getFuture().setException(cause);
543 }
544 IoFilterChain filterChain = session.getFilterChain();
545 filterChain.fireExceptionCaught(cause);
546 }
547 }
548
549 private void process() throws Exception {
550 for (Iterator<T> i = selectedSessions(); i.hasNext();) {
551 T session = i.next();
552 process(session);
553 i.remove();
554 }
555 }
556
557
558
559
560 private void process(T session) {
561
562 if (isReadable(session) && !session.isReadSuspended()) {
563 read(session);
564 }
565
566
567 if (isWritable(session) && !session.isWriteSuspended()) {
568 scheduleFlush(session);
569 }
570 }
571
572 private void read(T session) {
573 IoSessionConfig config = session.getConfig();
574 IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
575
576 final boolean hasFragmentation =
577 session.getTransportMetadata().hasFragmentation();
578
579 try {
580 int readBytes = 0;
581 int ret;
582
583 try {
584 if (hasFragmentation) {
585 while ((ret = read(session, buf)) > 0) {
586 readBytes += ret;
587 if (!buf.hasRemaining()) {
588 break;
589 }
590 }
591 } else {
592 ret = read(session, buf);
593 if (ret > 0) {
594 readBytes = ret;
595 }
596 }
597 } finally {
598 buf.flip();
599 }
600
601 if (readBytes > 0) {
602 IoFilterChain filterChain = session.getFilterChain();
603 filterChain.fireMessageReceived(buf);
604 buf = null;
605
606 if (hasFragmentation) {
607 if (readBytes << 1 < config.getReadBufferSize()) {
608 session.decreaseReadBufferSize();
609 } else if (readBytes == config.getReadBufferSize()) {
610 session.increaseReadBufferSize();
611 }
612 }
613 }
614 if (ret < 0) {
615 scheduleRemove(session);
616 }
617 } catch (Throwable e) {
618 if (e instanceof IOException) {
619 scheduleRemove(session);
620 }
621 IoFilterChain filterChain = session.getFilterChain();
622 filterChain.fireExceptionCaught(e);
623 }
624 }
625
626 private void notifyIdleSessions(long currentTime) throws Exception {
627
628 if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
629 lastIdleCheckTime = currentTime;
630 AbstractIoSession.notifyIdleness(allSessions(), currentTime);
631 }
632 }
633
634 private void flush(long currentTime) {
635 final T firstSession = flushingSessions.peek();
636 if (firstSession == null) {
637 return;
638 }
639
640 T session = flushingSessions.poll();
641 for (; ;) {
642 session.setScheduledForFlush(false);
643 SessionState state = state(session);
644
645 switch (state) {
646 case OPEN:
647 try {
648 boolean flushedAll = flushNow(session, currentTime);
649 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
650 !session.isScheduledForFlush()) {
651 scheduleFlush(session);
652 }
653 } catch (Exception e) {
654 scheduleRemove(session);
655 IoFilterChain filterChain = session.getFilterChain();
656 filterChain.fireExceptionCaught(e);
657 }
658 break;
659 case CLOSED:
660
661 break;
662 case PREPARING:
663
664
665 scheduleFlush(session);
666 return;
667 default:
668 throw new IllegalStateException(String.valueOf(state));
669 }
670
671 session = flushingSessions.peek();
672 if (session == null || session == firstSession) {
673 break;
674 }
675 session = flushingSessions.poll();
676 }
677 }
678
679 private boolean flushNow(T session, long currentTime) {
680 if (!session.isConnected()) {
681 scheduleRemove(session);
682 return false;
683 }
684
685 final boolean hasFragmentation =
686 session.getTransportMetadata().hasFragmentation();
687
688 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
689
690
691
692
693 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
694 (session.getConfig().getMaxReadBufferSize() >>> 1);
695 int writtenBytes = 0;
696 try {
697
698 setInterestedInWrite(session, false);
699 do {
700
701 WriteRequest req = session.getCurrentWriteRequest();
702 if (req == null) {
703 req = writeRequestQueue.poll(session);
704 if (req == null) {
705 break;
706 }
707 session.setCurrentWriteRequest(req);
708 }
709
710 int localWrittenBytes = 0;
711 Object message = req.getMessage();
712 if (message instanceof IoBuffer) {
713 localWrittenBytes = writeBuffer(
714 session, req, hasFragmentation,
715 maxWrittenBytes - writtenBytes,
716 currentTime);
717 if (localWrittenBytes > 0 && ((IoBuffer)message).hasRemaining() ) {
718
719 writtenBytes += localWrittenBytes;
720 setInterestedInWrite(session, true);
721 return false;
722 }
723 } else if (message instanceof FileRegion) {
724 localWrittenBytes = writeFile(
725 session, req, hasFragmentation,
726 maxWrittenBytes - writtenBytes,
727 currentTime);
728
729
730
731
732 if (localWrittenBytes > 0 && ((FileRegion) message).getRemainingBytes() > 0) {
733 writtenBytes += localWrittenBytes;
734 setInterestedInWrite(session, true);
735 return false;
736 }
737 } else {
738 throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'. Are you missing a protocol encoder?");
739 }
740
741 if (localWrittenBytes == 0) {
742
743 setInterestedInWrite(session, true);
744 return false;
745 }
746
747 writtenBytes += localWrittenBytes;
748
749 if (writtenBytes >= maxWrittenBytes) {
750
751 scheduleFlush(session);
752 return false;
753 }
754 } while (writtenBytes < maxWrittenBytes);
755 } catch (Exception e) {
756 IoFilterChain filterChain = session.getFilterChain();
757 filterChain.fireExceptionCaught(e);
758 return false;
759 }
760
761 return true;
762 }
763
764 private int writeBuffer(T session, WriteRequest req,
765 boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
766 IoBuffer buf = (IoBuffer) req.getMessage();
767 int localWrittenBytes = 0;
768 if (buf.hasRemaining()) {
769 int length;
770 if (hasFragmentation) {
771 length = Math.min(buf.remaining(), maxLength);
772 } else {
773 length = buf.remaining();
774 }
775 for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
776 localWrittenBytes = write(session, buf, length);
777 if (localWrittenBytes != 0) {
778 break;
779 }
780 }
781 }
782
783 session.increaseWrittenBytes(localWrittenBytes, currentTime);
784
785 if (!buf.hasRemaining() ||
786 !hasFragmentation && localWrittenBytes != 0) {
787
788 buf.reset();
789 fireMessageSent(session, req);
790 }
791 return localWrittenBytes;
792 }
793
794 private int writeFile(T session, WriteRequest req,
795 boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
796 int localWrittenBytes;
797 FileRegion region = (FileRegion) req.getMessage();
798 if (region.getRemainingBytes() > 0) {
799 int length;
800 if (hasFragmentation) {
801 length = (int) Math.min(region.getRemainingBytes(), maxLength);
802 } else {
803 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
804 }
805 localWrittenBytes = transferFile(session, region, length);
806 region.update(localWrittenBytes);
807 } else {
808 localWrittenBytes = 0;
809 }
810
811 session.increaseWrittenBytes(localWrittenBytes, currentTime);
812
813 if (region.getRemainingBytes() <= 0 ||
814 !hasFragmentation && localWrittenBytes != 0) {
815 fireMessageSent(session, req);
816 }
817
818 return localWrittenBytes;
819 }
820
821 private void fireMessageSent(T session, WriteRequest req) {
822 session.setCurrentWriteRequest(null);
823 IoFilterChain filterChain = session.getFilterChain();
824 filterChain.fireMessageSent(req);
825 }
826
827 private void updateTrafficMask() {
828 for (; ;) {
829 T session = trafficControllingSessions.poll();
830
831 if (session == null) {
832 break;
833 }
834
835 SessionState state = state(session);
836 switch (state) {
837 case OPEN:
838 updateTrafficControl(session);
839 break;
840 case CLOSED:
841 break;
842 case PREPARING:
843
844
845
846 scheduleTrafficControl(session);
847 return;
848 default:
849 throw new IllegalStateException(String.valueOf(state));
850 }
851 }
852 }
853
854 public void updateTrafficControl(T session) {
855 try {
856 setInterestedInRead(session, !session.isReadSuspended());
857 } catch (Exception e) {
858 IoFilterChain filterChain = session.getFilterChain();
859 filterChain.fireExceptionCaught(e);
860 }
861 try {
862 setInterestedInWrite(
863 session,
864 !session.getWriteRequestQueue().isEmpty(session) &&
865 !session.isWriteSuspended());
866 } catch (Exception e) {
867 IoFilterChain filterChain = session.getFilterChain();
868 filterChain.fireExceptionCaught(e);
869 }
870 }
871
872 private class Processor implements Runnable {
873 public void run() {
874 int nSessions = 0;
875 lastIdleCheckTime = System.currentTimeMillis();
876
877 for (;;) {
878 try {
879
880
881
882
883 int selected = select(SELECT_TIMEOUT);
884
885 nSessions += handleNewSessions();
886 updateTrafficMask();
887
888
889
890 if (selected > 0) {
891 process();
892 }
893
894 long currentTime = System.currentTimeMillis();
895 flush(currentTime);
896 nSessions -= remove();
897 notifyIdleSessions(currentTime);
898
899 if (nSessions == 0) {
900 synchronized (lock) {
901 if (newSessions.isEmpty() && isSelectorEmpty()) {
902 processor = null;
903 break;
904 }
905 }
906 }
907
908
909
910 if (isDisposing()) {
911 for (Iterator<T> i = allSessions(); i.hasNext(); ) {
912 scheduleRemove(i.next());
913 }
914 wakeup();
915 }
916 } catch (Throwable t) {
917 ExceptionMonitor.getInstance().exceptionCaught(t);
918
919 try {
920 Thread.sleep(1000);
921 } catch (InterruptedException e1) {
922 ExceptionMonitor.getInstance().exceptionCaught(e1);
923 }
924 }
925 }
926
927 try {
928 synchronized (disposalLock) {
929 if (isDisposing()) {
930 dispose0();
931 }
932 }
933 } catch (Throwable t) {
934 ExceptionMonitor.getInstance().exceptionCaught(t);
935 } finally {
936 disposalFuture.setValue(true);
937 }
938 }
939 }
940
941 protected static enum SessionState {
942 OPEN,
943 CLOSED,
944 PREPARING,
945 }
946 }