001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.core.service; 021 022import java.lang.reflect.Constructor; 023import java.nio.channels.spi.SelectorProvider; 024import java.util.Arrays; 025import java.util.concurrent.Executor; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.ThreadPoolExecutor; 029 030import org.apache.mina.core.RuntimeIoException; 031import org.apache.mina.core.session.AbstractIoSession; 032import org.apache.mina.core.session.AttributeKey; 033import org.apache.mina.core.session.IoSession; 034import org.apache.mina.core.write.WriteRequest; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more 040 * {@link IoProcessor}s. Most current transport implementations use this pool internally 041 * to perform better in a multi-core environment, and therefore, you won't need to 042 * use this pool directly unless you are running multiple {@link IoService}s in the 043 * same JVM. 044 * <p> 045 * If you are running multiple {@link IoService}s, you could want to share the pool 046 * among all services. To do so, you can create a new {@link SimpleIoProcessorPool} 047 * instance by yourself and provide the pool as a constructor parameter when you 048 * create the services. 049 * <p> 050 * This pool uses Java reflection API to create multiple {@link IoProcessor} instances. 051 * It tries to instantiate the processor in the following order: 052 * <ol> 053 * <li>A public constructor with one {@link ExecutorService} parameter.</li> 054 * <li>A public constructor with one {@link Executor} parameter.</li> 055 * <li>A public default constructor</li> 056 * </ol> 057 * The following is an example for the NIO socket transport: 058 * <pre><code> 059 * // Create a shared pool. 060 * SimpleIoProcessorPool<NioSession> pool = 061 * new SimpleIoProcessorPool<NioSession>(NioProcessor.class, 16); 062 * 063 * // Create two services that share the same pool. 064 * SocketAcceptor acceptor = new NioSocketAcceptor(pool); 065 * SocketConnector connector = new NioSocketConnector(pool); 066 * 067 * ... 068 * 069 * // Release related resources. 070 * connector.dispose(); 071 * acceptor.dispose(); 072 * pool.dispose(); 073 * </code></pre> 074 * 075 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 076 * 077 * @param <S> the type of the {@link IoSession} to be managed by the specified 078 * {@link IoProcessor}. 079 */ 080public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoProcessor<S> { 081 /** A logger for this class */ 082 private final static Logger LOGGER = LoggerFactory.getLogger(SimpleIoProcessorPool.class); 083 084 /** The default pool size, when no size is provided. */ 085 private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1; 086 087 /** A key used to store the processor pool in the session's Attributes */ 088 private static final AttributeKey PROCESSOR = new AttributeKey(SimpleIoProcessorPool.class, "processor"); 089 090 /** The pool table */ 091 private final IoProcessor<S>[] pool; 092 093 /** The contained which is passed to the IoProcessor when they are created */ 094 private final Executor executor; 095 096 /** A flag set to true if we had to create an executor */ 097 private final boolean createdExecutor; 098 099 /** A lock to protect the disposal against concurrent calls */ 100 private final Object disposalLock = new Object(); 101 102 /** A flg set to true if the IoProcessor in the pool are being disposed */ 103 private volatile boolean disposing; 104 105 /** A flag set to true if all the IoProcessor contained in the pool have been disposed */ 106 private volatile boolean disposed; 107 108 /** 109 * Creates a new instance of SimpleIoProcessorPool with a default 110 * size of NbCPUs +1. 111 * 112 * @param processorType The type of IoProcessor to use 113 */ 114 public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) { 115 this(processorType, null, DEFAULT_SIZE, null); 116 } 117 118 /** 119 * Creates a new instance of SimpleIoProcessorPool with a defined 120 * number of IoProcessors in the pool 121 * 122 * @param processorType The type of IoProcessor to use 123 * @param size The number of IoProcessor in the pool 124 */ 125 public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size) { 126 this(processorType, null, size, null); 127 } 128 129 /** 130 * Creates a new instance of SimpleIoProcessorPool with a defined 131 * number of IoProcessors in the pool 132 * 133 * @param processorType The type of IoProcessor to use 134 * @param size The number of IoProcessor in the pool 135 * @param selectorProvider The SelectorProvider to use 136 */ 137 public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size, SelectorProvider selectorProvider) { 138 this(processorType, null, size, selectorProvider); 139 } 140 141 /** 142 * Creates a new instance of SimpleIoProcessorPool with an executor 143 * 144 * @param processorType The type of IoProcessor to use 145 * @param executor The {@link Executor} 146 */ 147 public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor) { 148 this(processorType, executor, DEFAULT_SIZE, null); 149 } 150 151 /** 152 * Creates a new instance of SimpleIoProcessorPool with an executor 153 * 154 * @param processorType The type of IoProcessor to use 155 * @param executor The {@link Executor} 156 * @param size The number of IoProcessor in the pool 157 * @param selectorProvider The SelectorProvider to used 158 */ 159 @SuppressWarnings("unchecked") 160 public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size, 161 SelectorProvider selectorProvider) { 162 if (processorType == null) { 163 throw new IllegalArgumentException("processorType"); 164 } 165 166 if (size <= 0) { 167 throw new IllegalArgumentException("size: " + size + " (expected: positive integer)"); 168 } 169 170 // Create the executor if none is provided 171 createdExecutor = (executor == null); 172 173 if (createdExecutor) { 174 this.executor = Executors.newCachedThreadPool(); 175 // Set a default reject handler 176 ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 177 } else { 178 this.executor = executor; 179 } 180 181 pool = new IoProcessor[size]; 182 183 boolean success = false; 184 Constructor<? extends IoProcessor<S>> processorConstructor = null; 185 boolean usesExecutorArg = true; 186 187 try { 188 // We create at least one processor 189 try { 190 try { 191 processorConstructor = processorType.getConstructor(ExecutorService.class); 192 pool[0] = processorConstructor.newInstance(this.executor); 193 } catch (NoSuchMethodException e1) { 194 // To the next step... 195 try { 196 if(selectorProvider==null) { 197 processorConstructor = processorType.getConstructor(Executor.class); 198 pool[0] = processorConstructor.newInstance(this.executor); 199 } else { 200 processorConstructor = processorType.getConstructor(Executor.class, SelectorProvider.class); 201 pool[0] = processorConstructor.newInstance(this.executor,selectorProvider); 202 } 203 } catch (NoSuchMethodException e2) { 204 // To the next step... 205 try { 206 processorConstructor = processorType.getConstructor(); 207 usesExecutorArg = false; 208 pool[0] = processorConstructor.newInstance(); 209 } catch (NoSuchMethodException e3) { 210 // To the next step... 211 } 212 } 213 } 214 } catch (RuntimeException re) { 215 LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage()); 216 throw re; 217 } catch (Exception e) { 218 String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage(); 219 LOGGER.error(msg, e); 220 throw new RuntimeIoException(msg, e); 221 } 222 223 if (processorConstructor == null) { 224 // Raise an exception if no proper constructor is found. 225 String msg = String.valueOf(processorType) + " must have a public constructor with one " 226 + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one " 227 + Executor.class.getSimpleName() + " parameter or a public default constructor."; 228 LOGGER.error(msg); 229 throw new IllegalArgumentException(msg); 230 } 231 232 // Constructor found now use it for all subsequent instantiations 233 for (int i = 1; i < pool.length; i++) { 234 try { 235 if (usesExecutorArg) { 236 if(selectorProvider==null) { 237 pool[i] = processorConstructor.newInstance(this.executor); 238 } else { 239 pool[i] = processorConstructor.newInstance(this.executor, selectorProvider); 240 } 241 } else { 242 pool[i] = processorConstructor.newInstance(); 243 } 244 } catch (Exception e) { 245 // Won't happen because it has been done previously 246 } 247 } 248 249 success = true; 250 } finally { 251 if (!success) { 252 dispose(); 253 } 254 } 255 } 256 257 /** 258 * {@inheritDoc} 259 */ 260 public final void add(S session) { 261 getProcessor(session).add(session); 262 } 263 264 /** 265 * {@inheritDoc} 266 */ 267 public final void flush(S session) { 268 getProcessor(session).flush(session); 269 } 270 271 /** 272 * {@inheritDoc} 273 */ 274 public final void write(S session, WriteRequest writeRequest) { 275 getProcessor(session).write(session, writeRequest); 276 } 277 278 /** 279 * {@inheritDoc} 280 */ 281 public final void remove(S session) { 282 getProcessor(session).remove(session); 283 } 284 285 /** 286 * {@inheritDoc} 287 */ 288 public final void updateTrafficControl(S session) { 289 getProcessor(session).updateTrafficControl(session); 290 } 291 292 /** 293 * {@inheritDoc} 294 */ 295 public boolean isDisposed() { 296 return disposed; 297 } 298 299 /** 300 * {@inheritDoc} 301 */ 302 public boolean isDisposing() { 303 return disposing; 304 } 305 306 /** 307 * {@inheritDoc} 308 */ 309 public final void dispose() { 310 if (disposed) { 311 return; 312 } 313 314 synchronized (disposalLock) { 315 if (!disposing) { 316 disposing = true; 317 318 for (IoProcessor<S> ioProcessor : pool) { 319 if (ioProcessor == null) { 320 // Special case if the pool has not been initialized properly 321 continue; 322 } 323 324 if (ioProcessor.isDisposing()) { 325 continue; 326 } 327 328 try { 329 ioProcessor.dispose(); 330 } catch (Exception e) { 331 LOGGER.warn("Failed to dispose the {} IoProcessor.", ioProcessor.getClass().getSimpleName(), e); 332 } 333 } 334 335 if (createdExecutor) { 336 ((ExecutorService) executor).shutdown(); 337 } 338 } 339 340 Arrays.fill(pool, null); 341 disposed = true; 342 } 343 } 344 345 /** 346 * Find the processor associated to a session. If it hasen't be stored into 347 * the session's attributes, pick a new processor and stores it. 348 */ 349 @SuppressWarnings("unchecked") 350 private IoProcessor<S> getProcessor(S session) { 351 IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR); 352 353 if (processor == null) { 354 if (disposed || disposing) { 355 throw new IllegalStateException("A disposed processor cannot be accessed."); 356 } 357 358 processor = pool[Math.abs((int) session.getId()) % pool.length]; 359 360 if (processor == null) { 361 throw new IllegalStateException("A disposed processor cannot be accessed."); 362 } 363 364 session.setAttributeIfAbsent(PROCESSOR, processor); 365 } 366 367 return processor; 368 } 369}