1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import java.util.AbstractSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.mina.core.IoUtil;
34 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
35 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
36 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
37 import org.apache.mina.core.future.ConnectFuture;
38 import org.apache.mina.core.future.DefaultIoFuture;
39 import org.apache.mina.core.future.IoFuture;
40 import org.apache.mina.core.future.WriteFuture;
41 import org.apache.mina.core.session.AbstractIoSession;
42 import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
43 import org.apache.mina.core.session.IdleStatus;
44 import org.apache.mina.core.session.IoSession;
45 import org.apache.mina.core.session.IoSessionConfig;
46 import org.apache.mina.core.session.IoSessionDataStructureFactory;
47 import org.apache.mina.core.session.IoSessionInitializationException;
48 import org.apache.mina.core.session.IoSessionInitializer;
49 import org.apache.mina.util.ExceptionMonitor;
50 import org.apache.mina.util.NamePreservingRunnable;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54
55
56
57
58
59
60
61
62 public abstract class AbstractIoService implements IoService {
63
64 protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class);
65
66
67
68
69
70 private static final AtomicInteger id = new AtomicInteger();
71
72
73
74
75
76 private final String threadName;
77
78
79
80
81 private final Executor executor;
82
83
84
85
86
87
88
89
90 private final boolean createdExecutor;
91
92
93
94
95 private IoHandler handler;
96
97
98
99
100 protected final IoSessionConfig sessionConfig;
101
102 private final IoServiceListenererviceListener">IoServiceListener serviceActivationListener = new IoServiceListener() {
103 IoServiceStatistics serviceStats;
104
105
106
107
108 @Override
109 public void serviceActivated(IoService service) {
110
111 serviceStats = service.getStatistics();
112 serviceStats.setLastReadTime(service.getActivationTime());
113 serviceStats.setLastWriteTime(service.getActivationTime());
114 serviceStats.setLastThroughputCalculationTime(service.getActivationTime());
115 }
116
117
118
119
120 @Override
121 public void serviceDeactivated(IoService service) throws Exception {
122
123 }
124
125
126
127
128 @Override
129 public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception {
130
131 }
132
133
134
135
136 @Override
137 public void sessionCreated(IoSession session) throws Exception {
138
139 }
140
141
142
143
144 @Override
145 public void sessionClosed(IoSession session) throws Exception {
146
147 }
148
149
150
151
152 @Override
153 public void sessionDestroyed(IoSession session) throws Exception {
154
155 }
156 };
157
158
159
160
161 private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
162
163 private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
164
165
166
167
168 private final IoServiceListenerSupport listeners;
169
170
171
172
173
174 protected final Object disposalLock = new Object();
175
176 private volatile boolean disposing;
177
178 private volatile boolean disposed;
179
180 private IoServiceStatisticseStatistics.html#IoServiceStatistics">IoServiceStatistics stats = new IoServiceStatistics(this);
181
182
183
184
185
186
187
188
189
190
191
192
193
194 protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
195 if (sessionConfig == null) {
196 throw new IllegalArgumentException("sessionConfig");
197 }
198
199 if (getTransportMetadata() == null) {
200 throw new IllegalArgumentException("TransportMetadata");
201 }
202
203 if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
204 throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
205 + getTransportMetadata().getSessionConfigType() + ")");
206 }
207
208
209
210 listeners = new IoServiceListenerSupport(this);
211 listeners.add(serviceActivationListener);
212
213
214 this.sessionConfig = sessionConfig;
215
216
217
218 ExceptionMonitor.getInstance();
219
220 if (executor == null) {
221 this.executor = Executors.newCachedThreadPool();
222 createdExecutor = true;
223 } else {
224 this.executor = executor;
225 createdExecutor = false;
226 }
227
228 threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
229 }
230
231
232
233
234 @Override
235 public final IoFilterChainBuilder getFilterChainBuilder() {
236 return filterChainBuilder;
237 }
238
239
240
241
242 @Override
243 public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
244 if (builder == null) {
245 filterChainBuilder = new DefaultIoFilterChainBuilder();
246 } else {
247 filterChainBuilder = builder;
248 }
249 }
250
251
252
253
254 @Override
255 public final DefaultIoFilterChainBuilder getFilterChain() {
256 if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
257 return (DefaultIoFilterChainBuilder) filterChainBuilder;
258 }
259
260 throw new IllegalStateException("Current filter chain builder is not a DefaultIoFilterChainBuilder.");
261 }
262
263
264
265
266 @Override
267 public final void addListener(IoServiceListener listener) {
268 listeners.add(listener);
269 }
270
271
272
273
274 @Override
275 public final void removeListener(IoServiceListener listener) {
276 listeners.remove(listener);
277 }
278
279
280
281
282 @Override
283 public final boolean isActive() {
284 return listeners.isActive();
285 }
286
287
288
289
290 @Override
291 public final boolean isDisposing() {
292 return disposing;
293 }
294
295
296
297
298 @Override
299 public final boolean isDisposed() {
300 return disposed;
301 }
302
303
304
305
306 @Override
307 public final void dispose() {
308 dispose(false);
309 }
310
311
312
313
314 @Override
315 public final void dispose(boolean awaitTermination) {
316 if (disposed) {
317 return;
318 }
319
320 synchronized (disposalLock) {
321 if (!disposing) {
322 disposing = true;
323
324 try {
325 dispose0();
326 } catch (Exception e) {
327 ExceptionMonitor.getInstance().exceptionCaught(e);
328 }
329 }
330 }
331
332 if (createdExecutor) {
333 ExecutorService e = (ExecutorService) executor;
334 e.shutdownNow();
335 if (awaitTermination) {
336
337 try {
338 LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName());
339 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
340 LOGGER.debug("awaitTermination on {} finished", this);
341 } catch (InterruptedException e1) {
342 LOGGER.warn("awaitTermination on [{}] was interrupted", this);
343
344 Thread.currentThread().interrupt();
345 }
346 }
347 }
348 disposed = true;
349 }
350
351
352
353
354
355
356
357 protected abstract void dispose0() throws Exception;
358
359
360
361
362 @Override
363 public final Map<Long, IoSession> getManagedSessions() {
364 return listeners.getManagedSessions();
365 }
366
367
368
369
370 @Override
371 public final int getManagedSessionCount() {
372 return listeners.getManagedSessionCount();
373 }
374
375
376
377
378 @Override
379 public final IoHandler getHandler() {
380 return handler;
381 }
382
383
384
385
386 @Override
387 public final void setHandler(IoHandler handler) {
388 if (handler == null) {
389 throw new IllegalArgumentException("handler cannot be null");
390 }
391
392 if (isActive()) {
393 throw new IllegalStateException("handler cannot be set while the service is active.");
394 }
395
396 this.handler = handler;
397 }
398
399
400
401
402 @Override
403 public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
404 return sessionDataStructureFactory;
405 }
406
407
408
409
410 @Override
411 public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) {
412 if (sessionDataStructureFactory == null) {
413 throw new IllegalArgumentException("sessionDataStructureFactory");
414 }
415
416 if (isActive()) {
417 throw new IllegalStateException("sessionDataStructureFactory cannot be set while the service is active.");
418 }
419
420 this.sessionDataStructureFactory = sessionDataStructureFactory;
421 }
422
423
424
425
426 @Override
427 public IoServiceStatistics getStatistics() {
428 return stats;
429 }
430
431
432
433
434 @Override
435 public final long getActivationTime() {
436 return listeners.getActivationTime();
437 }
438
439
440
441
442 @Override
443 public final Set<WriteFuture> broadcast(Object message) {
444
445
446
447 final List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values());
448 return new AbstractSet<WriteFuture>() {
449 @Override
450 public Iterator<WriteFuture> iterator() {
451 return futures.iterator();
452 }
453
454 @Override
455 public int size() {
456 return futures.size();
457 }
458 };
459 }
460
461
462
463
464 public final IoServiceListenerSupport getListeners() {
465 return listeners;
466 }
467
468 protected final void executeWorker(Runnable worker) {
469 executeWorker(worker, null);
470 }
471
472 protected final void executeWorker(Runnable worker, String suffix) {
473 String actualThreadName = threadName;
474 if (suffix != null) {
475 actualThreadName = actualThreadName + '-' + suffix;
476 }
477 executor.execute(new NamePreservingRunnable(worker, actualThreadName));
478 }
479
480 protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
481
482 if (stats.getLastReadTime() == 0) {
483 stats.setLastReadTime(getActivationTime());
484 }
485
486 if (stats.getLastWriteTime() == 0) {
487 stats.setLastWriteTime(getActivationTime());
488 }
489
490
491
492
493
494 try {
495 ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
496 .getAttributeMap(session));
497 } catch (IoSessionInitializationException e) {
498 throw e;
499 } catch (Exception e) {
500 throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
501 }
502
503 try {
504 ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
505 .getWriteRequestQueue(session));
506 } catch (IoSessionInitializationException e) {
507 throw e;
508 } catch (Exception e) {
509 throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
510 }
511
512 if ((future != null) && (future instanceof ConnectFuture)) {
513
514 session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
515 }
516
517 if (sessionInitializer != null) {
518 sessionInitializer.initializeSession(session, future);
519 }
520
521 finishSessionInitialization0(session, future);
522 }
523
524
525
526
527
528
529
530
531
532
533
534 protected void finishSessionInitialization0(IoSession session, IoFuture future) {
535
536 }
537
538
539
540
541
542 protected static class ServiceOperationFuture extends DefaultIoFuture {
543 public ServiceOperationFuture() {
544 super(null);
545 }
546
547
548
549
550 @Override
551 public final boolean isDone() {
552 return getValue() == Boolean.TRUE;
553 }
554
555 public final void setDone() {
556 setValue(Boolean.TRUE);
557 }
558
559 public final Exception getException() {
560 if (getValue() instanceof Exception) {
561 return (Exception) getValue();
562 }
563
564 return null;
565 }
566
567 public final void setException(Exception exception) {
568 if (exception == null) {
569 throw new IllegalArgumentException("exception");
570 }
571
572 setValue(exception);
573 }
574 }
575
576
577
578
579 @Override
580 public int getScheduledWriteBytes() {
581 return stats.getScheduledWriteBytes();
582 }
583
584
585
586
587 @Override
588 public int getScheduledWriteMessages() {
589 return stats.getScheduledWriteMessages();
590 }
591 }