View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.util.concurrent.Executor;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.Executors;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import org.apache.mina.core.RuntimeIoException;
28  import org.apache.mina.core.session.AbstractIoSession;
29  import org.apache.mina.core.session.AttributeKey;
30  import org.apache.mina.core.session.IoSession;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  /**
35   * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more
36   * {@link IoProcessor}s. Most current transport implementations use this pool internally
37   * to perform better in a multi-core environment, and therefore, you won't need to 
38   * use this pool directly unless you are running multiple {@link IoService}s in the
39   * same JVM.
40   * <p>
41   * If you are running multiple {@link IoService}s, you may want to share the pool
42   * among all services.  To do so, you can create a new {@link SimpleIoProcessorPool}
43   * instance by yourself and provide the pool as a constructor parameter when you
44   * create the services.
45   * <p>
46   * This pool uses Java reflection API to create multiple {@link IoProcessor} instances.
47   * It tries to instantiate the processor in the following order:
48   * <ol>
49   * <li>A public constructor with one {@link ExecutorService} parameter.</li>
50   * <li>A public constructor with one {@link Executor} parameter.</li>
51   * <li>A public default constructor</li>
52   * </ol>
53   * The following is an example for the NIO socket transport:
54   * <pre><code>
55   * // Create a shared pool.
56   * SimpleIoProcessorPool&lt;NioSession&gt; pool = 
57   *         new SimpleIoProcessorPool&lt;NioSession&gt;(NioProcessor.class, 16);
58   * 
59   * // Create two services that share the same pool.
60   * SocketAcceptor acceptor = new NioSocketAcceptor(pool);
61   * SocketConnector connector = new NioSocketConnector(pool);
62   * 
63   * ...
64   * 
65   * // Release related resources.
66   * connector.dispose();
67   * acceptor.dispose();
68   * pool.dispose();
69   * </code></pre>
70   * 
71   * @author The Apache MINA Project (dev@mina.apache.org)
72   * 
73   * @param <T> the type of the {@link IoSession} to be managed by the specified
74   *            {@link IoProcessor}.
75   */
76  public class SimpleIoProcessorPool<T extends AbstractIoSession> implements IoProcessor<T> {
77      
78      private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
79      private static final AttributeKey PROCESSOR = new AttributeKey(SimpleIoProcessorPool.class, "processor");
80      
81      private final static Logger LOGGER = LoggerFactory.getLogger(SimpleIoProcessorPool.class);
82  
83      private final IoProcessor<T>[] pool;
84      private final AtomicInteger processorDistributor = new AtomicInteger();
85      private final Executor executor;
86      private final boolean createdExecutor;
87      
88      private final Object disposalLock = new Object();
89      private volatile boolean disposing;
90      private volatile boolean disposed;
91      
92      public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
93          this(processorType, null, DEFAULT_SIZE);
94      }
95      
96      public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, int size) {
97          this(processorType, null, size);
98      }
99  
100     public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, Executor executor) {
101         this(processorType, executor, DEFAULT_SIZE);
102     }
103     
104     @SuppressWarnings("unchecked")
105     public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, Executor executor, int size) {
106         if (processorType == null) {
107             throw new NullPointerException("processorType");
108         }
109         if (size <= 0) {
110             throw new IllegalArgumentException(
111                     "size: " + size + " (expected: positive integer)");
112         }
113         
114         if (executor == null) {
115             this.executor = executor = Executors.newCachedThreadPool();
116             this.createdExecutor = true;
117         } else {
118             this.executor = executor;
119             this.createdExecutor = false;
120         }
121         
122         pool = new IoProcessor[size];
123         
124         boolean success = false;
125         try {
126             for (int i = 0; i < pool.length; i ++) {
127                 IoProcessor<T> processor = null;
128                 
129                 // Try to create a new processor with a proper constructor.
130                 try {
131                     try {
132                         processor = processorType.getConstructor(ExecutorService.class).newInstance(executor);
133                     } catch (NoSuchMethodException e) {
134                         // To the next step...
135                     }
136                     
137                     if (processor == null) {
138                         try {
139                             processor = processorType.getConstructor(Executor.class).newInstance(executor);
140                         } catch (NoSuchMethodException e) {
141                             // To the next step...
142                         }
143                     }
144                     
145                     if (processor == null) {
146                         try {
147                             processor = processorType.getConstructor().newInstance();
148                         } catch (NoSuchMethodException e) {
149                             // To the next step...
150                         }
151                     }
152                 } catch (RuntimeException e) {
153                     throw e;
154                 } catch (Exception e) {
155                     throw new RuntimeIoException(
156                             "Failed to create a new instance of " + processorType.getName(), e);
157                 }
158                 
159                 // Raise an exception if no proper constructor is found.
160                 if (processor == null) {
161                     throw new IllegalArgumentException(
162                             String.valueOf(processorType) + " must have a public constructor " +
163                             "with one " + ExecutorService.class.getSimpleName() + " parameter, " +
164                             "a public constructor with one " + Executor.class.getSimpleName() + 
165                             " parameter or a public default constructor.");
166                 }
167                 
168                 pool[i] = processor;
169             }
170             
171             success = true;
172         } finally {
173             if (!success) {
174                 dispose();
175             }
176         }
177     }
178     
179     public final void add(T session) {
180         getProcessor(session).add(session);
181     }
182 
183     public final void flush(T session) {
184         getProcessor(session).flush(session);
185     }
186 
187     public final void remove(T session) {
188         getProcessor(session).remove(session);
189     }
190 
191     public final void updateTrafficControl(T session) {
192         getProcessor(session).updateTrafficControl(session);
193     }
194     
195     public boolean isDisposed() {
196         return disposed;
197     }
198 
199     public boolean isDisposing() {
200         return disposing;
201     }
202 
203     public final void dispose() {
204         if (disposed) {
205             return;
206         }
207 
208         synchronized (disposalLock) {
209             if (!disposing) {
210                 disposing = true;
211                 for (int i = pool.length - 1; i >= 0; i --) {
212                     if (pool[i] == null || pool[i].isDisposing()) {
213                         continue;
214                     }
215 
216                     try {
217                         pool[i].dispose();
218                     } catch (Exception e) {
219                         LOGGER.warn(
220                                 "Failed to dispose a " +
221                                 pool[i].getClass().getSimpleName() +
222                                 " at index " + i + ".", e);
223                     } finally {
224                         pool[i] = null;
225                     }
226                 }
227                 
228                 if (createdExecutor) {
229                     ((ExecutorService) executor).shutdown();
230                 }
231             }
232         }
233 
234         disposed = true;
235     }
236     
237     @SuppressWarnings("unchecked")
238     private IoProcessor<T> getProcessor(T session) {
239         IoProcessor<T> p = (IoProcessor<T>) session.getAttribute(PROCESSOR);
240         if (p == null) {
241             p = nextProcessor();
242             IoProcessor<T> oldp =
243                 (IoProcessor<T>) session.setAttributeIfAbsent(PROCESSOR, p);
244             if (oldp != null) {
245                 p = oldp;
246             }
247         }
248         
249         return p;
250     }
251 
252     private IoProcessor<T> nextProcessor() {
253         checkDisposal();
254         return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
255     }
256 
257     private void checkDisposal() {
258         if (disposed) {
259             throw new IllegalStateException("A disposed processor cannot be accessed.");
260         }
261     }
262 }