View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.SocketAddress;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.Executor;
33  
34  import org.apache.mina.core.RuntimeIoException;
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.future.IoFuture;
37  import org.apache.mina.core.service.AbstractIoAcceptor;
38  import org.apache.mina.core.service.IoAcceptor;
39  import org.apache.mina.core.service.IoProcessor;
40  import org.apache.mina.core.session.AbstractIoSession;
41  import org.apache.mina.core.session.ExpiringSessionRecycler;
42  import org.apache.mina.core.session.IoSession;
43  import org.apache.mina.core.session.IoSessionConfig;
44  import org.apache.mina.core.session.IoSessionRecycler;
45  import org.apache.mina.core.write.WriteRequest;
46  import org.apache.mina.core.write.WriteRequestQueue;
47  import org.apache.mina.util.ExceptionMonitor;
48  
49  /**
50   * TODO Add documentation
51   * {@link IoAcceptor} for datagram transport (UDP/IP).
52   *
53   * @author The Apache MINA Project (dev@mina.apache.org)
54   * @org.apache.xbean.XBean
55   */
56  public abstract class AbstractPollingConnectionlessIoAcceptor<T extends AbstractIoSession, H>
57          extends AbstractIoAcceptor {
58  
59      private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
60  
61      private final Object lock = new Object();
62      private final IoProcessor<T> processor = new ConnectionlessAcceptorProcessor();
63      private final Queue<AcceptorOperationFuture> registerQueue =
64          new ConcurrentLinkedQueue<AcceptorOperationFuture>();
65      private final Queue<AcceptorOperationFuture> cancelQueue =
66          new ConcurrentLinkedQueue<AcceptorOperationFuture>();
67      private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
68      private final Map<SocketAddress, H> boundHandles =
69          Collections.synchronizedMap(new HashMap<SocketAddress, H>());
70  
71      private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
72  
73      private final ServiceOperationFuture disposalFuture =
74          new ServiceOperationFuture();
75      private volatile boolean selectable;
76      
77      /** The thread responsible of accepting incoming requests */ 
78      private Acceptor acceptor;
79  
80      private long lastIdleCheckTime;
81  
82      /**
83       * Creates a new instance.
84       */
85      protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) {
86          this(sessionConfig, null);
87      }
88  
89      /**
90       * Creates a new instance.
91       */
92      protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
93          super(sessionConfig, executor);
94  
95          try {
96              init();
97              selectable = true;
98          } catch (RuntimeException e){
99              throw e;
100         } catch (Exception e) {
101             throw new RuntimeIoException("Failed to initialize.", e);
102         } finally {
103             if (!selectable) {
104                 try {
105                     destroy();
106                 } catch (Exception e) {
107                     ExceptionMonitor.getInstance().exceptionCaught(e);
108                 }
109             }
110         }
111     }
112 
113     protected abstract void init() throws Exception;
114     protected abstract void destroy() throws Exception;
115     protected abstract int select() throws Exception;
116     protected abstract int select(int timeout) throws Exception;
117     protected abstract void wakeup();
118     protected abstract Iterator<H> selectedHandles();
119     protected abstract H open(SocketAddress localAddress) throws Exception;
120     protected abstract void close(H handle) throws Exception;
121     protected abstract SocketAddress localAddress(H handle) throws Exception;
122     protected abstract boolean isReadable(H handle);
123     protected abstract boolean isWritable(H handle);
124     protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception;
125     protected abstract int send(T session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception;
126     protected abstract T newSession(IoProcessor<T> processor, H handle, SocketAddress remoteAddress) throws Exception;
127     protected abstract void setInterestedInWrite(T session, boolean interested) throws Exception;
128 
129     /**
130      * {@inheritDoc}
131      */
132     @Override
133     protected IoFuture dispose0() throws Exception {
134         unbind();
135         if (!disposalFuture.isDone()) {
136             startupAcceptor();
137             wakeup();
138         }
139         return disposalFuture;
140     }
141 
142     /**
143      * {@inheritDoc}
144      */
145     @Override
146     protected final Set<SocketAddress> bindInternal(
147             List<? extends SocketAddress> localAddresses) throws Exception {
148         // Create a bind request as a Future operation. When the selector
149         // have handled the registration, it will signal this future.
150         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
151 
152         // adds the Registration request to the queue for the Workers
153         // to handle
154         registerQueue.add(request);
155 
156         // creates the Acceptor instance and has the local
157         // executor kick it off.
158         startupAcceptor();
159         
160         // As we just started the acceptor, we have to unblock the select()
161         // in order to process the bind request we just have added to the 
162         // registerQueue.
163         wakeup();
164 
165         // Now, we wait until this request is completed.
166         request.awaitUninterruptibly();
167 
168         if (request.getException() != null) {
169             throw request.getException();
170         }
171 
172         // Update the local addresses.
173         // setLocalAddresses() shouldn't be called from the worker thread
174         // because of deadlock.
175         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
176 
177         for (H handle:boundHandles.values()) {
178             newLocalAddresses.add(localAddress(handle));
179         }
180         
181         return newLocalAddresses;
182     }
183 
184     /**
185      * {@inheritDoc}
186      */
187     @Override
188     protected final void unbind0(
189             List<? extends SocketAddress> localAddresses) throws Exception {
190         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
191 
192         cancelQueue.add(request);
193         startupAcceptor();
194         wakeup();
195 
196         request.awaitUninterruptibly();
197 
198         if (request.getException() != null) {
199             throw request.getException();
200         }
201     }
202 
203     /**
204      * {@inheritDoc}
205      */
206     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
207         if (isDisposing()) {
208             throw new IllegalStateException("Already disposed.");
209         }
210 
211         if (remoteAddress == null) {
212             throw new NullPointerException("remoteAddress");
213         }
214 
215         synchronized (bindLock) {
216             if (!isActive()) {
217                 throw new IllegalStateException(
218                         "Can't create a session from a unbound service.");
219             }
220 
221             try {
222                 return newSessionWithoutLock(remoteAddress, localAddress);
223             } catch (RuntimeException e) {
224                 throw e;
225             } catch (Error e) {
226                 throw e;
227             } catch (Exception e) {
228                 throw new RuntimeIoException("Failed to create a session.", e);
229             }
230         }
231     }
232 
233     private IoSession newSessionWithoutLock(
234             SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
235         H handle = boundHandles.get(localAddress);
236         if (handle == null) {
237             throw new IllegalArgumentException("Unknown local address: " + localAddress);
238         }
239 
240         IoSession session;
241         IoSessionRecycler sessionRecycler = getSessionRecycler();
242         synchronized (sessionRecycler) {
243             session = sessionRecycler.recycle(localAddress, remoteAddress);
244             if (session != null) {
245                 return session;
246             }
247 
248             // If a new session needs to be created.
249             T newSession = newSession(processor, handle, remoteAddress);
250             getSessionRecycler().put(newSession);
251             session = newSession;
252         }
253 
254         initSession(session, null, null);
255 
256         try {
257             this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
258             getListeners().fireSessionCreated(session);
259         } catch (Throwable t) {
260             ExceptionMonitor.getInstance().exceptionCaught(t);
261         }
262 
263         return session;
264     }
265 
266     public final IoSessionRecycler getSessionRecycler() {
267         return sessionRecycler;
268     }
269 
270     public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
271         synchronized (bindLock) {
272             if (isActive()) {
273                 throw new IllegalStateException(
274                         "sessionRecycler can't be set while the acceptor is bound.");
275             }
276 
277             if (sessionRecycler == null) {
278                 sessionRecycler = DEFAULT_RECYCLER;
279             }
280             this.sessionRecycler = sessionRecycler;
281         }
282     }
283 
284     private class ConnectionlessAcceptorProcessor implements IoProcessor<T> {
285 
286         public void add(T session) {
287         }
288 
289         public void flush(T session) {
290             if (scheduleFlush(session)) {
291                 wakeup();
292             }
293         }
294 
295         public void remove(T session) {
296             getSessionRecycler().remove(session);
297             getListeners().fireSessionDestroyed(session);
298         }
299 
300         public void updateTrafficControl(T session) {
301             throw new UnsupportedOperationException();
302         }
303 
304         public void dispose() {
305         }
306 
307         public boolean isDisposed() {
308             return false;
309         }
310 
311         public boolean isDisposing() {
312             return false;
313         }
314     }
315 
316     /**
317      * Starts the inner Acceptor thread.
318      */
319     private void startupAcceptor() {
320         if (!selectable) {
321             registerQueue.clear();
322             cancelQueue.clear();
323             flushingSessions.clear();
324         }
325 
326         synchronized (lock) {
327             if (acceptor == null) {
328                 acceptor = new Acceptor();
329                 executeWorker(acceptor);
330             }
331         }
332     }
333 
334     private boolean scheduleFlush(T session) {
335         if (session.setScheduledForFlush(true)) {
336             flushingSessions.add(session);
337             return true;
338         } else {
339             return false;
340         }
341     }
342 
343     /**
344      * This private class is used to accept incoming connection from 
345      * clients. It's an infinite loop, which can be stopped when all
346      * the registered handles have been removed (unbound). 
347      */
348     private class Acceptor implements Runnable {
349         public void run() {
350             int nHandles = 0;
351             lastIdleCheckTime = System.currentTimeMillis();
352 
353             while (selectable) {
354                 try {
355                     int selected = select();
356 
357                     nHandles += registerHandles();
358 
359                     if (selected > 0) {
360                         processReadySessions(selectedHandles());
361                     }
362 
363                     long currentTime = System.currentTimeMillis();
364                     flushSessions(currentTime);
365                     nHandles -= unregisterHandles();
366 
367                     notifyIdleSessions(currentTime);
368 
369                     if (nHandles == 0) {
370                         synchronized (lock) {
371                             if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
372                                 acceptor = null;
373                                 break;
374                             }
375                         }
376                     }
377                 } catch (Exception e) {
378                     ExceptionMonitor.getInstance().exceptionCaught(e);
379 
380                     try {
381                         Thread.sleep(1000);
382                     } catch (InterruptedException e1) {
383                     }
384                 }
385             }
386 
387             if (selectable && isDisposing()) {
388                 selectable = false;
389                 try {
390                     destroy();
391                 } catch (Exception e) {
392                     ExceptionMonitor.getInstance().exceptionCaught(e);
393                 } finally {
394                     disposalFuture.setValue(true);
395                 }
396             }
397         }
398     }
399 
400     @SuppressWarnings("unchecked")
401     private void processReadySessions(Iterator<H> handles) {
402         while (handles.hasNext()) {
403             H h = handles.next();
404             handles.remove();
405             try {
406                 if (isReadable(h)) {
407                     readHandle(h);
408                 }
409 
410                 if (isWritable(h)) {
411                     for (IoSession session : getManagedSessions().values()) {
412                         scheduleFlush((T) session);
413                     }
414                 }
415             } catch (Throwable t) {
416                 ExceptionMonitor.getInstance().exceptionCaught(t);
417             }
418         }
419     }
420 
421     private void readHandle(H handle) throws Exception {
422         IoBuffer readBuf = IoBuffer.allocate(
423                 getSessionConfig().getReadBufferSize());
424 
425         SocketAddress remoteAddress = receive(handle, readBuf);
426         if (remoteAddress != null) {
427             IoSession session = newSessionWithoutLock(
428                     remoteAddress, localAddress(handle));
429 
430             readBuf.flip();
431 
432             IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
433             newBuf.put(readBuf);
434             newBuf.flip();
435 
436             session.getFilterChain().fireMessageReceived(newBuf);
437         }
438     }
439 
440     private void flushSessions(long currentTime) {
441         for (; ;) {
442             T session = flushingSessions.poll();
443             if (session == null) {
444                 break;
445             }
446 
447             session.setScheduledForFlush(false);
448 
449             try {
450                 boolean flushedAll = flush(session, currentTime);
451                 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
452                     !session.isScheduledForFlush()) {
453                     scheduleFlush(session);
454                 }
455             } catch (Exception e) {
456                 session.getFilterChain().fireExceptionCaught(e);
457             }
458         }
459     }
460 
461     private boolean flush(T session, long currentTime) throws Exception {
462         // Clear OP_WRITE
463         setInterestedInWrite(session, false);
464 
465         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
466         final int maxWrittenBytes =
467             session.getConfig().getMaxReadBufferSize() +
468             (session.getConfig().getMaxReadBufferSize() >>> 1);
469 
470         int writtenBytes = 0;
471         try {
472             for (; ;) {
473                 WriteRequest req = session.getCurrentWriteRequest();
474                 if (req == null) {
475                     req = writeRequestQueue.poll(session);
476                     if (req == null) {
477                         break;
478                     }
479                     session.setCurrentWriteRequest(req);
480                 }
481 
482                 IoBuffer buf = (IoBuffer) req.getMessage();
483                 if (buf.remaining() == 0) {
484                     // Clear and fire event
485                     session.setCurrentWriteRequest(null);
486                     buf.reset();
487                     session.getFilterChain().fireMessageSent(req);
488                     continue;
489                 }
490 
491                 SocketAddress destination = req.getDestination();
492                 if (destination == null) {
493                     destination = session.getRemoteAddress();
494                 }
495 
496                 int localWrittenBytes = send(session, buf, destination);
497                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
498                     // Kernel buffer is full or wrote too much
499                     setInterestedInWrite(session, true);
500                     return false;
501                 } else {
502                     setInterestedInWrite(session, false);
503 
504                     // Clear and fire event
505                     session.setCurrentWriteRequest(null);
506                     writtenBytes += localWrittenBytes;
507                     buf.reset();
508                     session.getFilterChain().fireMessageSent(req);
509                 }
510             }
511         } finally {
512             session.increaseWrittenBytes(writtenBytes, currentTime);
513         }
514 
515         return true;
516     }
517 
518     private int registerHandles() {
519         for (;;) {
520             AcceptorOperationFuture req = registerQueue.poll();
521             if (req == null) {
522                 break;
523             }
524 
525             Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
526             List<SocketAddress> localAddresses = req.getLocalAddresses();
527             try {
528                 for (SocketAddress a: localAddresses) {
529                     H handle = open(a);
530                     newHandles.put(localAddress(handle), handle);
531                 }
532                 boundHandles.putAll(newHandles);
533 
534                 getListeners().fireServiceActivated();
535                 req.setDone();
536                 return newHandles.size();
537             } catch (Exception e) {
538                 req.setException(e);
539             } finally {
540                 // Roll back if failed to bind all addresses.
541                 if (req.getException() != null) {
542                     for (H handle: newHandles.values()) {
543                         try {
544                             close(handle);
545                         } catch (Exception e) {
546                             ExceptionMonitor.getInstance().exceptionCaught(e);
547                         }
548                     }
549                     wakeup();
550                 }
551             }
552         }
553 
554         return 0;
555     }
556 
557     private int unregisterHandles() {
558         int nHandles = 0;
559         for (;;) {
560             AcceptorOperationFuture request = cancelQueue.poll();
561             if (request == null) {
562                 break;
563             }
564 
565             // close the channels
566             for (SocketAddress a: request.getLocalAddresses()) {
567                 H handle = boundHandles.remove(a);
568                 if (handle == null) {
569                     continue;
570                 }
571 
572                 try {
573                     close(handle);
574                     wakeup(); // wake up again to trigger thread death
575                 } catch (Throwable e) {
576                     ExceptionMonitor.getInstance().exceptionCaught(e);
577                 } finally {
578                     nHandles ++;
579                 }
580             }
581 
582             request.setDone();
583         }
584 
585         return nHandles;
586     }
587 
588     private void notifyIdleSessions(long currentTime) {
589         // process idle sessions
590         if (currentTime - lastIdleCheckTime >= 1000) {
591             lastIdleCheckTime = currentTime;
592             AbstractIoSession.notifyIdleness(
593                     getListeners().getManagedSessions().values().iterator(),
594                     currentTime);
595         }
596     }
597 }