View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.SocketAddress;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.Executor;
33  
34  import org.apache.mina.core.RuntimeIoException;
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.future.IoFuture;
37  import org.apache.mina.core.service.AbstractIoAcceptor;
38  import org.apache.mina.core.service.IoAcceptor;
39  import org.apache.mina.core.service.IoProcessor;
40  import org.apache.mina.core.session.AbstractIoSession;
41  import org.apache.mina.core.session.ExpiringSessionRecycler;
42  import org.apache.mina.core.session.IdleStatusChecker;
43  import org.apache.mina.core.session.IoSession;
44  import org.apache.mina.core.session.IoSessionConfig;
45  import org.apache.mina.core.session.IoSessionRecycler;
46  import org.apache.mina.core.write.WriteRequest;
47  import org.apache.mina.core.write.WriteRequestQueue;
48  import org.apache.mina.util.ExceptionMonitor;
49  
50  /**
51   * TODO Add documentation
52   * {@link IoAcceptor} for datagram transport (UDP/IP).
53   *
54   * @author The Apache MINA Project (dev@mina.apache.org)
55   * @version $Rev: 678335 $, $Date: 2008-07-21 03:25:08 +0200 (lun, 21 jui 2008) $
56   */
57  public abstract class AbstractPollingConnectionlessIoAcceptor<T extends AbstractIoSession, H>
58          extends AbstractIoAcceptor {
59  
60      private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
61  
62      private final Object lock = new Object();
63      private final IoProcessor<T> processor = new ConnectionlessAcceptorProcessor();
64      private final Queue<AcceptorOperationFuture> registerQueue =
65          new ConcurrentLinkedQueue<AcceptorOperationFuture>();
66      private final Queue<AcceptorOperationFuture> cancelQueue =
67          new ConcurrentLinkedQueue<AcceptorOperationFuture>();
68      private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
69      private final Map<SocketAddress, H> boundHandles =
70          Collections.synchronizedMap(new HashMap<SocketAddress, H>());
71  
72      private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
73  
74      private final ServiceOperationFuture disposalFuture =
75          new ServiceOperationFuture();
76      private volatile boolean selectable;
77      private Worker worker;
78      private long lastIdleCheckTime;
79  
80      /**
81       * Creates a new instance.
82       */
83      protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) {
84          this(sessionConfig, null);
85      }
86  
87      /**
88       * Creates a new instance.
89       */
90      protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
91          super(sessionConfig, executor);
92  
93          try {
94              init();
95              selectable = true;
96          } catch (RuntimeException e){
97              throw e;
98          } catch (Exception e) {
99              throw new RuntimeIoException("Failed to initialize.", e);
100         } finally {
101             if (!selectable) {
102                 try {
103                     destroy();
104                 } catch (Exception e) {
105                     ExceptionMonitor.getInstance().exceptionCaught(e);
106                 }
107             }
108         }
109     }
110 
111     protected abstract void init() throws Exception;
112     protected abstract void destroy() throws Exception;
113     protected abstract boolean select(int timeout) throws Exception;
114     protected abstract void wakeup();
115     protected abstract Iterator<H> selectedHandles();
116     protected abstract H open(SocketAddress localAddress) throws Exception;
117     protected abstract void close(H handle) throws Exception;
118     protected abstract SocketAddress localAddress(H handle) throws Exception;
119     protected abstract boolean isReadable(H handle);
120     protected abstract boolean isWritable(H handle);
121     protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception;
122     protected abstract int send(T session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception;
123     protected abstract T newSession(IoProcessor<T> processor, H handle, SocketAddress remoteAddress) throws Exception;
124     protected abstract void setInterestedInWrite(T session, boolean interested) throws Exception;
125 
126     /**
127      * {@inheritDoc}
128      */
129     @Override
130     protected IoFuture dispose0() throws Exception {
131         unbind();
132         if (!disposalFuture.isDone()) {
133             startupWorker();
134             wakeup();
135         }
136         return disposalFuture;
137     }
138 
139     /**
140      * {@inheritDoc}
141      */
142     @Override
143     protected final Set<SocketAddress> bind0(
144             List<? extends SocketAddress> localAddresses) throws Exception {
145         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
146 
147         registerQueue.add(request);
148         startupWorker();
149         wakeup();
150 
151         request.awaitUninterruptibly();
152 
153         if (request.getException() != null) {
154             throw request.getException();
155         }
156 
157         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
158         for (H handle: boundHandles.values()) {
159             newLocalAddresses.add(localAddress(handle));
160         }
161         return newLocalAddresses;
162     }
163 
164     /**
165      * {@inheritDoc}
166      */
167     @Override
168     protected final void unbind0(
169             List<? extends SocketAddress> localAddresses) throws Exception {
170         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
171 
172         cancelQueue.add(request);
173         startupWorker();
174         wakeup();
175 
176         request.awaitUninterruptibly();
177 
178         if (request.getException() != null) {
179             throw request.getException();
180         }
181     }
182 
183     /**
184      * {@inheritDoc}
185      */
186     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
187         if (isDisposing()) {
188             throw new IllegalStateException("Already disposed.");
189         }
190 
191         if (remoteAddress == null) {
192             throw new NullPointerException("remoteAddress");
193         }
194 
195         synchronized (bindLock) {
196             if (!isActive()) {
197                 throw new IllegalStateException(
198                         "Can't create a session from a unbound service.");
199             }
200 
201             try {
202                 return newSessionWithoutLock(remoteAddress, localAddress);
203             } catch (RuntimeException e) {
204                 throw e;
205             } catch (Error e) {
206                 throw e;
207             } catch (Exception e) {
208                 throw new RuntimeIoException("Failed to create a session.", e);
209             }
210         }
211     }
212 
213     private IoSession newSessionWithoutLock(
214             SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
215         H handle = boundHandles.get(localAddress);
216         if (handle == null) {
217             throw new IllegalArgumentException("Unknown local address: " + localAddress);
218         }
219 
220         IoSession session;
221         IoSessionRecycler sessionRecycler = getSessionRecycler();
222         synchronized (sessionRecycler) {
223             session = sessionRecycler.recycle(localAddress, remoteAddress);
224             if (session != null) {
225                 return session;
226             }
227 
228             // If a new session needs to be created.
229             T newSession = newSession(processor, handle, remoteAddress);
230             getSessionRecycler().put(newSession);
231             session = newSession;
232         }
233 
234         finishSessionInitialization(session, null, null);
235 
236         try {
237             this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
238             getListeners().fireSessionCreated(session);
239         } catch (Throwable t) {
240             ExceptionMonitor.getInstance().exceptionCaught(t);
241         }
242 
243         return session;
244     }
245 
246     public final IoSessionRecycler getSessionRecycler() {
247         return sessionRecycler;
248     }
249 
250     public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
251         synchronized (bindLock) {
252             if (isActive()) {
253                 throw new IllegalStateException(
254                         "sessionRecycler can't be set while the acceptor is bound.");
255             }
256 
257             if (sessionRecycler == null) {
258                 sessionRecycler = DEFAULT_RECYCLER;
259             }
260             this.sessionRecycler = sessionRecycler;
261         }
262     }
263 
264     private class ConnectionlessAcceptorProcessor implements IoProcessor<T> {
265 
266         public void add(T session) {
267         }
268 
269         public void flush(T session) {
270             if (scheduleFlush(session)) {
271                 wakeup();
272             }
273         }
274 
275         public void remove(T session) {
276             getSessionRecycler().remove(session);
277             getListeners().fireSessionDestroyed(session);
278         }
279 
280         public void updateTrafficMask(T session) {
281             throw new UnsupportedOperationException();
282         }
283 
284         public void dispose() {
285         }
286 
287         public boolean isDisposed() {
288             return false;
289         }
290 
291         public boolean isDisposing() {
292             return false;
293         }
294     }
295 
296     private void startupWorker() {
297         if (!selectable) {
298             registerQueue.clear();
299             cancelQueue.clear();
300             flushingSessions.clear();
301         }
302 
303         synchronized (lock) {
304             if (worker == null) {
305                 worker = new Worker();
306                 executeWorker(worker);
307             }
308         }
309     }
310 
311     private boolean scheduleFlush(T session) {
312         if (session.setScheduledForFlush(true)) {
313             flushingSessions.add(session);
314             return true;
315         } else {
316             return false;
317         }
318     }
319 
320     private class Worker implements Runnable {
321         public void run() {
322             int nHandles = 0;
323             lastIdleCheckTime = System.currentTimeMillis();
324 
325             while (selectable) {
326                 try {
327                     boolean selected = select(1000);
328 
329                     nHandles += registerHandles();
330 
331                     if (selected) {
332                         processReadySessions(selectedHandles());
333                     }
334 
335                     long currentTime = System.currentTimeMillis();
336                     flushSessions(currentTime);
337                     nHandles -= unregisterHandles();
338 
339                     notifyIdleSessions(currentTime);
340 
341                     if (nHandles == 0) {
342                         synchronized (lock) {
343                             if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
344                                 worker = null;
345                                 break;
346                             }
347                         }
348                     }
349                 } catch (Exception e) {
350                     ExceptionMonitor.getInstance().exceptionCaught(e);
351 
352                     try {
353                         Thread.sleep(1000);
354                     } catch (InterruptedException e1) {
355                     }
356                 }
357             }
358 
359             if (selectable && isDisposing()) {
360                 selectable = false;
361                 try {
362                     destroy();
363                 } catch (Exception e) {
364                     ExceptionMonitor.getInstance().exceptionCaught(e);
365                 } finally {
366                     disposalFuture.setValue(true);
367                 }
368             }
369         }
370     }
371 
372     @SuppressWarnings("unchecked")
373     private void processReadySessions(Iterator<H> handles) {
374         while (handles.hasNext()) {
375             H h = handles.next();
376             handles.remove();
377             try {
378                 if (isReadable(h)) {
379                     readHandle(h);
380                 }
381 
382                 if (isWritable(h)) {
383                     for (IoSession session : getManagedSessions().values()) {
384                         scheduleFlush((T) session);
385                     }
386                 }
387             } catch (Throwable t) {
388                 ExceptionMonitor.getInstance().exceptionCaught(t);
389             }
390         }
391     }
392 
393     private void readHandle(H handle) throws Exception {
394         IoBuffer readBuf = IoBuffer.allocate(
395                 getSessionConfig().getReadBufferSize());
396 
397         SocketAddress remoteAddress = receive(handle, readBuf);
398         if (remoteAddress != null) {
399             IoSession session = newSessionWithoutLock(
400                     remoteAddress, localAddress(handle));
401 
402             readBuf.flip();
403 
404             IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
405             newBuf.put(readBuf);
406             newBuf.flip();
407 
408             session.getFilterChain().fireMessageReceived(newBuf);
409         }
410     }
411 
412     private void flushSessions(long currentTime) {
413         for (; ;) {
414             T session = flushingSessions.poll();
415             if (session == null) {
416                 break;
417             }
418 
419             session.setScheduledForFlush(false);
420 
421             try {
422                 boolean flushedAll = flush(session, currentTime);
423                 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
424                     !session.isScheduledForFlush()) {
425                     scheduleFlush(session);
426                 }
427             } catch (Exception e) {
428                 session.getFilterChain().fireExceptionCaught(e);
429             }
430         }
431     }
432 
433     private boolean flush(T session, long currentTime) throws Exception {
434         // Clear OP_WRITE
435         setInterestedInWrite(session, false);
436 
437         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
438         final int maxWrittenBytes =
439             session.getConfig().getMaxReadBufferSize() +
440             (session.getConfig().getMaxReadBufferSize() >>> 1);
441 
442         int writtenBytes = 0;
443         try {
444             for (; ;) {
445                 WriteRequest req = session.getCurrentWriteRequest();
446                 if (req == null) {
447                     req = writeRequestQueue.poll(session);
448                     if (req == null) {
449                         break;
450                     }
451                     session.setCurrentWriteRequest(req);
452                 }
453 
454                 IoBuffer buf = (IoBuffer) req.getMessage();
455                 if (buf.remaining() == 0) {
456                     // Clear and fire event
457                     session.setCurrentWriteRequest(null);
458                     buf.reset();
459                     session.getFilterChain().fireMessageSent(req);
460                     continue;
461                 }
462 
463                 SocketAddress destination = req.getDestination();
464                 if (destination == null) {
465                     destination = session.getRemoteAddress();
466                 }
467 
468                 int localWrittenBytes = send(session, buf, destination);
469                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
470                     // Kernel buffer is full or wrote too much
471                     setInterestedInWrite(session, true);
472                     return false;
473                 } else {
474                     setInterestedInWrite(session, false);
475 
476                     // Clear and fire event
477                     session.setCurrentWriteRequest(null);
478                     writtenBytes += localWrittenBytes;
479                     buf.reset();
480                     session.getFilterChain().fireMessageSent(req);
481                 }
482             }
483         } finally {
484             session.increaseWrittenBytes(writtenBytes, currentTime);
485         }
486 
487         return true;
488     }
489 
490     private int registerHandles() {
491         for (;;) {
492             AcceptorOperationFuture req = registerQueue.poll();
493             if (req == null) {
494                 break;
495             }
496 
497             Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
498             List<SocketAddress> localAddresses = req.getLocalAddresses();
499             try {
500                 for (SocketAddress a: localAddresses) {
501                     H handle = open(a);
502                     newHandles.put(localAddress(handle), handle);
503                 }
504                 boundHandles.putAll(newHandles);
505 
506                 getListeners().fireServiceActivated();
507                 req.setDone();
508                 return newHandles.size();
509             } catch (Exception e) {
510                 req.setException(e);
511             } finally {
512                 // Roll back if failed to bind all addresses.
513                 if (req.getException() != null) {
514                     for (H handle: newHandles.values()) {
515                         try {
516                             close(handle);
517                         } catch (Exception e) {
518                             ExceptionMonitor.getInstance().exceptionCaught(e);
519                         }
520                     }
521                     wakeup();
522                 }
523             }
524         }
525 
526         return 0;
527     }
528 
529     private int unregisterHandles() {
530         int nHandles = 0;
531         for (;;) {
532             AcceptorOperationFuture request = cancelQueue.poll();
533             if (request == null) {
534                 break;
535             }
536 
537             // close the channels
538             for (SocketAddress a: request.getLocalAddresses()) {
539                 H handle = boundHandles.remove(a);
540                 if (handle == null) {
541                     continue;
542                 }
543 
544                 try {
545                     close(handle);
546                     wakeup(); // wake up again to trigger thread death
547                 } catch (Throwable e) {
548                     ExceptionMonitor.getInstance().exceptionCaught(e);
549                 } finally {
550                     nHandles ++;
551                 }
552             }
553 
554             request.setDone();
555         }
556 
557         return nHandles;
558     }
559 
560     private void notifyIdleSessions(long currentTime) {
561         // process idle sessions
562         if (currentTime - lastIdleCheckTime >= 1000) {
563             lastIdleCheckTime = currentTime;
564             IdleStatusChecker.notifyIdleness(
565                     getListeners().getManagedSessions().values().iterator(),
566                     currentTime);
567         }
568     }
569 }