1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import java.util.AbstractSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.mina.core.IoUtil;
34 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
35 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
36 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
37 import org.apache.mina.core.future.ConnectFuture;
38 import org.apache.mina.core.future.DefaultIoFuture;
39 import org.apache.mina.core.future.IoFuture;
40 import org.apache.mina.core.future.WriteFuture;
41 import org.apache.mina.core.session.AbstractIoSession;
42 import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
43 import org.apache.mina.core.session.IdleStatus;
44 import org.apache.mina.core.session.IdleStatusChecker;
45 import org.apache.mina.core.session.IoSession;
46 import org.apache.mina.core.session.IoSessionConfig;
47 import org.apache.mina.core.session.IoSessionDataStructureFactory;
48 import org.apache.mina.core.session.IoSessionInitializationException;
49 import org.apache.mina.core.session.IoSessionInitializer;
50 import org.apache.mina.util.ExceptionMonitor;
51 import org.apache.mina.util.NamePreservingRunnable;
52
53
54
55
56
57
58
59
60
61
62 public abstract class AbstractIoService implements IoService {
63
64
65
66
67 private static final AtomicInteger id = new AtomicInteger();
68
69
70
71
72
73 private final String threadName;
74
75
76
77
78 private final Executor executor;
79
80
81
82
83
84
85
86
87 private final boolean createdExecutor;
88
89
90
91
92 private IoHandler handler;
93
94
95
96
97 private final IoSessionConfig sessionConfig;
98
99 private final IoServiceListener serviceActivationListener = new IoServiceListener() {
100 public void serviceActivated(IoService service) {
101
102 AbstractIoService s = (AbstractIoService) service;
103 IoServiceStatistics _stats = (IoServiceStatistics) s.getStatistics();
104 _stats.setLastReadTime(s.getActivationTime());
105 _stats.setLastWriteTime(s.getActivationTime());
106 _stats.setLastThroughputCalculationTime(s.getActivationTime());
107
108
109 idleStatusChecker.addService(s);
110 }
111
112 public void serviceDeactivated(IoService service) {
113 idleStatusChecker.removeService((AbstractIoService) service);
114 }
115
116 public void serviceIdle(IoService service, IdleStatus idleStatus) {
117 }
118
119 public void sessionCreated(IoSession session) {
120 }
121
122 public void sessionDestroyed(IoSession session) {
123 }
124 };
125
126
127
128
129 private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
130
131 private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
132
133
134
135
136 private final IoServiceListenerSupport listeners;
137
138
139
140
141
142 protected final Object disposalLock = new Object();
143
144 private volatile boolean disposing;
145
146 private volatile boolean disposed;
147
148 private IoFuture disposalFuture;
149
150 private final IdleStatusChecker idleStatusChecker = new IdleStatusChecker();
151
152
153
154
155 private IoServiceStatistics stats = new IoServiceStatistics(this);
156
157
158
159
160 private IoServiceIdleState idleState = new IoServiceIdleState(this);
161
162
163
164
165
166
167
168
169
170
171
172
173
174 protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
175 if (sessionConfig == null) {
176 throw new NullPointerException("sessionConfig");
177 }
178
179 if (getTransportMetadata() == null) {
180 throw new NullPointerException("TransportMetadata");
181 }
182
183 if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(
184 sessionConfig.getClass())) {
185 throw new IllegalArgumentException("sessionConfig type: "
186 + sessionConfig.getClass() + " (expected: "
187 + getTransportMetadata().getSessionConfigType() + ")");
188 }
189
190
191
192 listeners = new IoServiceListenerSupport(this);
193 listeners.add(serviceActivationListener);
194
195
196 this.sessionConfig = sessionConfig;
197
198
199
200 ExceptionMonitor.getInstance();
201
202 if (executor == null) {
203 this.executor = Executors.newCachedThreadPool();
204 createdExecutor = true;
205 } else {
206 this.executor = executor;
207 createdExecutor = false;
208 }
209
210 threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
211
212 executeWorker(idleStatusChecker.getNotifyingTask(), "idleStatusChecker");
213 }
214
215
216
217
218 public final IoFilterChainBuilder getFilterChainBuilder() {
219 return filterChainBuilder;
220 }
221
222
223
224
225 public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
226 if (builder == null) {
227 builder = new DefaultIoFilterChainBuilder();
228 }
229 filterChainBuilder = builder;
230 }
231
232
233
234
235 public final DefaultIoFilterChainBuilder getFilterChain() {
236 if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
237 return (DefaultIoFilterChainBuilder) filterChainBuilder;
238 } else {
239 throw new IllegalStateException(
240 "Current filter chain builder is not a DefaultIoFilterChainBuilder.");
241 }
242 }
243
244
245
246
247 public final void addListener(IoServiceListener listener) {
248 listeners.add(listener);
249 }
250
251
252
253
254 public final void removeListener(IoServiceListener listener) {
255 listeners.remove(listener);
256 }
257
258
259
260
261 public final boolean isActive() {
262 return listeners.isActive();
263 }
264
265
266
267
268 public final boolean isDisposing() {
269 return disposing;
270 }
271
272
273
274
275 public final boolean isDisposed() {
276 return disposed;
277 }
278
279
280
281
282 public final void dispose() {
283 if (disposed) {
284 return;
285 }
286
287 IoFuture disposalFuture;
288 synchronized (disposalLock) {
289 disposalFuture = this.disposalFuture;
290 if (!disposing) {
291 disposing = true;
292 try {
293 this.disposalFuture = disposalFuture = dispose0();
294 } catch (Exception e) {
295 ExceptionMonitor.getInstance().exceptionCaught(e);
296 } finally {
297 if (disposalFuture == null) {
298 disposed = true;
299 }
300 }
301 }
302 }
303
304 idleStatusChecker.getNotifyingTask().cancel();
305 if (disposalFuture != null) {
306 disposalFuture.awaitUninterruptibly();
307 }
308
309 if (createdExecutor) {
310 ExecutorService e = (ExecutorService) executor;
311 e.shutdown();
312 while (!e.isTerminated()) {
313 try {
314 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
315 } catch (InterruptedException e1) {
316
317 }
318 }
319 }
320
321 disposed = true;
322 }
323
324
325
326
327
328 protected abstract IoFuture dispose0() throws Exception;
329
330
331
332
333 public final Map<Long, IoSession> getManagedSessions() {
334 return listeners.getManagedSessions();
335 }
336
337
338
339
340 public final int getManagedSessionCount() {
341 return listeners.getManagedSessionCount();
342 }
343
344
345
346
347 public final IoHandler getHandler() {
348 return handler;
349 }
350
351
352
353
354 public final void setHandler(IoHandler handler) {
355 if (handler == null) {
356 throw new NullPointerException("handler cannot be null");
357 }
358
359 if (isActive()) {
360 throw new IllegalStateException(
361 "handler cannot be set while the service is active.");
362 }
363
364 this.handler = handler;
365 }
366
367
368
369
370 public IoSessionConfig getSessionConfig() {
371 return sessionConfig;
372 }
373
374
375
376
377 public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
378 return sessionDataStructureFactory;
379 }
380
381
382
383
384 public final void setSessionDataStructureFactory(
385 IoSessionDataStructureFactory sessionDataStructureFactory) {
386 if (sessionDataStructureFactory == null) {
387 throw new NullPointerException("sessionDataStructureFactory");
388 }
389
390 if (isActive()) {
391 throw new IllegalStateException(
392 "sessionDataStructureFactory cannot be set while the service is active.");
393 }
394
395 this.sessionDataStructureFactory = sessionDataStructureFactory;
396 }
397
398
399
400
401 public IoServiceIdleState getIdleState() {
402 return idleState;
403 }
404
405
406
407
408 public IoServiceStatistics getStatistics() {
409 return stats;
410 }
411
412
413
414
415 public final long getActivationTime() {
416 return listeners.getActivationTime();
417 }
418
419
420
421
422 public final Set<WriteFuture> broadcast(Object message) {
423
424
425
426 final List<WriteFuture> futures = IoUtil.broadcast(message,
427 getManagedSessions().values());
428 return new AbstractSet<WriteFuture>() {
429 @Override
430 public Iterator<WriteFuture> iterator() {
431 return futures.iterator();
432 }
433
434 @Override
435 public int size() {
436 return futures.size();
437 }
438 };
439 }
440
441 public final IoServiceListenerSupport getListeners() {
442 return listeners;
443 }
444
445 protected final IdleStatusChecker getIdleStatusChecker() {
446 return idleStatusChecker;
447 }
448
449 protected final void executeWorker(Runnable worker) {
450 executeWorker(worker, null);
451 }
452
453 protected final void executeWorker(Runnable worker, String suffix) {
454 String actualThreadName = threadName;
455 if (suffix != null) {
456 actualThreadName = actualThreadName + '-' + suffix;
457 }
458 executor.execute(new NamePreservingRunnable(worker, actualThreadName));
459 }
460
461
462 @SuppressWarnings("unchecked")
463 protected final void finishSessionInitialization(IoSession session,
464 IoFuture future, IoSessionInitializer sessionInitializer) {
465
466 if (stats.getLastReadTime() == 0) {
467 ((IoServiceStatistics)stats).setLastReadTime(getActivationTime());
468 }
469 if (stats.getLastWriteTime() == 0) {
470 ((IoServiceStatistics)stats).setLastWriteTime(getActivationTime());
471 }
472
473
474
475
476
477 try {
478 ((AbstractIoSession) session).setAttributeMap(session.getService()
479 .getSessionDataStructureFactory().getAttributeMap(session));
480 } catch (IoSessionInitializationException e) {
481 throw e;
482 } catch (Exception e) {
483 throw new IoSessionInitializationException(
484 "Failed to initialize an attributeMap.", e);
485 }
486
487 try {
488 ((AbstractIoSession) session).setWriteRequestQueue(session
489 .getService().getSessionDataStructureFactory()
490 .getWriteRequestQueue(session));
491 } catch (IoSessionInitializationException e) {
492 throw e;
493 } catch (Exception e) {
494 throw new IoSessionInitializationException(
495 "Failed to initialize a writeRequestQueue.", e);
496 }
497
498 if (future != null && future instanceof ConnectFuture) {
499
500 session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE,
501 future);
502 }
503
504 if (sessionInitializer != null) {
505 sessionInitializer.initializeSession(session, future);
506 }
507
508 finishSessionInitialization0(session, future);
509 }
510
511
512
513
514
515
516
517 protected void finishSessionInitialization0(IoSession session,
518 IoFuture future) {
519 }
520
521 protected static class ServiceOperationFuture extends DefaultIoFuture {
522 public ServiceOperationFuture() {
523 super(null);
524 }
525
526 public final boolean isDone() {
527 return getValue() == Boolean.TRUE;
528 }
529
530 public final void setDone() {
531 setValue(Boolean.TRUE);
532 }
533
534 public final Exception getException() {
535 if (getValue() instanceof Exception) {
536 return (Exception) getValue();
537 } else {
538 return null;
539 }
540 }
541
542 public final void setException(Exception exception) {
543 if (exception == null) {
544 throw new NullPointerException("exception");
545 }
546 setValue(exception);
547 }
548 }
549
550
551
552
553 public int getScheduledWriteBytes() {
554 return stats.getScheduledWriteBytes();
555 }
556
557
558
559
560 public int getScheduledWriteMessages() {
561 return stats.getScheduledWriteMessages();
562 }
563
564
565
566
567 public void notifyIdleness(long currentTime) {
568 idleState.notifyIdleness(currentTime);
569 }
570 }