1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.SocketAddress;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Executor;
33
34 import org.apache.mina.core.RuntimeIoException;
35 import org.apache.mina.core.buffer.IoBuffer;
36 import org.apache.mina.core.future.IoFuture;
37 import org.apache.mina.core.service.AbstractIoAcceptor;
38 import org.apache.mina.core.service.IoAcceptor;
39 import org.apache.mina.core.service.IoProcessor;
40 import org.apache.mina.core.session.AbstractIoSession;
41 import org.apache.mina.core.session.ExpiringSessionRecycler;
42 import org.apache.mina.core.session.IoSession;
43 import org.apache.mina.core.session.IoSessionConfig;
44 import org.apache.mina.core.session.IoSessionRecycler;
45 import org.apache.mina.core.write.WriteRequest;
46 import org.apache.mina.core.write.WriteRequestQueue;
47 import org.apache.mina.util.ExceptionMonitor;
48
49
50
51
52
53
54
55
56 public abstract class AbstractPollingConnectionlessIoAcceptor<T extends AbstractIoSession, H>
57 extends AbstractIoAcceptor {
58
59 private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
60
61 private final Object lock = new Object();
62 private final IoProcessor<T> processor = new ConnectionlessAcceptorProcessor();
63 private final Queue<AcceptorOperationFuture> registerQueue =
64 new ConcurrentLinkedQueue<AcceptorOperationFuture>();
65 private final Queue<AcceptorOperationFuture> cancelQueue =
66 new ConcurrentLinkedQueue<AcceptorOperationFuture>();
67 private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
68 private final Map<SocketAddress, H> boundHandles =
69 Collections.synchronizedMap(new HashMap<SocketAddress, H>());
70
71 private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
72
73 private final ServiceOperationFuture disposalFuture =
74 new ServiceOperationFuture();
75 private volatile boolean selectable;
76
77
78 private Acceptor acceptor;
79
80 private long lastIdleCheckTime;
81
82
83
84
85 protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) {
86 this(sessionConfig, null);
87 }
88
89
90
91
92 protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
93 super(sessionConfig, executor);
94
95 try {
96 init();
97 selectable = true;
98 } catch (RuntimeException e){
99 throw e;
100 } catch (Exception e) {
101 throw new RuntimeIoException("Failed to initialize.", e);
102 } finally {
103 if (!selectable) {
104 try {
105 destroy();
106 } catch (Exception e) {
107 ExceptionMonitor.getInstance().exceptionCaught(e);
108 }
109 }
110 }
111 }
112
113 protected abstract void init() throws Exception;
114 protected abstract void destroy() throws Exception;
115 protected abstract int select() throws Exception;
116 protected abstract int select(int timeout) throws Exception;
117 protected abstract void wakeup();
118 protected abstract Iterator<H> selectedHandles();
119 protected abstract H open(SocketAddress localAddress) throws Exception;
120 protected abstract void close(H handle) throws Exception;
121 protected abstract SocketAddress localAddress(H handle) throws Exception;
122 protected abstract boolean isReadable(H handle);
123 protected abstract boolean isWritable(H handle);
124 protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception;
125 protected abstract int send(T session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception;
126 protected abstract T newSession(IoProcessor<T> processor, H handle, SocketAddress remoteAddress) throws Exception;
127 protected abstract void setInterestedInWrite(T session, boolean interested) throws Exception;
128
129
130
131
132 @Override
133 protected IoFuture dispose0() throws Exception {
134 unbind();
135 if (!disposalFuture.isDone()) {
136 startupAcceptor();
137 wakeup();
138 }
139 return disposalFuture;
140 }
141
142
143
144
145 @Override
146 protected final Set<SocketAddress> bindInternal(
147 List<? extends SocketAddress> localAddresses) throws Exception {
148
149
150 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
151
152
153
154 registerQueue.add(request);
155
156
157
158 startupAcceptor();
159
160
161
162
163 wakeup();
164
165
166 request.awaitUninterruptibly();
167
168 if (request.getException() != null) {
169 throw request.getException();
170 }
171
172
173
174
175 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
176
177 for (H handle:boundHandles.values()) {
178 newLocalAddresses.add(localAddress(handle));
179 }
180
181 return newLocalAddresses;
182 }
183
184
185
186
187 @Override
188 protected final void unbind0(
189 List<? extends SocketAddress> localAddresses) throws Exception {
190 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
191
192 cancelQueue.add(request);
193 startupAcceptor();
194 wakeup();
195
196 request.awaitUninterruptibly();
197
198 if (request.getException() != null) {
199 throw request.getException();
200 }
201 }
202
203
204
205
206 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
207 if (isDisposing()) {
208 throw new IllegalStateException("Already disposed.");
209 }
210
211 if (remoteAddress == null) {
212 throw new NullPointerException("remoteAddress");
213 }
214
215 synchronized (bindLock) {
216 if (!isActive()) {
217 throw new IllegalStateException(
218 "Can't create a session from a unbound service.");
219 }
220
221 try {
222 return newSessionWithoutLock(remoteAddress, localAddress);
223 } catch (RuntimeException e) {
224 throw e;
225 } catch (Error e) {
226 throw e;
227 } catch (Exception e) {
228 throw new RuntimeIoException("Failed to create a session.", e);
229 }
230 }
231 }
232
233 private IoSession newSessionWithoutLock(
234 SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
235 H handle = boundHandles.get(localAddress);
236 if (handle == null) {
237 throw new IllegalArgumentException("Unknown local address: " + localAddress);
238 }
239
240 IoSession session;
241 IoSessionRecycler sessionRecycler = getSessionRecycler();
242 synchronized (sessionRecycler) {
243 session = sessionRecycler.recycle(localAddress, remoteAddress);
244 if (session != null) {
245 return session;
246 }
247
248
249 T newSession = newSession(processor, handle, remoteAddress);
250 getSessionRecycler().put(newSession);
251 session = newSession;
252 }
253
254 initSession(session, null, null);
255
256 try {
257 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
258 getListeners().fireSessionCreated(session);
259 } catch (Throwable t) {
260 ExceptionMonitor.getInstance().exceptionCaught(t);
261 }
262
263 return session;
264 }
265
266 public final IoSessionRecycler getSessionRecycler() {
267 return sessionRecycler;
268 }
269
270 public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
271 synchronized (bindLock) {
272 if (isActive()) {
273 throw new IllegalStateException(
274 "sessionRecycler can't be set while the acceptor is bound.");
275 }
276
277 if (sessionRecycler == null) {
278 sessionRecycler = DEFAULT_RECYCLER;
279 }
280 this.sessionRecycler = sessionRecycler;
281 }
282 }
283
284 private class ConnectionlessAcceptorProcessor implements IoProcessor<T> {
285
286 public void add(T session) {
287 }
288
289 public void flush(T session) {
290 if (scheduleFlush(session)) {
291 wakeup();
292 }
293 }
294
295 public void remove(T session) {
296 getSessionRecycler().remove(session);
297 getListeners().fireSessionDestroyed(session);
298 }
299
300 public void updateTrafficControl(T session) {
301 throw new UnsupportedOperationException();
302 }
303
304 public void dispose() {
305 }
306
307 public boolean isDisposed() {
308 return false;
309 }
310
311 public boolean isDisposing() {
312 return false;
313 }
314 }
315
316
317
318
319 private void startupAcceptor() {
320 if (!selectable) {
321 registerQueue.clear();
322 cancelQueue.clear();
323 flushingSessions.clear();
324 }
325
326 synchronized (lock) {
327 if (acceptor == null) {
328 acceptor = new Acceptor();
329 executeWorker(acceptor);
330 }
331 }
332 }
333
334 private boolean scheduleFlush(T session) {
335 if (session.setScheduledForFlush(true)) {
336 flushingSessions.add(session);
337 return true;
338 } else {
339 return false;
340 }
341 }
342
343
344
345
346
347
348 private class Acceptor implements Runnable {
349 public void run() {
350 int nHandles = 0;
351 lastIdleCheckTime = System.currentTimeMillis();
352
353 while (selectable) {
354 try {
355 int selected = select();
356
357 nHandles += registerHandles();
358
359 if (selected > 0) {
360 processReadySessions(selectedHandles());
361 }
362
363 long currentTime = System.currentTimeMillis();
364 flushSessions(currentTime);
365 nHandles -= unregisterHandles();
366
367 notifyIdleSessions(currentTime);
368
369 if (nHandles == 0) {
370 synchronized (lock) {
371 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
372 acceptor = null;
373 break;
374 }
375 }
376 }
377 } catch (Exception e) {
378 ExceptionMonitor.getInstance().exceptionCaught(e);
379
380 try {
381 Thread.sleep(1000);
382 } catch (InterruptedException e1) {
383 }
384 }
385 }
386
387 if (selectable && isDisposing()) {
388 selectable = false;
389 try {
390 destroy();
391 } catch (Exception e) {
392 ExceptionMonitor.getInstance().exceptionCaught(e);
393 } finally {
394 disposalFuture.setValue(true);
395 }
396 }
397 }
398 }
399
400 @SuppressWarnings("unchecked")
401 private void processReadySessions(Iterator<H> handles) {
402 while (handles.hasNext()) {
403 H h = handles.next();
404 handles.remove();
405 try {
406 if (isReadable(h)) {
407 readHandle(h);
408 }
409
410 if (isWritable(h)) {
411 for (IoSession session : getManagedSessions().values()) {
412 scheduleFlush((T) session);
413 }
414 }
415 } catch (Throwable t) {
416 ExceptionMonitor.getInstance().exceptionCaught(t);
417 }
418 }
419 }
420
421 private void readHandle(H handle) throws Exception {
422 IoBuffer readBuf = IoBuffer.allocate(
423 getSessionConfig().getReadBufferSize());
424
425 SocketAddress remoteAddress = receive(handle, readBuf);
426 if (remoteAddress != null) {
427 IoSession session = newSessionWithoutLock(
428 remoteAddress, localAddress(handle));
429
430 readBuf.flip();
431
432 IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
433 newBuf.put(readBuf);
434 newBuf.flip();
435
436 session.getFilterChain().fireMessageReceived(newBuf);
437 }
438 }
439
440 private void flushSessions(long currentTime) {
441 for (; ;) {
442 T session = flushingSessions.poll();
443 if (session == null) {
444 break;
445 }
446
447 session.setScheduledForFlush(false);
448
449 try {
450 boolean flushedAll = flush(session, currentTime);
451 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
452 !session.isScheduledForFlush()) {
453 scheduleFlush(session);
454 }
455 } catch (Exception e) {
456 session.getFilterChain().fireExceptionCaught(e);
457 }
458 }
459 }
460
461 private boolean flush(T session, long currentTime) throws Exception {
462
463 setInterestedInWrite(session, false);
464
465 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
466 final int maxWrittenBytes =
467 session.getConfig().getMaxReadBufferSize() +
468 (session.getConfig().getMaxReadBufferSize() >>> 1);
469
470 int writtenBytes = 0;
471 try {
472 for (; ;) {
473 WriteRequest req = session.getCurrentWriteRequest();
474 if (req == null) {
475 req = writeRequestQueue.poll(session);
476 if (req == null) {
477 break;
478 }
479 session.setCurrentWriteRequest(req);
480 }
481
482 IoBuffer buf = (IoBuffer) req.getMessage();
483 if (buf.remaining() == 0) {
484
485 session.setCurrentWriteRequest(null);
486 buf.reset();
487 session.getFilterChain().fireMessageSent(req);
488 continue;
489 }
490
491 SocketAddress destination = req.getDestination();
492 if (destination == null) {
493 destination = session.getRemoteAddress();
494 }
495
496 int localWrittenBytes = send(session, buf, destination);
497 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
498
499 setInterestedInWrite(session, true);
500 return false;
501 } else {
502 setInterestedInWrite(session, false);
503
504
505 session.setCurrentWriteRequest(null);
506 writtenBytes += localWrittenBytes;
507 buf.reset();
508 session.getFilterChain().fireMessageSent(req);
509 }
510 }
511 } finally {
512 session.increaseWrittenBytes(writtenBytes, currentTime);
513 }
514
515 return true;
516 }
517
518 private int registerHandles() {
519 for (;;) {
520 AcceptorOperationFuture req = registerQueue.poll();
521 if (req == null) {
522 break;
523 }
524
525 Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
526 List<SocketAddress> localAddresses = req.getLocalAddresses();
527 try {
528 for (SocketAddress a: localAddresses) {
529 H handle = open(a);
530 newHandles.put(localAddress(handle), handle);
531 }
532 boundHandles.putAll(newHandles);
533
534 getListeners().fireServiceActivated();
535 req.setDone();
536 return newHandles.size();
537 } catch (Exception e) {
538 req.setException(e);
539 } finally {
540
541 if (req.getException() != null) {
542 for (H handle: newHandles.values()) {
543 try {
544 close(handle);
545 } catch (Exception e) {
546 ExceptionMonitor.getInstance().exceptionCaught(e);
547 }
548 }
549 wakeup();
550 }
551 }
552 }
553
554 return 0;
555 }
556
557 private int unregisterHandles() {
558 int nHandles = 0;
559 for (;;) {
560 AcceptorOperationFuture request = cancelQueue.poll();
561 if (request == null) {
562 break;
563 }
564
565
566 for (SocketAddress a: request.getLocalAddresses()) {
567 H handle = boundHandles.remove(a);
568 if (handle == null) {
569 continue;
570 }
571
572 try {
573 close(handle);
574 wakeup();
575 } catch (Throwable e) {
576 ExceptionMonitor.getInstance().exceptionCaught(e);
577 } finally {
578 nHandles ++;
579 }
580 }
581
582 request.setDone();
583 }
584
585 return nHandles;
586 }
587
588 private void notifyIdleSessions(long currentTime) {
589
590 if (currentTime - lastIdleCheckTime >= 1000) {
591 lastIdleCheckTime = currentTime;
592 AbstractIoSession.notifyIdleness(
593 getListeners().getManagedSessions().values().iterator(),
594 currentTime);
595 }
596 }
597 }