View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.util.AbstractSet;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.apache.mina.core.IoUtil;
34  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
35  import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
36  import org.apache.mina.core.filterchain.IoFilterChainBuilder;
37  import org.apache.mina.core.future.ConnectFuture;
38  import org.apache.mina.core.future.DefaultIoFuture;
39  import org.apache.mina.core.future.IoFuture;
40  import org.apache.mina.core.future.WriteFuture;
41  import org.apache.mina.core.session.AbstractIoSession;
42  import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
43  import org.apache.mina.core.session.IdleStatus;
44  import org.apache.mina.core.session.IdleStatusChecker;
45  import org.apache.mina.core.session.IoSession;
46  import org.apache.mina.core.session.IoSessionConfig;
47  import org.apache.mina.core.session.IoSessionDataStructureFactory;
48  import org.apache.mina.core.session.IoSessionInitializationException;
49  import org.apache.mina.core.session.IoSessionInitializer;
50  import org.apache.mina.util.ExceptionMonitor;
51  import org.apache.mina.util.NamePreservingRunnable;
52  
53  /**
54   * Base implementation of {@link IoService}s.
55   * 
56   * An instance of IoService contains an Executor which will handle the incoming
57   * events.
58   *
59   * @author The Apache MINA Project (dev@mina.apache.org)
60   * @version $Rev: 684597 $, $Date: 2008-08-10 23:20:07 +0200 (dim, 10 aoĆ» 2008) $
61   */
62  public abstract class AbstractIoService implements IoService {
63      /** 
64       * The unique number identifying the Service. It's incremented
65       * for each new IoService created.
66       */
67      private static final AtomicInteger id = new AtomicInteger();
68  
69      /** 
70       * The thread name built from the IoService inherited 
71       * instance class name and the IoService Id 
72       **/
73      private final String threadName;
74  
75      /**
76       * The associated executor, responsible for handling execution of I/O events.
77       */
78      private final Executor executor;
79  
80      /**
81       * A flag used to indicate that the local executor has been created
82       * inside this instance, and not passed by a caller.
83       * 
84       * If the executor is locally created, then it will be an instance
85       * of the ThreadPoolExecutor class.
86       */
87      private final boolean createdExecutor;
88  
89      /**
90       * The IoHandler in charge of managing all the I/O Events. It is 
91       */
92      private IoHandler handler;
93  
94      /**
95       * The default {@link IoSessionConfig} which will be used to configure new sessions.
96       */
97      private final IoSessionConfig sessionConfig;
98  
99      private final IoServiceListener serviceActivationListener = new IoServiceListener() {
100         public void serviceActivated(IoService service) {
101             // Update lastIoTime.
102             AbstractIoService s = (AbstractIoService) service;
103             IoServiceStatistics _stats = (IoServiceStatistics) s.getStatistics();
104             _stats.setLastReadTime(s.getActivationTime());
105             _stats.setLastWriteTime(s.getActivationTime());
106             _stats.setLastThroughputCalculationTime(s.getActivationTime());
107 
108             // Start idleness notification.
109             idleStatusChecker.addService(s);
110         }
111 
112         public void serviceDeactivated(IoService service) {
113             idleStatusChecker.removeService((AbstractIoService) service);
114         }
115 
116         public void serviceIdle(IoService service, IdleStatus idleStatus) {
117         }
118 
119         public void sessionCreated(IoSession session) {
120         }
121 
122         public void sessionDestroyed(IoSession session) {
123         }
124     };
125 
126     /**
127      * Current filter chain builder.
128      */
129     private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
130 
131     private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
132 
133     /**
134      * Maintains the {@link IoServiceListener}s of this service.
135      */
136     private final IoServiceListenerSupport listeners;
137 
138     /**
139      * A lock object which must be acquired when related resources are
140      * destroyed.
141      */
142     protected final Object disposalLock = new Object();
143 
144     private volatile boolean disposing;
145 
146     private volatile boolean disposed;
147 
148     private IoFuture disposalFuture;
149 
150     private final IdleStatusChecker idleStatusChecker = new IdleStatusChecker();
151 
152     /**
153      * {@inheritDoc}
154      */
155     private IoServiceStatistics stats = new IoServiceStatistics(this);
156     
157     /**
158      * Reference to the object holding all the idle state vars.
159      */    
160     private IoServiceIdleState idleState = new IoServiceIdleState(this);
161 
162     /**
163      * Constructor for {@link AbstractIoService}. You need to provide a default
164      * session configuration and an {@link Executor} for handling I/O events. If
165      * a null {@link Executor} is provided, a default one will be created using
166      * {@link Executors#newCachedThreadPool()}.
167      * 
168      * @param sessionConfig
169      *            the default configuration for the managed {@link IoSession}
170      * @param executor
171      *            the {@link Executor} used for handling execution of I/O
172      *            events. Can be <code>null</code>.
173      */
174     protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
175         if (sessionConfig == null) {
176             throw new NullPointerException("sessionConfig");
177         }
178 
179         if (getTransportMetadata() == null) {
180             throw new NullPointerException("TransportMetadata");
181         }
182 
183         if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(
184                 sessionConfig.getClass())) {
185             throw new IllegalArgumentException("sessionConfig type: "
186                     + sessionConfig.getClass() + " (expected: "
187                     + getTransportMetadata().getSessionConfigType() + ")");
188         }
189 
190         // Create the listeners, and add a first listener : a activation listener
191         // for this service, which will give information on the service state.
192         listeners = new IoServiceListenerSupport(this);
193         listeners.add(serviceActivationListener);
194 
195         // Stores the given session configuration
196         this.sessionConfig = sessionConfig;
197 
198         // Make JVM load the exception monitor before some transports
199         // change the thread context class loader.
200         ExceptionMonitor.getInstance();
201 
202         if (executor == null) {
203             this.executor = Executors.newCachedThreadPool();
204             createdExecutor = true;
205         } else {
206             this.executor = executor;
207             createdExecutor = false;
208         }
209 
210         threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
211 
212         executeWorker(idleStatusChecker.getNotifyingTask(), "idleStatusChecker");
213     }
214 
215     /**
216      * {@inheritDoc}
217      */
218     public final IoFilterChainBuilder getFilterChainBuilder() {
219         return filterChainBuilder;
220     }
221 
222     /**
223      * {@inheritDoc}
224      */
225     public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
226         if (builder == null) {
227             builder = new DefaultIoFilterChainBuilder();
228         }
229         filterChainBuilder = builder;
230     }
231 
232     /**
233      * {@inheritDoc}
234      */
235     public final DefaultIoFilterChainBuilder getFilterChain() {
236         if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
237             return (DefaultIoFilterChainBuilder) filterChainBuilder;
238         } else {
239             throw new IllegalStateException(
240                     "Current filter chain builder is not a DefaultIoFilterChainBuilder.");
241         }
242     }
243 
244     /**
245      * {@inheritDoc}
246      */
247     public final void addListener(IoServiceListener listener) {
248         listeners.add(listener);
249     }
250 
251     /**
252      * {@inheritDoc}
253      */
254     public final void removeListener(IoServiceListener listener) {
255         listeners.remove(listener);
256     }
257 
258     /**
259      * {@inheritDoc}
260      */
261     public final boolean isActive() {
262         return listeners.isActive();
263     }
264 
265     /**
266      * {@inheritDoc}
267      */
268     public final boolean isDisposing() {
269         return disposing;
270     }
271 
272     /**
273      * {@inheritDoc}
274      */
275     public final boolean isDisposed() {
276         return disposed;
277     }
278 
279     /**
280      * {@inheritDoc}
281      */
282     public final void dispose() {
283         if (disposed) {
284             return;
285         }
286 
287         IoFuture disposalFuture;
288         synchronized (disposalLock) {
289             disposalFuture = this.disposalFuture;
290             if (!disposing) {
291                 disposing = true;
292                 try {
293                     this.disposalFuture = disposalFuture = dispose0();
294                 } catch (Exception e) {
295                     ExceptionMonitor.getInstance().exceptionCaught(e);
296                 } finally {
297                     if (disposalFuture == null) {
298                         disposed = true;
299                     }
300                 }
301             }
302         }
303 
304         idleStatusChecker.getNotifyingTask().cancel();
305         if (disposalFuture != null) {
306             disposalFuture.awaitUninterruptibly();
307         }
308 
309         if (createdExecutor) {
310             ExecutorService e = (ExecutorService) executor;
311             e.shutdown();
312             while (!e.isTerminated()) {
313                 try {
314                     e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
315                 } catch (InterruptedException e1) {
316                     // Ignore; it should end shortly.
317                 }
318             }
319         }
320 
321         disposed = true;
322     }
323 
324     /**
325      * Implement this method to release any acquired resources.  This method
326      * is invoked only once by {@link #dispose()}.
327      */
328     protected abstract IoFuture dispose0() throws Exception;
329 
330     /**
331      * {@inheritDoc}
332      */
333     public final Map<Long, IoSession> getManagedSessions() {
334         return listeners.getManagedSessions();
335     }
336 
337     /**
338      * {@inheritDoc}
339      */
340     public final int getManagedSessionCount() {
341         return listeners.getManagedSessionCount();
342     }
343 
344     /**
345      * {@inheritDoc}
346      */
347     public final IoHandler getHandler() {
348         return handler;
349     }
350 
351     /**
352      * {@inheritDoc}
353      */
354     public final void setHandler(IoHandler handler) {
355         if (handler == null) {
356             throw new NullPointerException("handler cannot be null");
357         }
358 
359         if (isActive()) {
360             throw new IllegalStateException(
361                     "handler cannot be set while the service is active.");
362         }
363 
364         this.handler = handler;
365     }
366 
367     /**
368      * {@inheritDoc}
369      */
370     public IoSessionConfig getSessionConfig() {
371         return sessionConfig;
372     }
373 
374     /**
375      * {@inheritDoc}
376      */
377     public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
378         return sessionDataStructureFactory;
379     }
380 
381     /**
382      * {@inheritDoc}
383      */
384     public final void setSessionDataStructureFactory(
385             IoSessionDataStructureFactory sessionDataStructureFactory) {
386         if (sessionDataStructureFactory == null) {
387             throw new NullPointerException("sessionDataStructureFactory");
388         }
389 
390         if (isActive()) {
391             throw new IllegalStateException(
392                     "sessionDataStructureFactory cannot be set while the service is active.");
393         }
394 
395         this.sessionDataStructureFactory = sessionDataStructureFactory;
396     }
397 
398     /**
399      * {@inheritDoc}
400      */
401     public IoServiceIdleState getIdleState() {
402         return idleState;
403     }
404 
405     /**
406      * {@inheritDoc}
407      */
408     public IoServiceStatistics getStatistics() {
409         return stats;
410     }
411 
412     /**
413      * {@inheritDoc}
414      */
415     public final long getActivationTime() {
416         return listeners.getActivationTime();
417     }
418 
419     /**
420      * {@inheritDoc}
421      */
422     public final Set<WriteFuture> broadcast(Object message) {
423         // Convert to Set.  We do not return a List here because only the
424         // direct caller of MessageBroadcaster knows the order of write
425         // operations.
426         final List<WriteFuture> futures = IoUtil.broadcast(message,
427                 getManagedSessions().values());
428         return new AbstractSet<WriteFuture>() {
429             @Override
430             public Iterator<WriteFuture> iterator() {
431                 return futures.iterator();
432             }
433 
434             @Override
435             public int size() {
436                 return futures.size();
437             }
438         };
439     }
440 
441     public final IoServiceListenerSupport getListeners() {
442         return listeners;
443     }
444 
445     protected final IdleStatusChecker getIdleStatusChecker() {
446         return idleStatusChecker;
447     }
448 
449     protected final void executeWorker(Runnable worker) {
450         executeWorker(worker, null);
451     }
452 
453     protected final void executeWorker(Runnable worker, String suffix) {
454         String actualThreadName = threadName;
455         if (suffix != null) {
456             actualThreadName = actualThreadName + '-' + suffix;
457         }
458         executor.execute(new NamePreservingRunnable(worker, actualThreadName));
459     }
460 
461     // TODO Figure out make it work without causing a compiler error / warning.
462     @SuppressWarnings("unchecked")
463     protected final void finishSessionInitialization(IoSession session,
464             IoFuture future, IoSessionInitializer sessionInitializer) {
465         // Update lastIoTime if needed.
466         if (stats.getLastReadTime() == 0) {
467             ((IoServiceStatistics)stats).setLastReadTime(getActivationTime());
468         }
469         if (stats.getLastWriteTime() == 0) {
470             ((IoServiceStatistics)stats).setLastWriteTime(getActivationTime());
471         }
472 
473         // Every property but attributeMap should be set now.
474         // Now initialize the attributeMap.  The reason why we initialize
475         // the attributeMap at last is to make sure all session properties
476         // such as remoteAddress are provided to IoSessionDataStructureFactory.
477         try {
478             ((AbstractIoSession) session).setAttributeMap(session.getService()
479                     .getSessionDataStructureFactory().getAttributeMap(session));
480         } catch (IoSessionInitializationException e) {
481             throw e;
482         } catch (Exception e) {
483             throw new IoSessionInitializationException(
484                     "Failed to initialize an attributeMap.", e);
485         }
486 
487         try {
488             ((AbstractIoSession) session).setWriteRequestQueue(session
489                     .getService().getSessionDataStructureFactory()
490                     .getWriteRequestQueue(session));
491         } catch (IoSessionInitializationException e) {
492             throw e;
493         } catch (Exception e) {
494             throw new IoSessionInitializationException(
495                     "Failed to initialize a writeRequestQueue.", e);
496         }
497 
498         if (future != null && future instanceof ConnectFuture) {
499             // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
500             session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE,
501                     future);
502         }
503 
504         if (sessionInitializer != null) {
505             sessionInitializer.initializeSession(session, future);
506         }
507 
508         finishSessionInitialization0(session, future);
509     }
510 
511     /**
512      * Implement this method to perform additional tasks required for session
513      * initialization. Do not call this method directly;
514      * {@link #finishSessionInitialization(IoSession, IoFuture, IoSessionInitializer)} will call
515      * this method instead.
516      */
517     protected void finishSessionInitialization0(IoSession session,
518             IoFuture future) {
519     }
520 
521     protected static class ServiceOperationFuture extends DefaultIoFuture {
522         public ServiceOperationFuture() {
523             super(null);
524         }
525 
526         public final boolean isDone() {
527             return getValue() == Boolean.TRUE;
528         }
529 
530         public final void setDone() {
531             setValue(Boolean.TRUE);
532         }
533 
534         public final Exception getException() {
535             if (getValue() instanceof Exception) {
536                 return (Exception) getValue();
537             } else {
538                 return null;
539             }
540         }
541 
542         public final void setException(Exception exception) {
543             if (exception == null) {
544                 throw new NullPointerException("exception");
545             }
546             setValue(exception);
547         }
548     }
549 
550     /**
551      * {@inheritDoc}
552      */
553     public int getScheduledWriteBytes() {
554         return stats.getScheduledWriteBytes();
555     }
556 
557     /**
558      * {@inheritDoc}
559      */
560     public int getScheduledWriteMessages() {
561         return stats.getScheduledWriteMessages();
562     }
563     
564     /**
565      * TODO
566      */    
567     public void notifyIdleness(long currentTime) {
568         idleState.notifyIdleness(currentTime);
569     }
570 }