1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import java.util.AbstractSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.mina.core.IoUtil;
34 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
35 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
36 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
37 import org.apache.mina.core.future.ConnectFuture;
38 import org.apache.mina.core.future.DefaultIoFuture;
39 import org.apache.mina.core.future.IoFuture;
40 import org.apache.mina.core.future.WriteFuture;
41 import org.apache.mina.core.session.AbstractIoSession;
42 import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
43 import org.apache.mina.core.session.IdleStatus;
44 import org.apache.mina.core.session.IoSession;
45 import org.apache.mina.core.session.IoSessionConfig;
46 import org.apache.mina.core.session.IoSessionDataStructureFactory;
47 import org.apache.mina.core.session.IoSessionInitializationException;
48 import org.apache.mina.core.session.IoSessionInitializer;
49 import org.apache.mina.util.ExceptionMonitor;
50 import org.apache.mina.util.NamePreservingRunnable;
51
52
53
54
55
56
57
58
59
60 public abstract class AbstractIoService implements IoService {
61
62
63
64
65 private static final AtomicInteger id = new AtomicInteger();
66
67
68
69
70
71 private final String threadName;
72
73
74
75
76 private final Executor executor;
77
78
79
80
81
82
83
84
85 private final boolean createdExecutor;
86
87
88
89
90 private IoHandler handler;
91
92
93
94
95 private final IoSessionConfig sessionConfig;
96
97 private final IoServiceListener serviceActivationListener = new IoServiceListener() {
98 public void serviceActivated(IoService service) {
99
100 AbstractIoService s = (AbstractIoService) service;
101 IoServiceStatistics _stats = s.getStatistics();
102 _stats.setLastReadTime(s.getActivationTime());
103 _stats.setLastWriteTime(s.getActivationTime());
104 _stats.setLastThroughputCalculationTime(s.getActivationTime());
105
106 }
107
108 public void serviceDeactivated(IoService service) {
109
110 }
111
112 public void serviceIdle(IoService service, IdleStatus idleStatus) {
113
114 }
115
116 public void sessionCreated(IoSession session) {
117
118 }
119
120 public void sessionDestroyed(IoSession session) {
121
122 }
123 };
124
125
126
127
128 private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
129
130 private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
131
132
133
134
135 private final IoServiceListenerSupport listeners;
136
137
138
139
140
141 protected final Object disposalLock = new Object();
142
143 private volatile boolean disposing;
144
145 private volatile boolean disposed;
146
147 private IoFuture disposalFuture;
148
149
150
151
152 private IoServiceStatistics stats = new IoServiceStatistics(this);
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167 protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
168 if (sessionConfig == null) {
169 throw new NullPointerException("sessionConfig");
170 }
171
172 if (getTransportMetadata() == null) {
173 throw new NullPointerException("TransportMetadata");
174 }
175
176 if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(
177 sessionConfig.getClass())) {
178 throw new IllegalArgumentException("sessionConfig type: "
179 + sessionConfig.getClass() + " (expected: "
180 + getTransportMetadata().getSessionConfigType() + ")");
181 }
182
183
184
185 listeners = new IoServiceListenerSupport(this);
186 listeners.add(serviceActivationListener);
187
188
189 this.sessionConfig = sessionConfig;
190
191
192
193 ExceptionMonitor.getInstance();
194
195 if (executor == null) {
196 this.executor = Executors.newCachedThreadPool();
197 createdExecutor = true;
198 } else {
199 this.executor = executor;
200 createdExecutor = false;
201 }
202
203 threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
204 }
205
206
207
208
209 public final IoFilterChainBuilder getFilterChainBuilder() {
210 return filterChainBuilder;
211 }
212
213
214
215
216 public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
217 if (builder == null) {
218 builder = new DefaultIoFilterChainBuilder();
219 }
220 filterChainBuilder = builder;
221 }
222
223
224
225
226 public final DefaultIoFilterChainBuilder getFilterChain() {
227 if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
228 return (DefaultIoFilterChainBuilder) filterChainBuilder;
229 }
230
231
232 throw new IllegalStateException(
233 "Current filter chain builder is not a DefaultIoFilterChainBuilder.");
234 }
235
236
237
238
239 public final void addListener(IoServiceListener listener) {
240 listeners.add(listener);
241 }
242
243
244
245
246 public final void removeListener(IoServiceListener listener) {
247 listeners.remove(listener);
248 }
249
250
251
252
253 public final boolean isActive() {
254 return listeners.isActive();
255 }
256
257
258
259
260 public final boolean isDisposing() {
261 return disposing;
262 }
263
264
265
266
267 public final boolean isDisposed() {
268 return disposed;
269 }
270
271
272
273
274 public final void dispose() {
275 if (disposed) {
276 return;
277 }
278
279 IoFuture disposalFuture;
280 synchronized (disposalLock) {
281 disposalFuture = this.disposalFuture;
282 if (!disposing) {
283 disposing = true;
284 try {
285 this.disposalFuture = disposalFuture = dispose0();
286 } catch (Exception e) {
287 ExceptionMonitor.getInstance().exceptionCaught(e);
288 } finally {
289 if (disposalFuture == null) {
290 disposed = true;
291 }
292 }
293 }
294 }
295
296 if (disposalFuture != null) {
297 disposalFuture.awaitUninterruptibly();
298 }
299
300 if (createdExecutor) {
301 ExecutorService e = (ExecutorService) executor;
302 e.shutdown();
303 while (!e.isTerminated()) {
304 try {
305 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
306 } catch (InterruptedException e1) {
307
308 }
309 }
310 }
311
312 disposed = true;
313 }
314
315
316
317
318
319 protected abstract IoFuture dispose0() throws Exception;
320
321
322
323
324 public final Map<Long, IoSession> getManagedSessions() {
325 return listeners.getManagedSessions();
326 }
327
328
329
330
331 public final int getManagedSessionCount() {
332 return listeners.getManagedSessionCount();
333 }
334
335
336
337
338 public final IoHandler getHandler() {
339 return handler;
340 }
341
342
343
344
345 public final void setHandler(IoHandler handler) {
346 if (handler == null) {
347 throw new NullPointerException("handler cannot be null");
348 }
349
350 if (isActive()) {
351 throw new IllegalStateException(
352 "handler cannot be set while the service is active.");
353 }
354
355 this.handler = handler;
356 }
357
358
359
360
361 public IoSessionConfig getSessionConfig() {
362 return sessionConfig;
363 }
364
365
366
367
368 public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
369 return sessionDataStructureFactory;
370 }
371
372
373
374
375 public final void setSessionDataStructureFactory(
376 IoSessionDataStructureFactory sessionDataStructureFactory) {
377 if (sessionDataStructureFactory == null) {
378 throw new NullPointerException("sessionDataStructureFactory");
379 }
380
381 if (isActive()) {
382 throw new IllegalStateException(
383 "sessionDataStructureFactory cannot be set while the service is active.");
384 }
385
386 this.sessionDataStructureFactory = sessionDataStructureFactory;
387 }
388
389
390
391
392 public IoServiceStatistics getStatistics() {
393 return stats;
394 }
395
396
397
398
399 public final long getActivationTime() {
400 return listeners.getActivationTime();
401 }
402
403
404
405
406 public final Set<WriteFuture> broadcast(Object message) {
407
408
409
410 final List<WriteFuture> futures = IoUtil.broadcast(message,
411 getManagedSessions().values());
412 return new AbstractSet<WriteFuture>() {
413 @Override
414 public Iterator<WriteFuture> iterator() {
415 return futures.iterator();
416 }
417
418 @Override
419 public int size() {
420 return futures.size();
421 }
422 };
423 }
424
425 public final IoServiceListenerSupport getListeners() {
426 return listeners;
427 }
428
429
430 protected final void executeWorker(Runnable worker) {
431 executeWorker(worker, null);
432 }
433
434 protected final void executeWorker(Runnable worker, String suffix) {
435 String actualThreadName = threadName;
436 if (suffix != null) {
437 actualThreadName = actualThreadName + '-' + suffix;
438 }
439 executor.execute(new NamePreservingRunnable(worker, actualThreadName));
440 }
441
442
443 @SuppressWarnings("unchecked")
444 protected final void initSession(IoSession session,
445 IoFuture future, IoSessionInitializer sessionInitializer) {
446
447 if (stats.getLastReadTime() == 0) {
448 stats.setLastReadTime(getActivationTime());
449 }
450
451 if (stats.getLastWriteTime() == 0) {
452 stats.setLastWriteTime(getActivationTime());
453 }
454
455
456
457
458
459 try {
460 ((AbstractIoSession) session).setAttributeMap(session.getService()
461 .getSessionDataStructureFactory().getAttributeMap(session));
462 } catch (IoSessionInitializationException e) {
463 throw e;
464 } catch (Exception e) {
465 throw new IoSessionInitializationException(
466 "Failed to initialize an attributeMap.", e);
467 }
468
469 try {
470 ((AbstractIoSession) session).setWriteRequestQueue(session
471 .getService().getSessionDataStructureFactory()
472 .getWriteRequestQueue(session));
473 } catch (IoSessionInitializationException e) {
474 throw e;
475 } catch (Exception e) {
476 throw new IoSessionInitializationException(
477 "Failed to initialize a writeRequestQueue.", e);
478 }
479
480 if ((future != null) && (future instanceof ConnectFuture)) {
481
482 session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE,
483 future);
484 }
485
486 if (sessionInitializer != null) {
487 sessionInitializer.initializeSession(session, future);
488 }
489
490 finishSessionInitialization0(session, future);
491 }
492
493
494
495
496
497
498
499 protected void finishSessionInitialization0(IoSession session,
500 IoFuture future) {
501
502 }
503
504 protected static class ServiceOperationFuture extends DefaultIoFuture {
505 public ServiceOperationFuture() {
506 super(null);
507 }
508
509 public final boolean isDone() {
510 return getValue() == Boolean.TRUE;
511 }
512
513 public final void setDone() {
514 setValue(Boolean.TRUE);
515 }
516
517 public final Exception getException() {
518 if (getValue() instanceof Exception) {
519 return (Exception) getValue();
520 }
521
522 return null;
523 }
524
525 public final void setException(Exception exception) {
526 if (exception == null) {
527 throw new NullPointerException("exception");
528 }
529 setValue(exception);
530 }
531 }
532
533
534
535
536 public int getScheduledWriteBytes() {
537 return stats.getScheduledWriteBytes();
538 }
539
540
541
542
543 public int getScheduledWriteMessages() {
544 return stats.getScheduledWriteMessages();
545 }
546
547 }