001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.service;
021
022import java.lang.reflect.Constructor;
023import java.nio.channels.spi.SelectorProvider;
024import java.util.Arrays;
025import java.util.concurrent.Executor;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.ThreadPoolExecutor;
029
030import org.apache.mina.core.RuntimeIoException;
031import org.apache.mina.core.session.AbstractIoSession;
032import org.apache.mina.core.session.AttributeKey;
033import org.apache.mina.core.session.IoSession;
034import org.apache.mina.core.write.WriteRequest;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more
040 * {@link IoProcessor}s. Most current transport implementations use this pool internally
041 * to perform better in a multi-core environment, and therefore, you won't need to 
042 * use this pool directly unless you are running multiple {@link IoService}s in the
043 * same JVM.
044 * <p>
045 * If you are running multiple {@link IoService}s, you could want to share the pool
046 * among all services.  To do so, you can create a new {@link SimpleIoProcessorPool}
047 * instance by yourself and provide the pool as a constructor parameter when you
048 * create the services.
049 * <p>
050 * This pool uses Java reflection API to create multiple {@link IoProcessor} instances.
051 * It tries to instantiate the processor in the following order:
052 * <ol>
053 * <li>A public constructor with one {@link ExecutorService} parameter.</li>
054 * <li>A public constructor with one {@link Executor} parameter.</li>
055 * <li>A public default constructor</li>
056 * </ol>
057 * The following is an example for the NIO socket transport:
058 * <pre><code>
059 * // Create a shared pool.
060 * SimpleIoProcessorPool&lt;NioSession&gt; pool = 
061 *         new SimpleIoProcessorPool&lt;NioSession&gt;(NioProcessor.class, 16);
062 * 
063 * // Create two services that share the same pool.
064 * SocketAcceptor acceptor = new NioSocketAcceptor(pool);
065 * SocketConnector connector = new NioSocketConnector(pool);
066 * 
067 * ...
068 * 
069 * // Release related resources.
070 * connector.dispose();
071 * acceptor.dispose();
072 * pool.dispose();
073 * </code></pre>
074 * 
075 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
076 * 
077 * @param <S> the type of the {@link IoSession} to be managed by the specified
078 *            {@link IoProcessor}.
079 */
080public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoProcessor<S> {
081    /** A logger for this class */
082    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleIoProcessorPool.class);
083
084    /** The default pool size, when no size is provided. */
085    private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
086
087    /** A key used to store the processor pool in the session's Attributes */
088    private static final AttributeKey PROCESSOR = new AttributeKey(SimpleIoProcessorPool.class, "processor");
089
090    /** The pool table */
091    private final IoProcessor<S>[] pool;
092
093    /** The contained  which is passed to the IoProcessor when they are created */
094    private final Executor executor;
095
096    /** A flag set to true if we had to create an executor */
097    private final boolean createdExecutor;
098
099    /** A lock to protect the disposal against concurrent calls */
100    private final Object disposalLock = new Object();
101
102    /** A flg set to true if the IoProcessor in the pool are being disposed */
103    private volatile boolean disposing;
104
105    /** A flag set to true if all the IoProcessor contained in the pool have been disposed */
106    private volatile boolean disposed;
107
108    /**
109     * Creates a new instance of SimpleIoProcessorPool with a default
110     * size of NbCPUs +1.
111     *
112     * @param processorType The type of IoProcessor to use
113     */
114    public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) {
115        this(processorType, null, DEFAULT_SIZE, null);
116    }
117
118    /**
119     * Creates a new instance of SimpleIoProcessorPool with a defined
120     * number of IoProcessors in the pool
121     *
122     * @param processorType The type of IoProcessor to use
123     * @param size The number of IoProcessor in the pool
124     */
125    public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size) {
126        this(processorType, null, size, null);
127    }
128
129    /**
130     * Creates a new instance of SimpleIoProcessorPool with a defined
131     * number of IoProcessors in the pool
132     *
133     * @param processorType The type of IoProcessor to use
134     * @param size The number of IoProcessor in the pool
135     * @param selectorProvider The SelectorProvider to use
136     */
137    public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size, SelectorProvider selectorProvider) {
138        this(processorType, null, size, selectorProvider);
139    }
140
141    /**
142     * Creates a new instance of SimpleIoProcessorPool with an executor
143     *
144     * @param processorType The type of IoProcessor to use
145     * @param executor The {@link Executor}
146     */
147    public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor) {
148        this(processorType, executor, DEFAULT_SIZE, null);
149    }
150
151    /**
152     * Creates a new instance of SimpleIoProcessorPool with an executor
153     *
154     * @param processorType The type of IoProcessor to use
155     * @param executor The {@link Executor}
156     * @param size The number of IoProcessor in the pool
157     * @param selectorProvider The SelectorProvider to used
158     */
159    @SuppressWarnings("unchecked")
160    public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size, 
161            SelectorProvider selectorProvider) {
162        if (processorType == null) {
163            throw new IllegalArgumentException("processorType");
164        }
165
166        if (size <= 0) {
167            throw new IllegalArgumentException("size: " + size + " (expected: positive integer)");
168        }
169
170        // Create the executor if none is provided
171        createdExecutor = (executor == null);
172
173        if (createdExecutor) {
174            this.executor = Executors.newCachedThreadPool();
175            // Set a default reject handler
176            ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
177        } else {
178            this.executor = executor;
179        }
180
181        pool = new IoProcessor[size];
182
183        boolean success = false;
184        Constructor<? extends IoProcessor<S>> processorConstructor = null;
185        boolean usesExecutorArg = true;
186
187        try {
188            // We create at least one processor
189            try {
190                try {
191                    processorConstructor = processorType.getConstructor(ExecutorService.class);
192                    pool[0] = processorConstructor.newInstance(this.executor);
193                } catch (NoSuchMethodException e1) {
194                    // To the next step...
195                    try {
196                        if(selectorProvider==null) {
197                            processorConstructor = processorType.getConstructor(Executor.class);
198                            pool[0] = processorConstructor.newInstance(this.executor);
199                        } else {
200                            processorConstructor = processorType.getConstructor(Executor.class, SelectorProvider.class);
201                            pool[0] = processorConstructor.newInstance(this.executor,selectorProvider);
202                        }
203                    } catch (NoSuchMethodException e2) {
204                        // To the next step...
205                        try {
206                            processorConstructor = processorType.getConstructor();
207                            usesExecutorArg = false;
208                            pool[0] = processorConstructor.newInstance();
209                        } catch (NoSuchMethodException e3) {
210                            // To the next step...
211                        }
212                    }
213                }
214            } catch (RuntimeException re) {
215                LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage());
216                throw re;
217            } catch (Exception e) {
218                String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage();
219                LOGGER.error(msg, e);
220                throw new RuntimeIoException(msg, e);
221            }
222
223            if (processorConstructor == null) {
224                // Raise an exception if no proper constructor is found.
225                String msg = String.valueOf(processorType) + " must have a public constructor with one "
226                        + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one "
227                        + Executor.class.getSimpleName() + " parameter or a public default constructor.";
228                LOGGER.error(msg);
229                throw new IllegalArgumentException(msg);
230            }
231
232            // Constructor found now use it for all subsequent instantiations
233            for (int i = 1; i < pool.length; i++) {
234                try {
235                    if (usesExecutorArg) {
236                        if(selectorProvider==null) {
237                            pool[i] = processorConstructor.newInstance(this.executor);
238                        } else {
239                            pool[i] = processorConstructor.newInstance(this.executor, selectorProvider);
240                        }
241                    } else {
242                        pool[i] = processorConstructor.newInstance();
243                    }
244                } catch (Exception e) {
245                    // Won't happen because it has been done previously
246                }
247            }
248
249            success = true;
250        } finally {
251            if (!success) {
252                dispose();
253            }
254        }
255    }
256
257    /**
258     * {@inheritDoc}
259     */
260    public final void add(S session) {
261        getProcessor(session).add(session);
262    }
263
264    /**
265     * {@inheritDoc}
266     */
267    public final void flush(S session) {
268        getProcessor(session).flush(session);
269    }
270
271    /**
272     * {@inheritDoc}
273     */
274    public final void write(S session, WriteRequest writeRequest) {
275        getProcessor(session).write(session, writeRequest);
276    }
277
278    /**
279     * {@inheritDoc}
280     */
281    public final void remove(S session) {
282        getProcessor(session).remove(session);
283    }
284
285    /**
286     * {@inheritDoc}
287     */
288    public final void updateTrafficControl(S session) {
289        getProcessor(session).updateTrafficControl(session);
290    }
291
292    /**
293     * {@inheritDoc}
294     */
295    public boolean isDisposed() {
296        return disposed;
297    }
298
299    /**
300     * {@inheritDoc}
301     */
302    public boolean isDisposing() {
303        return disposing;
304    }
305
306    /**
307     * {@inheritDoc}
308     */
309    public final void dispose() {
310        if (disposed) {
311            return;
312        }
313
314        synchronized (disposalLock) {
315            if (!disposing) {
316                disposing = true;
317
318                for (IoProcessor<S> ioProcessor : pool) {
319                    if (ioProcessor == null) {
320                        // Special case if the pool has not been initialized properly
321                        continue;
322                    }
323
324                    if (ioProcessor.isDisposing()) {
325                        continue;
326                    }
327
328                    try {
329                        ioProcessor.dispose();
330                    } catch (Exception e) {
331                        LOGGER.warn("Failed to dispose the {} IoProcessor.", ioProcessor.getClass().getSimpleName(), e);
332                    }
333                }
334
335                if (createdExecutor) {
336                    ((ExecutorService) executor).shutdown();
337                }
338            }
339
340            Arrays.fill(pool, null);
341            disposed = true;
342        }
343    }
344
345    /**
346     * Find the processor associated to a session. If it hasen't be stored into
347     * the session's attributes, pick a new processor and stores it.
348     */
349    @SuppressWarnings("unchecked")
350    private IoProcessor<S> getProcessor(S session) {
351        IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
352
353        if (processor == null) {
354            if (disposed || disposing) {
355                throw new IllegalStateException("A disposed processor cannot be accessed.");
356            }
357
358            processor = pool[Math.abs((int) session.getId()) % pool.length];
359
360            if (processor == null) {
361                throw new IllegalStateException("A disposed processor cannot be accessed.");
362            }
363
364            session.setAttributeIfAbsent(PROCESSOR, processor);
365        }
366
367        return processor;
368    }
369}