001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.service;
021
022import java.util.AbstractSet;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.Executor;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicInteger;
032
033import org.apache.mina.core.IoUtil;
034import org.apache.mina.core.filterchain.DefaultIoFilterChain;
035import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
036import org.apache.mina.core.filterchain.IoFilterChainBuilder;
037import org.apache.mina.core.future.ConnectFuture;
038import org.apache.mina.core.future.DefaultIoFuture;
039import org.apache.mina.core.future.IoFuture;
040import org.apache.mina.core.future.WriteFuture;
041import org.apache.mina.core.session.AbstractIoSession;
042import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
043import org.apache.mina.core.session.IdleStatus;
044import org.apache.mina.core.session.IoSession;
045import org.apache.mina.core.session.IoSessionConfig;
046import org.apache.mina.core.session.IoSessionDataStructureFactory;
047import org.apache.mina.core.session.IoSessionInitializationException;
048import org.apache.mina.core.session.IoSessionInitializer;
049import org.apache.mina.util.ExceptionMonitor;
050import org.apache.mina.util.NamePreservingRunnable;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 * Base implementation of {@link IoService}s.
056 * 
057 * An instance of IoService contains an Executor which will handle the incoming
058 * events.
059 *
060 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
061 */
062public abstract class AbstractIoService implements IoService {
063
064    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class);
065
066    /**
067     * The unique number identifying the Service. It's incremented
068     * for each new IoService created.
069     */
070    private static final AtomicInteger id = new AtomicInteger();
071
072    /**
073     * The thread name built from the IoService inherited
074     * instance class name and the IoService Id
075     **/
076    private final String threadName;
077
078    /**
079     * The associated executor, responsible for handling execution of I/O events.
080     */
081    private final Executor executor;
082
083    /**
084     * A flag used to indicate that the local executor has been created
085     * inside this instance, and not passed by a caller.
086     * 
087     * If the executor is locally created, then it will be an instance
088     * of the ThreadPoolExecutor class.
089     */
090    private final boolean createdExecutor;
091
092    /**
093     * The IoHandler in charge of managing all the I/O Events. It is
094     */
095    private IoHandler handler;
096
097    /**
098     * The default {@link IoSessionConfig} which will be used to configure new sessions.
099     */
100    protected final IoSessionConfig sessionConfig;
101
102    private final IoServiceListener serviceActivationListener = new IoServiceListener() {
103        public void serviceActivated(IoService service) {
104            // Update lastIoTime.
105            AbstractIoService s = (AbstractIoService) service;
106            IoServiceStatistics _stats = s.getStatistics();
107            _stats.setLastReadTime(s.getActivationTime());
108            _stats.setLastWriteTime(s.getActivationTime());
109            _stats.setLastThroughputCalculationTime(s.getActivationTime());
110
111        }
112
113        public void serviceDeactivated(IoService service) throws Exception {
114            // Empty handler
115        }
116
117        public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception {
118            // Empty handler
119        }
120
121        public void sessionCreated(IoSession session) throws Exception {
122            // Empty handler
123        }
124
125        public void sessionClosed(IoSession session) throws Exception {
126            // Empty handler
127        }
128
129        public void sessionDestroyed(IoSession session) throws Exception {
130            // Empty handler
131        }
132    };
133
134    /**
135     * Current filter chain builder.
136     */
137    private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
138
139    private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
140
141    /**
142     * Maintains the {@link IoServiceListener}s of this service.
143     */
144    private final IoServiceListenerSupport listeners;
145
146    /**
147     * A lock object which must be acquired when related resources are
148     * destroyed.
149     */
150    protected final Object disposalLock = new Object();
151
152    private volatile boolean disposing;
153
154    private volatile boolean disposed;
155
156    /**
157     * {@inheritDoc}
158     */
159    private IoServiceStatistics stats = new IoServiceStatistics(this);
160
161    /**
162     * Constructor for {@link AbstractIoService}. You need to provide a default
163     * session configuration and an {@link Executor} for handling I/O events. If
164     * a null {@link Executor} is provided, a default one will be created using
165     * {@link Executors#newCachedThreadPool()}.
166     * 
167     * @param sessionConfig
168     *            the default configuration for the managed {@link IoSession}
169     * @param executor
170     *            the {@link Executor} used for handling execution of I/O
171     *            events. Can be <code>null</code>.
172     */
173    protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
174        if (sessionConfig == null) {
175            throw new IllegalArgumentException("sessionConfig");
176        }
177
178        if (getTransportMetadata() == null) {
179            throw new IllegalArgumentException("TransportMetadata");
180        }
181
182        if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
183            throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
184                    + getTransportMetadata().getSessionConfigType() + ")");
185        }
186
187        // Create the listeners, and add a first listener : a activation listener
188        // for this service, which will give information on the service state.
189        listeners = new IoServiceListenerSupport(this);
190        listeners.add(serviceActivationListener);
191
192        // Stores the given session configuration
193        this.sessionConfig = sessionConfig;
194
195        // Make JVM load the exception monitor before some transports
196        // change the thread context class loader.
197        ExceptionMonitor.getInstance();
198
199        if (executor == null) {
200            this.executor = Executors.newCachedThreadPool();
201            createdExecutor = true;
202        } else {
203            this.executor = executor;
204            createdExecutor = false;
205        }
206
207        threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
208    }
209
210    /**
211     * {@inheritDoc}
212     */
213    public final IoFilterChainBuilder getFilterChainBuilder() {
214        return filterChainBuilder;
215    }
216
217    /**
218     * {@inheritDoc}
219     */
220    public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
221        if (builder == null) {
222            builder = new DefaultIoFilterChainBuilder();
223        }
224        filterChainBuilder = builder;
225    }
226
227    /**
228     * {@inheritDoc}
229     */
230    public final DefaultIoFilterChainBuilder getFilterChain() {
231        if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
232            return (DefaultIoFilterChainBuilder) filterChainBuilder;
233        }
234
235        throw new IllegalStateException("Current filter chain builder is not a DefaultIoFilterChainBuilder.");
236    }
237
238    /**
239     * {@inheritDoc}
240     */
241    public final void addListener(IoServiceListener listener) {
242        listeners.add(listener);
243    }
244
245    /**
246     * {@inheritDoc}
247     */
248    public final void removeListener(IoServiceListener listener) {
249        listeners.remove(listener);
250    }
251
252    /**
253     * {@inheritDoc}
254     */
255    public final boolean isActive() {
256        return listeners.isActive();
257    }
258
259    /**
260     * {@inheritDoc}
261     */
262    public final boolean isDisposing() {
263        return disposing;
264    }
265
266    /**
267     * {@inheritDoc}
268     */
269    public final boolean isDisposed() {
270        return disposed;
271    }
272
273    /**
274     * {@inheritDoc}
275     */
276    public final void dispose() {
277        dispose(false);
278    }
279
280    /**
281     * {@inheritDoc}
282     */
283    public final void dispose(boolean awaitTermination) {
284        if (disposed) {
285            return;
286        }
287
288        synchronized (disposalLock) {
289            if (!disposing) {
290                disposing = true;
291
292                try {
293                    dispose0();
294                } catch (Exception e) {
295                    ExceptionMonitor.getInstance().exceptionCaught(e);
296                }
297            }
298        }
299
300        if (createdExecutor) {
301            ExecutorService e = (ExecutorService) executor;
302            e.shutdownNow();
303            if (awaitTermination) {
304
305                try {
306                    LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName());
307                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
308                    LOGGER.debug("awaitTermination on {} finished", this);
309                } catch (InterruptedException e1) {
310                    LOGGER.warn("awaitTermination on [{}] was interrupted", this);
311                    // Restore the interrupted status
312                    Thread.currentThread().interrupt();
313                }
314            }
315        }
316        disposed = true;
317    }
318
319    /**
320     * Implement this method to release any acquired resources.  This method
321     * is invoked only once by {@link #dispose()}.
322     * 
323     * @throws Exception If the dispose failed
324     */
325    protected abstract void dispose0() throws Exception;
326
327    /**
328     * {@inheritDoc}
329     */
330    public final Map<Long, IoSession> getManagedSessions() {
331        return listeners.getManagedSessions();
332    }
333
334    /**
335     * {@inheritDoc}
336     */
337    public final int getManagedSessionCount() {
338        return listeners.getManagedSessionCount();
339    }
340
341    /**
342     * {@inheritDoc}
343     */
344    public final IoHandler getHandler() {
345        return handler;
346    }
347
348    /**
349     * {@inheritDoc}
350     */
351    public final void setHandler(IoHandler handler) {
352        if (handler == null) {
353            throw new IllegalArgumentException("handler cannot be null");
354        }
355
356        if (isActive()) {
357            throw new IllegalStateException("handler cannot be set while the service is active.");
358        }
359
360        this.handler = handler;
361    }
362
363    /**
364     * {@inheritDoc}
365     */
366    public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
367        return sessionDataStructureFactory;
368    }
369
370    /**
371     * {@inheritDoc}
372     */
373    public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) {
374        if (sessionDataStructureFactory == null) {
375            throw new IllegalArgumentException("sessionDataStructureFactory");
376        }
377
378        if (isActive()) {
379            throw new IllegalStateException("sessionDataStructureFactory cannot be set while the service is active.");
380        }
381
382        this.sessionDataStructureFactory = sessionDataStructureFactory;
383    }
384
385    /**
386     * {@inheritDoc}
387     */
388    public IoServiceStatistics getStatistics() {
389        return stats;
390    }
391
392    /**
393     * {@inheritDoc}
394     */
395    public final long getActivationTime() {
396        return listeners.getActivationTime();
397    }
398
399    /**
400     * {@inheritDoc}
401     */
402    public final Set<WriteFuture> broadcast(Object message) {
403        // Convert to Set.  We do not return a List here because only the
404        // direct caller of MessageBroadcaster knows the order of write
405        // operations.
406        final List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values());
407        return new AbstractSet<WriteFuture>() {
408            @Override
409            public Iterator<WriteFuture> iterator() {
410                return futures.iterator();
411            }
412
413            @Override
414            public int size() {
415                return futures.size();
416            }
417        };
418    }
419
420    public final IoServiceListenerSupport getListeners() {
421        return listeners;
422    }
423
424    protected final void executeWorker(Runnable worker) {
425        executeWorker(worker, null);
426    }
427
428    protected final void executeWorker(Runnable worker, String suffix) {
429        String actualThreadName = threadName;
430        if (suffix != null) {
431            actualThreadName = actualThreadName + '-' + suffix;
432        }
433        executor.execute(new NamePreservingRunnable(worker, actualThreadName));
434    }
435
436    // TODO Figure out make it work without causing a compiler error / warning.
437    @SuppressWarnings("unchecked")
438    protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
439        // Update lastIoTime if needed.
440        if (stats.getLastReadTime() == 0) {
441            stats.setLastReadTime(getActivationTime());
442        }
443
444        if (stats.getLastWriteTime() == 0) {
445            stats.setLastWriteTime(getActivationTime());
446        }
447
448        // Every property but attributeMap should be set now.
449        // Now initialize the attributeMap.  The reason why we initialize
450        // the attributeMap at last is to make sure all session properties
451        // such as remoteAddress are provided to IoSessionDataStructureFactory.
452        try {
453            ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
454                    .getAttributeMap(session));
455        } catch (IoSessionInitializationException e) {
456            throw e;
457        } catch (Exception e) {
458            throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
459        }
460
461        try {
462            ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
463                    .getWriteRequestQueue(session));
464        } catch (IoSessionInitializationException e) {
465            throw e;
466        } catch (Exception e) {
467            throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
468        }
469
470        if ((future != null) && (future instanceof ConnectFuture)) {
471            // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
472            session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
473        }
474
475        if (sessionInitializer != null) {
476            sessionInitializer.initializeSession(session, future);
477        }
478
479        finishSessionInitialization0(session, future);
480    }
481
482    /**
483     * Implement this method to perform additional tasks required for session
484     * initialization. Do not call this method directly;
485     * {@link #initSession(IoSession, IoFuture, IoSessionInitializer)} will call
486     * this method instead.
487     * 
488     * @param session The session to initialize
489     * @param future The Future to use
490     * 
491     */
492    protected void finishSessionInitialization0(IoSession session, IoFuture future) {
493        // Do nothing. Extended class might add some specific code
494    }
495
496    /**
497     * A specific class used to 
498     * @author elecharny
499     *
500     */
501    protected static class ServiceOperationFuture extends DefaultIoFuture {
502        public ServiceOperationFuture() {
503            super(null);
504        }
505
506        public final boolean isDone() {
507            return getValue() == Boolean.TRUE;
508        }
509
510        public final void setDone() {
511            setValue(Boolean.TRUE);
512        }
513
514        public final Exception getException() {
515            if (getValue() instanceof Exception) {
516                return (Exception) getValue();
517            }
518
519            return null;
520        }
521
522        public final void setException(Exception exception) {
523            if (exception == null) {
524                throw new IllegalArgumentException("exception");
525            }
526            
527            setValue(exception);
528        }
529    }
530
531    /**
532     * {@inheritDoc}
533     */
534    public int getScheduledWriteBytes() {
535        return stats.getScheduledWriteBytes();
536    }
537
538    /**
539     * {@inheritDoc}
540     */
541    public int getScheduledWriteMessages() {
542        return stats.getScheduledWriteMessages();
543    }
544
545}