1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.session;
21
22 import java.util.Iterator;
23 import java.util.Set;
24
25 import org.apache.mina.core.future.IoFuture;
26 import org.apache.mina.core.future.IoFutureListener;
27 import org.apache.mina.core.service.AbstractIoService;
28 import org.apache.mina.core.service.IoService;
29 import org.apache.mina.core.write.WriteRequest;
30 import org.apache.mina.core.write.WriteTimeoutException;
31 import org.apache.mina.util.ConcurrentHashSet;
32
33
34
35
36
37
38
39 public class IdleStatusChecker {
40 private final Set<AbstractIoSession> sessions =
41 new ConcurrentHashSet<AbstractIoSession>();
42 private final Set<AbstractIoService> services =
43 new ConcurrentHashSet<AbstractIoService>();
44
45 private final NotifyingTask notifyingTask = new NotifyingTaskImpl();
46 private final IoFutureListener<IoFuture> sessionCloseListener =
47 new SessionCloseListener();
48
49 public IdleStatusChecker() {}
50
51 public void addSession(AbstractIoSession session) {
52 sessions.add(session);
53 session.getCloseFuture().addListener(sessionCloseListener);
54 }
55
56 public void addService(AbstractIoService service) {
57 services.add(service);
58 }
59
60 public void removeSession(AbstractIoSession session) {
61 sessions.remove(session);
62 }
63
64 public void removeService(AbstractIoService service) {
65 services.remove(service);
66 }
67
68 public NotifyingTask getNotifyingTask() {
69 return notifyingTask;
70 }
71
72 public interface NotifyingTask extends Runnable {
73
74
75
76
77
78 void cancel();
79 }
80
81 private class NotifyingTaskImpl implements NotifyingTask {
82 private volatile boolean cancelled;
83 private volatile Thread thread;
84
85 public void run() {
86 thread = Thread.currentThread();
87 try {
88 while (!cancelled) {
89
90 long currentTime = System.currentTimeMillis();
91 notifyServices(currentTime);
92 notifySessions(currentTime);
93
94 try {
95 Thread.sleep(1000);
96 } catch (InterruptedException e) {
97
98 }
99 }
100 } finally {
101 thread = null;
102 }
103 }
104
105 public void cancel() {
106 cancelled = true;
107 Thread thread = this.thread;
108 if (thread != null) {
109 thread.interrupt();
110 }
111 }
112
113 private void notifyServices(long currentTime) {
114 Iterator<AbstractIoService> it = services.iterator();
115 while (it.hasNext()) {
116 AbstractIoService service = it.next();
117 if (service.isActive()) {
118 notifyIdleness(service, currentTime, false);
119 }
120 }
121 }
122
123 private void notifySessions(long currentTime) {
124 Iterator<AbstractIoSession> it = sessions.iterator();
125 while (it.hasNext()) {
126 AbstractIoSession session = it.next();
127 if (session.isConnected()) {
128 notifyIdleSession(session, currentTime);
129 }
130 }
131 }
132 }
133
134 private class SessionCloseListener implements IoFutureListener<IoFuture> {
135 public void operationComplete(IoFuture future) {
136 removeSession((AbstractIoSession) future.getSession());
137 }
138 }
139
140
141
142
143
144
145
146 public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
147 IoSession s = null;
148 while (sessions.hasNext()) {
149 s = sessions.next();
150 notifyIdleSession(s, currentTime);
151 }
152 }
153
154 public static void notifyIdleness(IoService service, long currentTime) {
155 notifyIdleness(service, currentTime, true);
156 }
157
158 private static void notifyIdleness(IoService service, long currentTime, boolean includeSessions) {
159 if (!(service instanceof AbstractIoService)) {
160 return;
161 }
162
163 ((AbstractIoService) service).notifyIdleness(currentTime);
164
165 if (includeSessions) {
166 notifyIdleness(service.getManagedSessions().values().iterator(), currentTime);
167 }
168 }
169
170
171
172
173
174
175
176 public static void notifyIdleSession(IoSession session, long currentTime) {
177 if (session instanceof AbstractIoSession) {
178 AbstractIoSession s = (AbstractIoSession) session;
179 notifyIdleSession1(
180 s, currentTime,
181 s.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
182 IdleStatus.BOTH_IDLE, Math.max(
183 s.getLastIoTime(),
184 s.getLastIdleTime(IdleStatus.BOTH_IDLE)));
185
186 notifyIdleSession1(
187 s, currentTime,
188 s.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
189 IdleStatus.READER_IDLE, Math.max(
190 s.getLastReadTime(),
191 s.getLastIdleTime(IdleStatus.READER_IDLE)));
192
193 notifyIdleSession1(
194 s, currentTime,
195 s.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
196 IdleStatus.WRITER_IDLE, Math.max(
197 s.getLastWriteTime(),
198 s.getLastIdleTime(IdleStatus.WRITER_IDLE)));
199
200 notifyWriteTimeout(s, currentTime);
201 updateThroughput(s, currentTime);
202 } else {
203 notifyIdleSession0(
204 session, currentTime,
205 session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
206 IdleStatus.BOTH_IDLE, Math.max(
207 session.getLastIoTime(),
208 session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
209
210 notifyIdleSession0(
211 session, currentTime,
212 session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
213 IdleStatus.READER_IDLE, Math.max(
214 session.getLastReadTime(),
215 session.getLastIdleTime(IdleStatus.READER_IDLE)));
216
217 notifyIdleSession0(
218 session, currentTime,
219 session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
220 IdleStatus.WRITER_IDLE, Math.max(
221 session.getLastWriteTime(),
222 session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
223 }
224 }
225
226 private static void notifyIdleSession0(
227 IoSession session, long currentTime,
228 long idleTime, IdleStatus status, long lastIoTime) {
229 if (idleTime > 0 && lastIoTime != 0
230 && currentTime - lastIoTime >= idleTime) {
231 session.getFilterChain().fireSessionIdle(status);
232 }
233 }
234
235 private static void notifyIdleSession1(
236 AbstractIoSession session, long currentTime,
237 long idleTime, IdleStatus status, long lastIoTime) {
238 if (idleTime > 0 && lastIoTime != 0
239 && currentTime - lastIoTime >= idleTime) {
240 session.getFilterChain().fireSessionIdle(status);
241 }
242 }
243
244 private static void notifyWriteTimeout(
245 AbstractIoSession session, long currentTime) {
246
247 long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
248 if (writeTimeout > 0 &&
249 currentTime - session.getLastWriteTime() >= writeTimeout &&
250 !session.getWriteRequestQueue().isEmpty(session)) {
251 WriteRequest request = session.getCurrentWriteRequest();
252 if (request != null) {
253 session.setCurrentWriteRequest(null);
254 WriteTimeoutException cause = new WriteTimeoutException(request);
255 request.getFuture().setException(cause);
256 session.getFilterChain().fireExceptionCaught(cause);
257
258 session.close();
259 }
260 }
261 }
262
263 private static void updateThroughput(
264 AbstractIoSession session, long currentTime) {
265 session.updateThroughput(currentTime, false);
266 }
267 }