View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.util.concurrent.Executor;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.Executors;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import org.apache.mina.core.RuntimeIoException;
28  import org.apache.mina.core.session.AbstractIoSession;
29  import org.apache.mina.core.session.AttributeKey;
30  import org.apache.mina.core.session.IoSession;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  /**
35   * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more
36   * {@link IoProcessor}s. Most current transport implementations use this pool internally
37   * to perform better in a multi-core environment, and therefore, you won't need to 
38   * use this pool directly unless you are running multiple {@link IoService}s in the
39   * same JVM.
40   * <p>
41   * If you are running multiple {@link IoService}s, you may want to share the pool
42   * among all services.  To do so, you can create a new {@link SimpleIoProcessorPool}
43   * instance by yourself and provide the pool as a constructor parameter when you
44   * create the services.
45   * <p>
46   * This pool uses Java reflection API to create multiple {@link IoProcessor} instances.
47   * It tries to instantiate the processor in the following order:
48   * <ol>
49   * <li>A public constructor with one {@link ExecutorService} parameter.</li>
50   * <li>A public constructor with one {@link Executor} parameter.</li>
51   * <li>A public default constructor</li>
52   * </ol>
53   * The following is an example for the NIO socket transport:
54   * <pre><code>
55   * // Create a shared pool.
56   * SimpleIoProcessorPool&lt;NioSession&gt; pool = 
57   *         new SimpleIoProcessorPool&lt;NioSession&gt;(NioProcessor.class, 16);
58   * 
59   * // Create two services that share the same pool.
60   * SocketAcceptor acceptor = new NioSocketAcceptor(pool);
61   * SocketConnector connector = new NioSocketConnector(pool);
62   * 
63   * ...
64   * 
65   * // Release related resources.
66   * connector.dispose();
67   * acceptor.dispose();
68   * pool.dispose();
69   * </code></pre>
70   * 
71   * @author The Apache MINA Project (dev@mina.apache.org)
72   * @version $Rev: 671827 $, $Date: 2008-06-26 10:49:48 +0200 (jeu, 26 jun 2008) $
73   * 
74   * @param <T> the type of the {@link IoSession} to be managed by the specified
75   *            {@link IoProcessor}.
76   */
77  public class SimpleIoProcessorPool<T extends AbstractIoSession> implements IoProcessor<T> {
78      
79      private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
80      private static final AttributeKey PROCESSOR = new AttributeKey(SimpleIoProcessorPool.class, "processor");
81      
82      private final Logger logger = LoggerFactory.getLogger(getClass());
83  
84      private final IoProcessor<T>[] pool;
85      private final AtomicInteger processorDistributor = new AtomicInteger();
86      private final Executor executor;
87      private final boolean createdExecutor;
88      
89      private final Object disposalLock = new Object();
90      private volatile boolean disposing;
91      private volatile boolean disposed;
92      
93      public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
94          this(processorType, null, DEFAULT_SIZE);
95      }
96      
97      public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, int size) {
98          this(processorType, null, size);
99      }
100 
101     public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, Executor executor) {
102         this(processorType, executor, DEFAULT_SIZE);
103     }
104     
105     @SuppressWarnings("unchecked")
106     public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, Executor executor, int size) {
107         if (processorType == null) {
108             throw new NullPointerException("processorType");
109         }
110         if (size <= 0) {
111             throw new IllegalArgumentException(
112                     "size: " + size + " (expected: positive integer)");
113         }
114         
115         if (executor == null) {
116             this.executor = executor = Executors.newCachedThreadPool();
117             this.createdExecutor = true;
118         } else {
119             this.executor = executor;
120             this.createdExecutor = false;
121         }
122         
123         pool = new IoProcessor[size];
124         
125         boolean success = false;
126         try {
127             for (int i = 0; i < pool.length; i ++) {
128                 IoProcessor<T> processor = null;
129                 
130                 // Try to create a new processor with a proper constructor.
131                 try {
132                     try {
133                         processor = processorType.getConstructor(ExecutorService.class).newInstance(executor);
134                     } catch (NoSuchMethodException e) {
135                         // To the next step...
136                     }
137                     
138                     if (processor == null) {
139                         try {
140                             processor = processorType.getConstructor(Executor.class).newInstance(executor);
141                         } catch (NoSuchMethodException e) {
142                             // To the next step...
143                         }
144                     }
145                     
146                     if (processor == null) {
147                         try {
148                             processor = processorType.getConstructor().newInstance();
149                         } catch (NoSuchMethodException e) {
150                             // To the next step...
151                         }
152                     }
153                 } catch (RuntimeException e) {
154                     throw e;
155                 } catch (Exception e) {
156                     throw new RuntimeIoException(
157                             "Failed to create a new instance of " + processorType.getName(), e);
158                 }
159                 
160                 // Raise an exception if no proper constructor is found.
161                 if (processor == null) {
162                     throw new IllegalArgumentException(
163                             String.valueOf(processorType) + " must have a public constructor " +
164                             "with one " + ExecutorService.class.getSimpleName() + " parameter, " +
165                             "a public constructor with one " + Executor.class.getSimpleName() + 
166                             " parameter or a public default constructor.");
167                 }
168                 
169                 pool[i] = processor;
170             }
171             
172             success = true;
173         } finally {
174             if (!success) {
175                 dispose();
176             }
177         }
178     }
179     
180     public final void add(T session) {
181         getProcessor(session).add(session);
182     }
183 
184     public final void flush(T session) {
185         getProcessor(session).flush(session);
186     }
187 
188     public final void remove(T session) {
189         getProcessor(session).remove(session);
190     }
191 
192     public final void updateTrafficMask(T session) {
193         getProcessor(session).updateTrafficMask(session);
194     }
195     
196     public boolean isDisposed() {
197         return disposed;
198     }
199 
200     public boolean isDisposing() {
201         return disposing;
202     }
203 
204     public final void dispose() {
205         if (disposed) {
206             return;
207         }
208 
209         synchronized (disposalLock) {
210             if (!disposing) {
211                 disposing = true;
212                 for (int i = pool.length - 1; i >= 0; i --) {
213                     if (pool[i] == null || pool[i].isDisposing()) {
214                         continue;
215                     }
216 
217                     try {
218                         pool[i].dispose();
219                     } catch (Exception e) {
220                         logger.warn(
221                                 "Failed to dispose a " +
222                                 pool[i].getClass().getSimpleName() +
223                                 " at index " + i + ".", e);
224                     } finally {
225                         pool[i] = null;
226                     }
227                 }
228                 
229                 if (createdExecutor) {
230                     ((ExecutorService) executor).shutdown();
231                 }
232             }
233         }
234 
235         disposed = true;
236     }
237     
238     @SuppressWarnings("unchecked")
239     private IoProcessor<T> getProcessor(T session) {
240         IoProcessor<T> p = (IoProcessor<T>) session.getAttribute(PROCESSOR);
241         if (p == null) {
242             p = nextProcessor();
243             IoProcessor<T> oldp =
244                 (IoProcessor<T>) session.setAttributeIfAbsent(PROCESSOR, p);
245             if (oldp != null) {
246                 p = oldp;
247             }
248         }
249         
250         return p;
251     }
252 
253     private IoProcessor<T> nextProcessor() {
254         checkDisposal();
255         return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
256     }
257 
258     private void checkDisposal() {
259         if (disposed) {
260             throw new IllegalStateException("A disposed processor cannot be accessed.");
261         }
262     }
263 }