View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.Inet4Address;
23  import java.net.Inet6Address;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.SocketAddress;
27  import java.nio.channels.ClosedSelectorException;
28  import java.util.Collections;
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Queue;
35  import java.util.Set;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.Executor;
38  
39  import org.apache.mina.core.RuntimeIoException;
40  import org.apache.mina.core.buffer.IoBuffer;
41  import org.apache.mina.core.service.AbstractIoAcceptor;
42  import org.apache.mina.core.service.IoAcceptor;
43  import org.apache.mina.core.service.IoProcessor;
44  import org.apache.mina.core.session.AbstractIoSession;
45  import org.apache.mina.core.session.ExpiringSessionRecycler;
46  import org.apache.mina.core.session.IoSession;
47  import org.apache.mina.core.session.IoSessionConfig;
48  import org.apache.mina.core.session.IoSessionRecycler;
49  import org.apache.mina.core.write.WriteRequest;
50  import org.apache.mina.core.write.WriteRequestQueue;
51  import org.apache.mina.util.ExceptionMonitor;
52  
53  /**
54   * {@link IoAcceptor} for datagram transport (UDP/IP).
55   *
56   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
57   * @org.apache.xbean.XBean
58   * 
59    * @param <S> the type of the {@link IoSession} this processor can handle
60  */
61  public abstract class AbstractPollingConnectionlessIoAcceptor<S extends AbstractIoSession, H>
62          extends AbstractIoAcceptor {
63  
64      private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
65  
66      /**
67       * A timeout used for the select, as we need to get out to deal with idle
68       * sessions
69       */
70      private static final long SELECT_TIMEOUT = 1000L;
71  
72      private final Object lock = new Object();
73  
74      private final IoProcessor<S> processor = new ConnectionlessAcceptorProcessor();
75      private final Queue<AcceptorOperationFuture> registerQueue =
76          new ConcurrentLinkedQueue<AcceptorOperationFuture>();
77      private final Queue<AcceptorOperationFuture> cancelQueue =
78          new ConcurrentLinkedQueue<AcceptorOperationFuture>();
79  
80      private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
81      private final Map<String, H> boundHandles =
82          Collections.synchronizedMap(new HashMap<String, H>());
83  
84      private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
85  
86      private final ServiceOperationFuture disposalFuture =
87          new ServiceOperationFuture();
88      private volatile boolean selectable;
89      
90      /** The thread responsible of accepting incoming requests */ 
91      private Acceptor acceptor;
92  
93      private long lastIdleCheckTime;
94      
95      private String getAddressAsString(SocketAddress address) {
96      	InetAddress inetAddress = ((InetSocketAddress)address).getAddress();
97      	int port = ((InetSocketAddress)address).getPort();
98      	
99      	String result = null;
100     	
101     	if ( inetAddress instanceof Inet4Address ) {
102     		result = "/" + inetAddress.getHostAddress() + ":" + port;
103     	} else {
104     		// Inet6
105     		if ( ((Inet6Address)inetAddress).isIPv4CompatibleAddress() ) {
106     			byte[] bytes = inetAddress.getAddress();
107     			
108     			result = "/" + bytes[12] + "." + bytes[13] + "." + bytes[14] + "." + bytes[15] + ":" + port;
109     		} else {
110     			result = inetAddress.toString();
111     		}
112     	}
113     	
114     	return result;
115     }
116 
117     /**
118      * Creates a new instance.
119      */
120     protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) {
121         this(sessionConfig, null);
122     }
123 
124     /**
125      * Creates a new instance.
126      */
127     protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
128         super(sessionConfig, executor);
129 
130         try {
131             init();
132             selectable = true;
133         } catch (RuntimeException e) {
134             throw e;
135         } catch (Exception e) {
136             throw new RuntimeIoException("Failed to initialize.", e);
137         } finally {
138             if (!selectable) {
139                 try {
140                     destroy();
141                 } catch (Exception e) {
142                     ExceptionMonitor.getInstance().exceptionCaught(e);
143                 }
144             }
145         }
146     }
147 
148     protected abstract void init() throws Exception;
149     protected abstract void destroy() throws Exception;
150     protected abstract int select() throws Exception;
151     protected abstract int select(long timeout) throws Exception;
152     protected abstract void wakeup();
153     protected abstract Iterator<H> selectedHandles();
154     protected abstract H open(SocketAddress localAddress) throws Exception;
155     protected abstract void close(H handle) throws Exception;
156     protected abstract SocketAddress localAddress(H handle) throws Exception;
157     protected abstract boolean isReadable(H handle);
158     protected abstract boolean isWritable(H handle);
159     protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception;
160 
161     protected abstract int send(S session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception;
162 
163     protected abstract S newSession(IoProcessor<S> processor, H handle, SocketAddress remoteAddress) throws Exception;
164 
165     protected abstract void setInterestedInWrite(S session, boolean interested) throws Exception;
166 
167     /**
168      * {@inheritDoc}
169      */
170     @Override
171     protected void dispose0() throws Exception {
172         unbind();
173         startupAcceptor();
174         wakeup();
175     }
176 
177     /**
178      * {@inheritDoc}
179      */
180     @Override
181     protected final Set<SocketAddress> bindInternal(
182             List<? extends SocketAddress> localAddresses) throws Exception {
183         // Create a bind request as a Future operation. When the selector
184         // have handled the registration, it will signal this future.
185         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
186 
187         // adds the Registration request to the queue for the Workers
188         // to handle
189         registerQueue.add(request);
190 
191         // creates the Acceptor instance and has the local
192         // executor kick it off.
193         startupAcceptor();
194         
195         // As we just started the acceptor, we have to unblock the select()
196         // in order to process the bind request we just have added to the 
197         // registerQueue.
198         wakeup();
199 
200         // Now, we wait until this request is completed.
201         request.awaitUninterruptibly();
202 
203         if (request.getException() != null) {
204             throw request.getException();
205         }
206 
207         // Update the local addresses.
208         // setLocalAddresses() shouldn't be called from the worker thread
209         // because of deadlock.
210         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
211 
212         for (H handle : boundHandles.values()) {
213             newLocalAddresses.add(localAddress(handle));
214         }
215         
216         return newLocalAddresses;
217     }
218 
219     /**
220      * {@inheritDoc}
221      */
222     @Override
223     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
224         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
225 
226         cancelQueue.add(request);
227         startupAcceptor();
228         wakeup();
229 
230         request.awaitUninterruptibly();
231 
232         if (request.getException() != null) {
233             throw request.getException();
234         }
235     }
236 
237     /**
238      * {@inheritDoc}
239      */
240     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
241         if (isDisposing()) {
242             throw new IllegalStateException("Already disposed.");
243         }
244 
245         if (remoteAddress == null) {
246             throw new IllegalArgumentException("remoteAddress");
247         }
248 
249         synchronized (bindLock) {
250             if (!isActive()) {
251                 throw new IllegalStateException(
252                         "Can't create a session from a unbound service.");
253             }
254 
255             try {
256                 return newSessionWithoutLock(remoteAddress, localAddress);
257             } catch (RuntimeException e) {
258                 throw e;
259             } catch (Error e) {
260                 throw e;
261             } catch (Exception e) {
262                 throw new RuntimeIoException("Failed to create a session.", e);
263             }
264         }
265     }
266 
267     private IoSession newSessionWithoutLock(
268             SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
269         H handle = boundHandles.get(getAddressAsString(localAddress));
270         
271         if (handle == null) {
272             throw new IllegalArgumentException("Unknown local address: " + localAddress);
273         }
274 
275         IoSession session;
276         IoSessionRecycler sessionRecycler = getSessionRecycler();
277         
278         synchronized (sessionRecycler) {
279             session = sessionRecycler.recycle(localAddress, remoteAddress);
280             
281             if (session != null) {
282                 return session;
283             }
284 
285             // If a new session needs to be created.
286             S newSession = newSession(processor, handle, remoteAddress);
287             getSessionRecycler().put(newSession);
288             session = newSession;
289         }
290 
291         initSession(session, null, null);
292 
293         try {
294             this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
295             getListeners().fireSessionCreated(session);
296         } catch (Throwable t) {
297             ExceptionMonitor.getInstance().exceptionCaught(t);
298         }
299 
300         return session;
301     }
302 
303     public final IoSessionRecycler getSessionRecycler() {
304         return sessionRecycler;
305     }
306 
307     public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
308         synchronized (bindLock) {
309             if (isActive()) {
310                 throw new IllegalStateException(
311                         "sessionRecycler can't be set while the acceptor is bound.");
312             }
313 
314             if (sessionRecycler == null) {
315                 sessionRecycler = DEFAULT_RECYCLER;
316             }
317             
318             this.sessionRecycler = sessionRecycler;
319         }
320     }
321 
322     private class ConnectionlessAcceptorProcessor implements IoProcessor<S> {
323 
324         public void add(S session) {
325         }
326 
327         public void flush(S session) {
328             if (scheduleFlush(session)) {
329                 wakeup();
330             }
331         }
332 
333         public void remove(S session) {
334             getSessionRecycler().remove(session);
335             getListeners().fireSessionDestroyed(session);
336         }
337 
338         public void updateTrafficControl(S session) {
339             throw new UnsupportedOperationException();
340         }
341 
342         public void dispose() {
343         }
344 
345         public boolean isDisposed() {
346             return false;
347         }
348 
349         public boolean isDisposing() {
350             return false;
351         }
352     }
353 
354     /**
355      * Starts the inner Acceptor thread.
356      */
357     private void startupAcceptor() {
358         if (!selectable) {
359             registerQueue.clear();
360             cancelQueue.clear();
361             flushingSessions.clear();
362         }
363 
364         synchronized (lock) {
365             if (acceptor == null) {
366                 acceptor = new Acceptor();
367                 executeWorker(acceptor);
368             }
369         }
370     }
371 
372     private boolean scheduleFlush(S session) {
373         // Set the schedule for flush flag if the session
374         // has not already be added to the flushingSessions
375         // queue
376         if (session.setScheduledForFlush(true)) {
377             flushingSessions.add(session);
378             return true;
379         } else {
380             return false;
381         }
382     }
383 
384     /**
385      * This private class is used to accept incoming connection from 
386      * clients. It's an infinite loop, which can be stopped when all
387      * the registered handles have been removed (unbound). 
388      */
389     private class Acceptor implements Runnable {
390         public void run() {
391             int nHandles = 0;
392             lastIdleCheckTime = System.currentTimeMillis();
393 
394             while (selectable) {
395                 try {
396                     int selected = select(SELECT_TIMEOUT);
397 
398                     nHandles += registerHandles();
399 
400                     if (selected > 0) {
401                         processReadySessions(selectedHandles());
402                     }
403 
404                     long currentTime = System.currentTimeMillis();
405                     flushSessions(currentTime);
406                     nHandles -= unregisterHandles();
407 
408                     notifyIdleSessions(currentTime);
409 
410                     if (nHandles == 0) {
411                         synchronized (lock) {
412                             if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
413                                 acceptor = null;
414                                 break;
415                             }
416                         }
417                     }
418                 } catch (ClosedSelectorException cse) {
419                     // If the selector has been closed, we can exit the loop
420                     break;
421                 } catch (Exception e) {
422                     ExceptionMonitor.getInstance().exceptionCaught(e);
423 
424                     try {
425                         Thread.sleep(1000);
426                     } catch (InterruptedException e1) {
427                     }
428                 }
429             }
430 
431             if (selectable && isDisposing()) {
432                 selectable = false;
433                 try {
434                     destroy();
435                 } catch (Exception e) {
436                     ExceptionMonitor.getInstance().exceptionCaught(e);
437                 } finally {
438                     disposalFuture.setValue(true);
439                 }
440             }
441         }
442     }
443 
444     @SuppressWarnings("unchecked")
445     private void processReadySessions(Iterator<H> handles) {
446         while (handles.hasNext()) {
447             H h = handles.next();
448             handles.remove();
449             
450             try {
451                 if (isReadable(h)) {
452                     readHandle(h);
453                 }
454 
455                 if (isWritable(h)) {
456                     for (IoSession session : getManagedSessions().values()) {
457                         scheduleFlush((S) session);
458                     }
459                 }
460             } catch (Throwable t) {
461                 ExceptionMonitor.getInstance().exceptionCaught(t);
462             }
463         }
464     }
465 
466     private void readHandle(H handle) throws Exception {
467         IoBuffer readBuf = IoBuffer.allocate(
468                 getSessionConfig().getReadBufferSize());
469 
470         SocketAddress remoteAddress = receive(handle, readBuf);
471         
472         if (remoteAddress != null) {
473             IoSession session = newSessionWithoutLock(
474                     remoteAddress, localAddress(handle));
475 
476             readBuf.flip();
477 
478             IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
479             newBuf.put(readBuf);
480             newBuf.flip();
481 
482             session.getFilterChain().fireMessageReceived(newBuf);
483         }
484     }
485 
486     private void flushSessions(long currentTime) {
487         for (;;) {
488             S session = flushingSessions.poll();
489             
490             if (session == null) {
491                 break;
492             }
493 
494             // Reset the Schedule for flush flag for this session,
495             // as we are flushing it now
496             session.unscheduledForFlush();
497 
498             try {
499                 boolean flushedAll = flush(session, currentTime);
500                 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
501                     !session.isScheduledForFlush()) {
502                     scheduleFlush(session);
503                 }
504             } catch (Exception e) {
505                 session.getFilterChain().fireExceptionCaught(e);
506             }
507         }
508     }
509 
510     private boolean flush(S session, long currentTime) throws Exception {
511         // Clear OP_WRITE
512         setInterestedInWrite(session, false);
513 
514         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
515         final int maxWrittenBytes =
516             session.getConfig().getMaxReadBufferSize() +
517             (session.getConfig().getMaxReadBufferSize() >>> 1);
518 
519         int writtenBytes = 0;
520         
521         try {
522             for (;;) {
523                 WriteRequest req = session.getCurrentWriteRequest();
524                 
525                 if (req == null) {
526                     req = writeRequestQueue.poll(session);
527                     if (req == null) {
528                         break;
529                     }
530                     session.setCurrentWriteRequest(req);
531                 }
532 
533                 IoBuffer buf = (IoBuffer) req.getMessage();
534                 
535                 if (buf.remaining() == 0) {
536                     // Clear and fire event
537                     session.setCurrentWriteRequest(null);
538                     buf.reset();
539                     session.getFilterChain().fireMessageSent(req);
540                     continue;
541                 }
542 
543                 SocketAddress destination = req.getDestination();
544                 
545                 if (destination == null) {
546                     destination = session.getRemoteAddress();
547                 }
548 
549                 int localWrittenBytes = send(session, buf, destination);
550                 
551                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
552                     // Kernel buffer is full or wrote too much
553                     setInterestedInWrite(session, true);
554                     return false;
555                 } else {
556                     setInterestedInWrite(session, false);
557 
558                     // Clear and fire event
559                     session.setCurrentWriteRequest(null);
560                     writtenBytes += localWrittenBytes;
561                     buf.reset();
562                     session.getFilterChain().fireMessageSent(req);
563                 }
564             }
565         } finally {
566             session.increaseWrittenBytes(writtenBytes, currentTime);
567         }
568 
569         return true;
570     }
571 
572     private int registerHandles() {
573         for (;;) {
574             AcceptorOperationFuture req = registerQueue.poll();
575             
576             if (req == null) {
577                 break;
578             }
579 
580             Map<String, H> newHandles = new HashMap<String, H>();
581             List<SocketAddress> localAddresses = req.getLocalAddresses();
582             
583             try {
584                 for (SocketAddress socketAddress : localAddresses) {
585                     H handle = open(socketAddress);
586                     newHandles.put(getAddressAsString(localAddress(handle)), handle);
587                 }
588                 
589                 boundHandles.putAll(newHandles);
590 
591                 getListeners().fireServiceActivated();
592                 req.setDone();
593                 
594                 return newHandles.size();
595             } catch (Exception e) {
596                 req.setException(e);
597             } finally {
598                 // Roll back if failed to bind all addresses.
599                 if (req.getException() != null) {
600                     for (H handle : newHandles.values()) {
601                         try {
602                             close(handle);
603                         } catch (Exception e) {
604                             ExceptionMonitor.getInstance().exceptionCaught(e);
605                         }
606                     }
607                     
608                     wakeup();
609                 }
610             }
611         }
612 
613         return 0;
614     }
615 
616     private int unregisterHandles() {
617         int nHandles = 0;
618         
619         for (;;) {
620             AcceptorOperationFuture request = cancelQueue.poll();
621             if (request == null) {
622                 break;
623             }
624 
625             // close the channels
626             for (SocketAddress socketAddress : request.getLocalAddresses()) {
627                 H handle = boundHandles.remove(getAddressAsString(socketAddress));
628                 
629                 if (handle == null) {
630                     continue;
631                 }
632 
633                 try {
634                     close(handle);
635                     wakeup(); // wake up again to trigger thread death
636                 } catch (Throwable e) {
637                     ExceptionMonitor.getInstance().exceptionCaught(e);
638                 } finally {
639                     nHandles++;
640                 }
641             }
642 
643             request.setDone();
644         }
645 
646         return nHandles;
647     }
648 
649     private void notifyIdleSessions(long currentTime) {
650         // process idle sessions
651         if (currentTime - lastIdleCheckTime >= 1000) {
652             lastIdleCheckTime = currentTime;
653             AbstractIoSession.notifyIdleness(
654                     getListeners().getManagedSessions().values().iterator(),
655                     currentTime);
656         }
657     }
658 }