View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.util.AbstractSet;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.apache.mina.core.IoUtil;
34  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
35  import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
36  import org.apache.mina.core.filterchain.IoFilterChainBuilder;
37  import org.apache.mina.core.future.ConnectFuture;
38  import org.apache.mina.core.future.DefaultIoFuture;
39  import org.apache.mina.core.future.IoFuture;
40  import org.apache.mina.core.future.WriteFuture;
41  import org.apache.mina.core.session.AbstractIoSession;
42  import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
43  import org.apache.mina.core.session.IdleStatus;
44  import org.apache.mina.core.session.IoSession;
45  import org.apache.mina.core.session.IoSessionConfig;
46  import org.apache.mina.core.session.IoSessionDataStructureFactory;
47  import org.apache.mina.core.session.IoSessionInitializationException;
48  import org.apache.mina.core.session.IoSessionInitializer;
49  import org.apache.mina.util.ExceptionMonitor;
50  import org.apache.mina.util.NamePreservingRunnable;
51  
52  /**
53   * Base implementation of {@link IoService}s.
54   * 
55   * An instance of IoService contains an Executor which will handle the incoming
56   * events.
57   *
58   * @author The Apache MINA Project (dev@mina.apache.org)
59   */
60  public abstract class AbstractIoService implements IoService {
61      /** 
62       * The unique number identifying the Service. It's incremented
63       * for each new IoService created.
64       */
65      private static final AtomicInteger id = new AtomicInteger();
66  
67      /** 
68       * The thread name built from the IoService inherited 
69       * instance class name and the IoService Id 
70       **/
71      private final String threadName;
72  
73      /**
74       * The associated executor, responsible for handling execution of I/O events.
75       */
76      private final Executor executor;
77  
78      /**
79       * A flag used to indicate that the local executor has been created
80       * inside this instance, and not passed by a caller.
81       * 
82       * If the executor is locally created, then it will be an instance
83       * of the ThreadPoolExecutor class.
84       */
85      private final boolean createdExecutor;
86  
87      /**
88       * The IoHandler in charge of managing all the I/O Events. It is 
89       */
90      private IoHandler handler;
91  
92      /**
93       * The default {@link IoSessionConfig} which will be used to configure new sessions.
94       */
95      private final IoSessionConfig sessionConfig;
96  
97      private final IoServiceListener serviceActivationListener = new IoServiceListener() {
98          public void serviceActivated(IoService service) {
99              // Update lastIoTime.
100             AbstractIoService s = (AbstractIoService) service;
101             IoServiceStatistics _stats = s.getStatistics();
102             _stats.setLastReadTime(s.getActivationTime());
103             _stats.setLastWriteTime(s.getActivationTime());
104             _stats.setLastThroughputCalculationTime(s.getActivationTime());
105 
106         }
107 
108         public void serviceDeactivated(IoService service) {
109             // Empty handler
110         }
111 
112         public void serviceIdle(IoService service, IdleStatus idleStatus) {
113             // Empty handler
114         }
115 
116         public void sessionCreated(IoSession session) {
117             // Empty handler
118         }
119 
120         public void sessionDestroyed(IoSession session) {
121             // Empty handler
122         }
123     };
124 
125     /**
126      * Current filter chain builder.
127      */
128     private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
129 
130     private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
131 
132     /**
133      * Maintains the {@link IoServiceListener}s of this service.
134      */
135     private final IoServiceListenerSupport listeners;
136 
137     /**
138      * A lock object which must be acquired when related resources are
139      * destroyed.
140      */
141     protected final Object disposalLock = new Object();
142 
143     private volatile boolean disposing;
144 
145     private volatile boolean disposed;
146 
147     private IoFuture disposalFuture;
148 
149     /**
150      * {@inheritDoc}
151      */
152     private IoServiceStatistics stats = new IoServiceStatistics(this);
153     
154 
155     /**
156      * Constructor for {@link AbstractIoService}. You need to provide a default
157      * session configuration and an {@link Executor} for handling I/O events. If
158      * a null {@link Executor} is provided, a default one will be created using
159      * {@link Executors#newCachedThreadPool()}.
160      * 
161      * @param sessionConfig
162      *            the default configuration for the managed {@link IoSession}
163      * @param executor
164      *            the {@link Executor} used for handling execution of I/O
165      *            events. Can be <code>null</code>.
166      */
167     protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
168         if (sessionConfig == null) {
169             throw new NullPointerException("sessionConfig");
170         }
171 
172         if (getTransportMetadata() == null) {
173             throw new NullPointerException("TransportMetadata");
174         }
175 
176         if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(
177                 sessionConfig.getClass())) {
178             throw new IllegalArgumentException("sessionConfig type: "
179                     + sessionConfig.getClass() + " (expected: "
180                     + getTransportMetadata().getSessionConfigType() + ")");
181         }
182 
183         // Create the listeners, and add a first listener : a activation listener
184         // for this service, which will give information on the service state.
185         listeners = new IoServiceListenerSupport(this);
186         listeners.add(serviceActivationListener);
187 
188         // Stores the given session configuration
189         this.sessionConfig = sessionConfig;
190 
191         // Make JVM load the exception monitor before some transports
192         // change the thread context class loader.
193         ExceptionMonitor.getInstance();
194 
195         if (executor == null) {
196             this.executor = Executors.newCachedThreadPool();
197             createdExecutor = true;
198         } else {
199             this.executor = executor;
200             createdExecutor = false;
201         }
202 
203         threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
204     }
205 
206     /**
207      * {@inheritDoc}
208      */
209     public final IoFilterChainBuilder getFilterChainBuilder() {
210         return filterChainBuilder;
211     }
212 
213     /**
214      * {@inheritDoc}
215      */
216     public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
217         if (builder == null) {
218             builder = new DefaultIoFilterChainBuilder();
219         }
220         filterChainBuilder = builder;
221     }
222 
223     /**
224      * {@inheritDoc}
225      */
226     public final DefaultIoFilterChainBuilder getFilterChain() {
227         if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
228             return (DefaultIoFilterChainBuilder) filterChainBuilder;
229         }
230         
231         
232         throw new IllegalStateException(
233                     "Current filter chain builder is not a DefaultIoFilterChainBuilder.");
234     }
235 
236     /**
237      * {@inheritDoc}
238      */
239     public final void addListener(IoServiceListener listener) {
240         listeners.add(listener);
241     }
242 
243     /**
244      * {@inheritDoc}
245      */
246     public final void removeListener(IoServiceListener listener) {
247         listeners.remove(listener);
248     }
249 
250     /**
251      * {@inheritDoc}
252      */
253     public final boolean isActive() {
254         return listeners.isActive();
255     }
256 
257     /**
258      * {@inheritDoc}
259      */
260     public final boolean isDisposing() {
261         return disposing;
262     }
263 
264     /**
265      * {@inheritDoc}
266      */
267     public final boolean isDisposed() {
268         return disposed;
269     }
270 
271     /**
272      * {@inheritDoc}
273      */
274     public final void dispose() {
275         if (disposed) {
276             return;
277         }
278 
279         IoFuture disposalFuture;
280         synchronized (disposalLock) {
281             disposalFuture = this.disposalFuture;
282             if (!disposing) {
283                 disposing = true;
284                 try {
285                     this.disposalFuture = disposalFuture = dispose0();
286                 } catch (Exception e) {
287                     ExceptionMonitor.getInstance().exceptionCaught(e);
288                 } finally {
289                     if (disposalFuture == null) {
290                         disposed = true;
291                     }
292                 }
293             }
294         }
295         
296         if (disposalFuture != null) {
297             disposalFuture.awaitUninterruptibly();
298         }
299 
300         if (createdExecutor) {
301             ExecutorService e = (ExecutorService) executor;
302             e.shutdown();
303             while (!e.isTerminated()) {
304                 try {
305                     e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
306                 } catch (InterruptedException e1) {
307                     // Ignore; it should end shortly.
308                 }
309             }
310         }
311 
312         disposed = true;
313     }
314 
315     /**
316      * Implement this method to release any acquired resources.  This method
317      * is invoked only once by {@link #dispose()}.
318      */
319     protected abstract IoFuture dispose0() throws Exception;
320 
321     /**
322      * {@inheritDoc}
323      */
324     public final Map<Long, IoSession> getManagedSessions() {
325         return listeners.getManagedSessions();
326     }
327 
328     /**
329      * {@inheritDoc}
330      */
331     public final int getManagedSessionCount() {
332         return listeners.getManagedSessionCount();
333     }
334 
335     /**
336      * {@inheritDoc}
337      */
338     public final IoHandler getHandler() {
339         return handler;
340     }
341 
342     /**
343      * {@inheritDoc}
344      */
345     public final void setHandler(IoHandler handler) {
346         if (handler == null) {
347             throw new NullPointerException("handler cannot be null");
348         }
349 
350         if (isActive()) {
351             throw new IllegalStateException(
352                     "handler cannot be set while the service is active.");
353         }
354 
355         this.handler = handler;
356     }
357 
358     /**
359      * {@inheritDoc}
360      */
361     public IoSessionConfig getSessionConfig() {
362         return sessionConfig;
363     }
364 
365     /**
366      * {@inheritDoc}
367      */
368     public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
369         return sessionDataStructureFactory;
370     }
371 
372     /**
373      * {@inheritDoc}
374      */
375     public final void setSessionDataStructureFactory(
376             IoSessionDataStructureFactory sessionDataStructureFactory) {
377         if (sessionDataStructureFactory == null) {
378             throw new NullPointerException("sessionDataStructureFactory");
379         }
380 
381         if (isActive()) {
382             throw new IllegalStateException(
383                     "sessionDataStructureFactory cannot be set while the service is active.");
384         }
385 
386         this.sessionDataStructureFactory = sessionDataStructureFactory;
387     }
388 
389     /**
390      * {@inheritDoc}
391      */
392     public IoServiceStatistics getStatistics() {
393         return stats;
394     }
395 
396     /**
397      * {@inheritDoc}
398      */
399     public final long getActivationTime() {
400         return listeners.getActivationTime();
401     }
402 
403     /**
404      * {@inheritDoc}
405      */
406     public final Set<WriteFuture> broadcast(Object message) {
407         // Convert to Set.  We do not return a List here because only the
408         // direct caller of MessageBroadcaster knows the order of write
409         // operations.
410         final List<WriteFuture> futures = IoUtil.broadcast(message,
411                 getManagedSessions().values());
412         return new AbstractSet<WriteFuture>() {
413             @Override
414             public Iterator<WriteFuture> iterator() {
415                 return futures.iterator();
416             }
417 
418             @Override
419             public int size() {
420                 return futures.size();
421             }
422         };
423     }
424 
425     public final IoServiceListenerSupport getListeners() {
426         return listeners;
427     }
428 
429 
430     protected final void executeWorker(Runnable worker) {
431         executeWorker(worker, null);
432     }
433 
434     protected final void executeWorker(Runnable worker, String suffix) {
435         String actualThreadName = threadName;
436         if (suffix != null) {
437             actualThreadName = actualThreadName + '-' + suffix;
438         }
439         executor.execute(new NamePreservingRunnable(worker, actualThreadName));
440     }
441 
442     // TODO Figure out make it work without causing a compiler error / warning.
443     @SuppressWarnings("unchecked")
444     protected final void initSession(IoSession session,
445             IoFuture future, IoSessionInitializer sessionInitializer) {
446         // Update lastIoTime if needed.
447         if (stats.getLastReadTime() == 0) {
448             stats.setLastReadTime(getActivationTime());
449         }
450         
451         if (stats.getLastWriteTime() == 0) {
452             stats.setLastWriteTime(getActivationTime());
453         }
454 
455         // Every property but attributeMap should be set now.
456         // Now initialize the attributeMap.  The reason why we initialize
457         // the attributeMap at last is to make sure all session properties
458         // such as remoteAddress are provided to IoSessionDataStructureFactory.
459         try {
460             ((AbstractIoSession) session).setAttributeMap(session.getService()
461                     .getSessionDataStructureFactory().getAttributeMap(session));
462         } catch (IoSessionInitializationException e) {
463             throw e;
464         } catch (Exception e) {
465             throw new IoSessionInitializationException(
466                     "Failed to initialize an attributeMap.", e);
467         }
468 
469         try {
470             ((AbstractIoSession) session).setWriteRequestQueue(session
471                     .getService().getSessionDataStructureFactory()
472                     .getWriteRequestQueue(session));
473         } catch (IoSessionInitializationException e) {
474             throw e;
475         } catch (Exception e) {
476             throw new IoSessionInitializationException(
477                     "Failed to initialize a writeRequestQueue.", e);
478         }
479 
480         if ((future != null) && (future instanceof ConnectFuture)) {
481             // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
482             session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE,
483                     future);
484         }
485 
486         if (sessionInitializer != null) {
487             sessionInitializer.initializeSession(session, future);
488         }
489 
490         finishSessionInitialization0(session, future);
491     }
492 
493     /**
494      * Implement this method to perform additional tasks required for session
495      * initialization. Do not call this method directly;
496      * {@link #finishSessionInitialization(IoSession, IoFuture, IoSessionInitializer)} will call
497      * this method instead.
498      */
499     protected void finishSessionInitialization0(IoSession session,
500             IoFuture future) {
501         // Do nothing. Extended class might add some specific code 
502     }
503 
504     protected static class ServiceOperationFuture extends DefaultIoFuture {
505         public ServiceOperationFuture() {
506             super(null);
507         }
508 
509         public final boolean isDone() {
510             return getValue() == Boolean.TRUE;
511         }
512 
513         public final void setDone() {
514             setValue(Boolean.TRUE);
515         }
516 
517         public final Exception getException() {
518             if (getValue() instanceof Exception) {
519                 return (Exception) getValue();
520             }
521             
522             return null;
523         }
524 
525         public final void setException(Exception exception) {
526             if (exception == null) {
527                 throw new NullPointerException("exception");
528             }
529             setValue(exception);
530         }
531     }
532 
533     /**
534      * {@inheritDoc}
535      */
536     public int getScheduledWriteBytes() {
537         return stats.getScheduledWriteBytes();
538     }
539 
540     /**
541      * {@inheritDoc}
542      */
543     public int getScheduledWriteMessages() {
544         return stats.getScheduledWriteMessages();
545     }
546     
547 }