View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.session;
21  
22  import java.util.Iterator;
23  import java.util.Set;
24  
25  import org.apache.mina.core.future.IoFuture;
26  import org.apache.mina.core.future.IoFutureListener;
27  import org.apache.mina.core.service.AbstractIoService;
28  import org.apache.mina.core.service.IoService;
29  import org.apache.mina.core.write.WriteRequest;
30  import org.apache.mina.core.write.WriteTimeoutException;
31  import org.apache.mina.util.ConcurrentHashSet;
32  
33  /**
34   * Detects idle sessions and fires <tt>sessionIdle</tt> events to them.
35   *
36   * @author The Apache MINA Project (dev@mina.apache.org)
37   * @version $Rev: 525369 $, $Date: 2007-04-04 05:05:11 +0200 (mer., 04 avr. 2007) $
38   */
39  public class IdleStatusChecker {
40      private final Set<AbstractIoSession> sessions =
41          new ConcurrentHashSet<AbstractIoSession>();
42      private final Set<AbstractIoService> services =
43          new ConcurrentHashSet<AbstractIoService>();
44  
45      private final NotifyingTask notifyingTask = new NotifyingTaskImpl();
46      private final IoFutureListener<IoFuture> sessionCloseListener =
47          new SessionCloseListener();
48  
49      public IdleStatusChecker() {}
50  
51      public void addSession(AbstractIoSession session) {
52          sessions.add(session);
53          session.getCloseFuture().addListener(sessionCloseListener);
54      }
55  
56      public void addService(AbstractIoService service) {
57          services.add(service);
58      }
59  
60      public void removeSession(AbstractIoSession session) {
61          sessions.remove(session);
62      }
63  
64      public void removeService(AbstractIoService service) {
65          services.remove(service);
66      }
67  
68      public NotifyingTask getNotifyingTask() {
69          return notifyingTask;
70      }
71  
72      public interface NotifyingTask extends Runnable {
73          /**
74           * Cancels this task.  Once canceled, {@link #run()} method will always return immediately.
75           * To start this task again after calling this method, you have to create a new instance of
76           * {@link IdleStatusChecker} again.
77           */
78          void cancel();
79      }
80  
81      private class NotifyingTaskImpl implements NotifyingTask {
82          private volatile boolean cancelled;
83          private volatile Thread thread;
84  
85          public void run() {
86              thread = Thread.currentThread();
87              try {
88                  while (!cancelled) {
89                      // Check idleness with fixed delay (1 second).
90                      long currentTime = System.currentTimeMillis();
91                      notifyServices(currentTime);
92                      notifySessions(currentTime);
93  
94                      try {
95                          Thread.sleep(1000);
96                      } catch (InterruptedException e) {
97                          // will exit the loop if interrupted from interrupt()
98                      }
99                  }
100             } finally {
101                 thread = null;
102             }
103         }
104 
105         public void cancel() {
106             cancelled = true;
107             Thread thread = this.thread;
108             if (thread != null) {
109                 thread.interrupt();
110             }
111         }
112 
113         private void notifyServices(long currentTime) {
114             Iterator<AbstractIoService> it = services.iterator();
115             while (it.hasNext()) {
116                 AbstractIoService service = it.next();
117                 if (service.isActive()) {
118                     notifyIdleness(service, currentTime, false);
119                 }
120             }
121         }
122 
123         private void notifySessions(long currentTime) {
124             Iterator<AbstractIoSession> it = sessions.iterator();
125             while (it.hasNext()) {
126                 AbstractIoSession session = it.next();
127                 if (session.isConnected()) {
128                     notifyIdleSession(session, currentTime);
129                 }
130             }
131         }
132     }
133 
134     private class SessionCloseListener implements IoFutureListener<IoFuture> {
135         public void operationComplete(IoFuture future) {
136             removeSession((AbstractIoSession) future.getSession());
137         }
138     }
139 
140     /**
141      * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable
142      * sessions in the specified collection.
143      *
144      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
145      */
146     public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
147         IoSession s = null;
148         while (sessions.hasNext()) {
149             s = sessions.next();
150             notifyIdleSession(s, currentTime);
151         }
152     }
153 
154     public static void notifyIdleness(IoService service, long currentTime) {
155         notifyIdleness(service, currentTime, true);
156     }
157 
158     private static void notifyIdleness(IoService service, long currentTime, boolean includeSessions) {
159         if (!(service instanceof AbstractIoService)) {
160             return;
161         }
162 
163         ((AbstractIoService) service).notifyIdleness(currentTime);
164 
165         if (includeSessions) {
166             notifyIdleness(service.getManagedSessions().values().iterator(), currentTime);
167         }
168     }
169 
170     /**
171      * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
172      * specified {@code session}.
173      *
174      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
175      */
176     public static void notifyIdleSession(IoSession session, long currentTime) {
177         if (session instanceof AbstractIoSession) {
178             AbstractIoSession s = (AbstractIoSession) session;
179             notifyIdleSession1(
180                     s, currentTime,
181                     s.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
182                     IdleStatus.BOTH_IDLE, Math.max(
183                             s.getLastIoTime(),
184                             s.getLastIdleTime(IdleStatus.BOTH_IDLE)));
185 
186             notifyIdleSession1(
187                     s, currentTime,
188                     s.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
189                     IdleStatus.READER_IDLE, Math.max(
190                             s.getLastReadTime(),
191                             s.getLastIdleTime(IdleStatus.READER_IDLE)));
192 
193             notifyIdleSession1(
194                     s, currentTime,
195                     s.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
196                     IdleStatus.WRITER_IDLE, Math.max(
197                             s.getLastWriteTime(),
198                             s.getLastIdleTime(IdleStatus.WRITER_IDLE)));
199 
200             notifyWriteTimeout(s, currentTime);
201             updateThroughput(s, currentTime);
202         } else {
203             notifyIdleSession0(
204                     session, currentTime,
205                     session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
206                     IdleStatus.BOTH_IDLE, Math.max(
207                             session.getLastIoTime(),
208                             session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
209 
210             notifyIdleSession0(
211                     session, currentTime,
212                     session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
213                     IdleStatus.READER_IDLE, Math.max(
214                             session.getLastReadTime(),
215                             session.getLastIdleTime(IdleStatus.READER_IDLE)));
216 
217             notifyIdleSession0(
218                     session, currentTime,
219                     session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
220                     IdleStatus.WRITER_IDLE, Math.max(
221                             session.getLastWriteTime(),
222                             session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
223         }
224     }
225 
226     private static void notifyIdleSession0(
227             IoSession session, long currentTime,
228             long idleTime, IdleStatus status, long lastIoTime) {
229         if (idleTime > 0 && lastIoTime != 0
230                 && currentTime - lastIoTime >= idleTime) {
231             session.getFilterChain().fireSessionIdle(status);
232         }
233     }
234 
235     private static void notifyIdleSession1(
236             AbstractIoSession session, long currentTime,
237             long idleTime, IdleStatus status, long lastIoTime) {
238         if (idleTime > 0 && lastIoTime != 0
239                 && currentTime - lastIoTime >= idleTime) {
240             session.getFilterChain().fireSessionIdle(status);
241         }
242     }
243 
244     private static void notifyWriteTimeout(
245             AbstractIoSession session, long currentTime) {
246 
247         long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
248         if (writeTimeout > 0 &&
249                 currentTime - session.getLastWriteTime() >= writeTimeout &&
250                 !session.getWriteRequestQueue().isEmpty(session)) {
251             WriteRequest request = session.getCurrentWriteRequest();
252             if (request != null) {
253                 session.setCurrentWriteRequest(null);
254                 WriteTimeoutException cause = new WriteTimeoutException(request);
255                 request.getFuture().setException(cause);
256                 session.getFilterChain().fireExceptionCaught(cause);
257                 // WriteException is an IOException, so we close the session.
258                 session.close();
259             }
260         }
261     }
262 
263     private static void updateThroughput(
264             AbstractIoSession session, long currentTime) {
265         session.updateThroughput(currentTime, false);
266     }
267 }