1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.SocketAddress;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Executor;
33
34 import org.apache.mina.core.RuntimeIoException;
35 import org.apache.mina.core.buffer.IoBuffer;
36 import org.apache.mina.core.future.IoFuture;
37 import org.apache.mina.core.service.AbstractIoAcceptor;
38 import org.apache.mina.core.service.IoAcceptor;
39 import org.apache.mina.core.service.IoProcessor;
40 import org.apache.mina.core.session.AbstractIoSession;
41 import org.apache.mina.core.session.ExpiringSessionRecycler;
42 import org.apache.mina.core.session.IdleStatusChecker;
43 import org.apache.mina.core.session.IoSession;
44 import org.apache.mina.core.session.IoSessionConfig;
45 import org.apache.mina.core.session.IoSessionRecycler;
46 import org.apache.mina.core.write.WriteRequest;
47 import org.apache.mina.core.write.WriteRequestQueue;
48 import org.apache.mina.util.ExceptionMonitor;
49
50
51
52
53
54
55
56
57 public abstract class AbstractPollingConnectionlessIoAcceptor<T extends AbstractIoSession, H>
58 extends AbstractIoAcceptor {
59
60 private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
61
62 private final Object lock = new Object();
63 private final IoProcessor<T> processor = new ConnectionlessAcceptorProcessor();
64 private final Queue<AcceptorOperationFuture> registerQueue =
65 new ConcurrentLinkedQueue<AcceptorOperationFuture>();
66 private final Queue<AcceptorOperationFuture> cancelQueue =
67 new ConcurrentLinkedQueue<AcceptorOperationFuture>();
68 private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
69 private final Map<SocketAddress, H> boundHandles =
70 Collections.synchronizedMap(new HashMap<SocketAddress, H>());
71
72 private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
73
74 private final ServiceOperationFuture disposalFuture =
75 new ServiceOperationFuture();
76 private volatile boolean selectable;
77 private Worker worker;
78 private long lastIdleCheckTime;
79
80
81
82
/**
 * Creates an acceptor with the given session configuration and no explicit
 * executor ({@code null} is forwarded to the two-argument constructor).
 *
 * @param sessionConfig the session configuration passed on to the superclass
 */
protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) {
    this(sessionConfig, null);
}
86
87
88
89
90 protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
91 super(sessionConfig, executor);
92
93 try {
94 init();
95 selectable = true;
96 } catch (RuntimeException e){
97 throw e;
98 } catch (Exception e) {
99 throw new RuntimeIoException("Failed to initialize.", e);
100 } finally {
101 if (!selectable) {
102 try {
103 destroy();
104 } catch (Exception e) {
105 ExceptionMonitor.getInstance().exceptionCaught(e);
106 }
107 }
108 }
109 }
110
111 protected abstract void init() throws Exception;
112 protected abstract void destroy() throws Exception;
113 protected abstract boolean select(int timeout) throws Exception;
114 protected abstract void wakeup();
115 protected abstract Iterator<H> selectedHandles();
116 protected abstract H open(SocketAddress localAddress) throws Exception;
117 protected abstract void close(H handle) throws Exception;
118 protected abstract SocketAddress localAddress(H handle) throws Exception;
119 protected abstract boolean isReadable(H handle);
120 protected abstract boolean isWritable(H handle);
121 protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception;
122 protected abstract int send(T session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception;
123 protected abstract T newSession(IoProcessor<T> processor, H handle, SocketAddress remoteAddress) throws Exception;
124 protected abstract void setInterestedInWrite(T session, boolean interested) throws Exception;
125
126
127
128
129 @Override
130 protected IoFuture dispose0() throws Exception {
131 unbind();
132 if (!disposalFuture.isDone()) {
133 startupWorker();
134 wakeup();
135 }
136 return disposalFuture;
137 }
138
139
140
141
142 @Override
143 protected final Set<SocketAddress> bind0(
144 List<? extends SocketAddress> localAddresses) throws Exception {
145 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
146
147 registerQueue.add(request);
148 startupWorker();
149 wakeup();
150
151 request.awaitUninterruptibly();
152
153 if (request.getException() != null) {
154 throw request.getException();
155 }
156
157 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
158 for (H handle: boundHandles.values()) {
159 newLocalAddresses.add(localAddress(handle));
160 }
161 return newLocalAddresses;
162 }
163
164
165
166
167 @Override
168 protected final void unbind0(
169 List<? extends SocketAddress> localAddresses) throws Exception {
170 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
171
172 cancelQueue.add(request);
173 startupWorker();
174 wakeup();
175
176 request.awaitUninterruptibly();
177
178 if (request.getException() != null) {
179 throw request.getException();
180 }
181 }
182
183
184
185
186 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
187 if (isDisposing()) {
188 throw new IllegalStateException("Already disposed.");
189 }
190
191 if (remoteAddress == null) {
192 throw new NullPointerException("remoteAddress");
193 }
194
195 synchronized (bindLock) {
196 if (!isActive()) {
197 throw new IllegalStateException(
198 "Can't create a session from a unbound service.");
199 }
200
201 try {
202 return newSessionWithoutLock(remoteAddress, localAddress);
203 } catch (RuntimeException e) {
204 throw e;
205 } catch (Error e) {
206 throw e;
207 } catch (Exception e) {
208 throw new RuntimeIoException("Failed to create a session.", e);
209 }
210 }
211 }
212
213 private IoSession newSessionWithoutLock(
214 SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
215 H handle = boundHandles.get(localAddress);
216 if (handle == null) {
217 throw new IllegalArgumentException("Unknown local address: " + localAddress);
218 }
219
220 IoSession session;
221 IoSessionRecycler sessionRecycler = getSessionRecycler();
222 synchronized (sessionRecycler) {
223 session = sessionRecycler.recycle(localAddress, remoteAddress);
224 if (session != null) {
225 return session;
226 }
227
228
229 T newSession = newSession(processor, handle, remoteAddress);
230 getSessionRecycler().put(newSession);
231 session = newSession;
232 }
233
234 finishSessionInitialization(session, null, null);
235
236 try {
237 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
238 getListeners().fireSessionCreated(session);
239 } catch (Throwable t) {
240 ExceptionMonitor.getInstance().exceptionCaught(t);
241 }
242
243 return session;
244 }
245
246 public final IoSessionRecycler getSessionRecycler() {
247 return sessionRecycler;
248 }
249
250 public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
251 synchronized (bindLock) {
252 if (isActive()) {
253 throw new IllegalStateException(
254 "sessionRecycler can't be set while the acceptor is bound.");
255 }
256
257 if (sessionRecycler == null) {
258 sessionRecycler = DEFAULT_RECYCLER;
259 }
260 this.sessionRecycler = sessionRecycler;
261 }
262 }
263
264 private class ConnectionlessAcceptorProcessor implements IoProcessor<T> {
265
266 public void add(T session) {
267 }
268
269 public void flush(T session) {
270 if (scheduleFlush(session)) {
271 wakeup();
272 }
273 }
274
275 public void remove(T session) {
276 getSessionRecycler().remove(session);
277 getListeners().fireSessionDestroyed(session);
278 }
279
280 public void updateTrafficMask(T session) {
281 throw new UnsupportedOperationException();
282 }
283
284 public void dispose() {
285 }
286
287 public boolean isDisposed() {
288 return false;
289 }
290
291 public boolean isDisposing() {
292 return false;
293 }
294 }
295
296 private void startupWorker() {
297 if (!selectable) {
298 registerQueue.clear();
299 cancelQueue.clear();
300 flushingSessions.clear();
301 }
302
303 synchronized (lock) {
304 if (worker == null) {
305 worker = new Worker();
306 executeWorker(worker);
307 }
308 }
309 }
310
311 private boolean scheduleFlush(T session) {
312 if (session.setScheduledForFlush(true)) {
313 flushingSessions.add(session);
314 return true;
315 } else {
316 return false;
317 }
318 }
319
320 private class Worker implements Runnable {
321 public void run() {
322 int nHandles = 0;
323 lastIdleCheckTime = System.currentTimeMillis();
324
325 while (selectable) {
326 try {
327 boolean selected = select(1000);
328
329 nHandles += registerHandles();
330
331 if (selected) {
332 processReadySessions(selectedHandles());
333 }
334
335 long currentTime = System.currentTimeMillis();
336 flushSessions(currentTime);
337 nHandles -= unregisterHandles();
338
339 notifyIdleSessions(currentTime);
340
341 if (nHandles == 0) {
342 synchronized (lock) {
343 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
344 worker = null;
345 break;
346 }
347 }
348 }
349 } catch (Exception e) {
350 ExceptionMonitor.getInstance().exceptionCaught(e);
351
352 try {
353 Thread.sleep(1000);
354 } catch (InterruptedException e1) {
355 }
356 }
357 }
358
359 if (selectable && isDisposing()) {
360 selectable = false;
361 try {
362 destroy();
363 } catch (Exception e) {
364 ExceptionMonitor.getInstance().exceptionCaught(e);
365 } finally {
366 disposalFuture.setValue(true);
367 }
368 }
369 }
370 }
371
372 @SuppressWarnings("unchecked")
373 private void processReadySessions(Iterator<H> handles) {
374 while (handles.hasNext()) {
375 H h = handles.next();
376 handles.remove();
377 try {
378 if (isReadable(h)) {
379 readHandle(h);
380 }
381
382 if (isWritable(h)) {
383 for (IoSession session : getManagedSessions().values()) {
384 scheduleFlush((T) session);
385 }
386 }
387 } catch (Throwable t) {
388 ExceptionMonitor.getInstance().exceptionCaught(t);
389 }
390 }
391 }
392
393 private void readHandle(H handle) throws Exception {
394 IoBuffer readBuf = IoBuffer.allocate(
395 getSessionConfig().getReadBufferSize());
396
397 SocketAddress remoteAddress = receive(handle, readBuf);
398 if (remoteAddress != null) {
399 IoSession session = newSessionWithoutLock(
400 remoteAddress, localAddress(handle));
401
402 readBuf.flip();
403
404 IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
405 newBuf.put(readBuf);
406 newBuf.flip();
407
408 session.getFilterChain().fireMessageReceived(newBuf);
409 }
410 }
411
412 private void flushSessions(long currentTime) {
413 for (; ;) {
414 T session = flushingSessions.poll();
415 if (session == null) {
416 break;
417 }
418
419 session.setScheduledForFlush(false);
420
421 try {
422 boolean flushedAll = flush(session, currentTime);
423 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
424 !session.isScheduledForFlush()) {
425 scheduleFlush(session);
426 }
427 } catch (Exception e) {
428 session.getFilterChain().fireExceptionCaught(e);
429 }
430 }
431 }
432
433 private boolean flush(T session, long currentTime) throws Exception {
434
435 setInterestedInWrite(session, false);
436
437 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
438 final int maxWrittenBytes =
439 session.getConfig().getMaxReadBufferSize() +
440 (session.getConfig().getMaxReadBufferSize() >>> 1);
441
442 int writtenBytes = 0;
443 try {
444 for (; ;) {
445 WriteRequest req = session.getCurrentWriteRequest();
446 if (req == null) {
447 req = writeRequestQueue.poll(session);
448 if (req == null) {
449 break;
450 }
451 session.setCurrentWriteRequest(req);
452 }
453
454 IoBuffer buf = (IoBuffer) req.getMessage();
455 if (buf.remaining() == 0) {
456
457 session.setCurrentWriteRequest(null);
458 buf.reset();
459 session.getFilterChain().fireMessageSent(req);
460 continue;
461 }
462
463 SocketAddress destination = req.getDestination();
464 if (destination == null) {
465 destination = session.getRemoteAddress();
466 }
467
468 int localWrittenBytes = send(session, buf, destination);
469 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
470
471 setInterestedInWrite(session, true);
472 return false;
473 } else {
474 setInterestedInWrite(session, false);
475
476
477 session.setCurrentWriteRequest(null);
478 writtenBytes += localWrittenBytes;
479 buf.reset();
480 session.getFilterChain().fireMessageSent(req);
481 }
482 }
483 } finally {
484 session.increaseWrittenBytes(writtenBytes, currentTime);
485 }
486
487 return true;
488 }
489
490 private int registerHandles() {
491 for (;;) {
492 AcceptorOperationFuture req = registerQueue.poll();
493 if (req == null) {
494 break;
495 }
496
497 Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
498 List<SocketAddress> localAddresses = req.getLocalAddresses();
499 try {
500 for (SocketAddress a: localAddresses) {
501 H handle = open(a);
502 newHandles.put(localAddress(handle), handle);
503 }
504 boundHandles.putAll(newHandles);
505
506 getListeners().fireServiceActivated();
507 req.setDone();
508 return newHandles.size();
509 } catch (Exception e) {
510 req.setException(e);
511 } finally {
512
513 if (req.getException() != null) {
514 for (H handle: newHandles.values()) {
515 try {
516 close(handle);
517 } catch (Exception e) {
518 ExceptionMonitor.getInstance().exceptionCaught(e);
519 }
520 }
521 wakeup();
522 }
523 }
524 }
525
526 return 0;
527 }
528
529 private int unregisterHandles() {
530 int nHandles = 0;
531 for (;;) {
532 AcceptorOperationFuture request = cancelQueue.poll();
533 if (request == null) {
534 break;
535 }
536
537
538 for (SocketAddress a: request.getLocalAddresses()) {
539 H handle = boundHandles.remove(a);
540 if (handle == null) {
541 continue;
542 }
543
544 try {
545 close(handle);
546 wakeup();
547 } catch (Throwable e) {
548 ExceptionMonitor.getInstance().exceptionCaught(e);
549 } finally {
550 nHandles ++;
551 }
552 }
553
554 request.setDone();
555 }
556
557 return nHandles;
558 }
559
560 private void notifyIdleSessions(long currentTime) {
561
562 if (currentTime - lastIdleCheckTime >= 1000) {
563 lastIdleCheckTime = currentTime;
564 IdleStatusChecker.notifyIdleness(
565 getListeners().getManagedSessions().values().iterator(),
566 currentTime);
567 }
568 }
569 }