View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.reqres;
21  
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.LinkedHashSet;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ScheduledExecutorService;
31  import java.util.concurrent.ScheduledFuture;
32  import java.util.concurrent.TimeUnit;
33  
34  import org.apache.mina.core.filterchain.IoFilterChain;
35  import org.apache.mina.core.session.AttributeKey;
36  import org.apache.mina.core.session.IoSession;
37  import org.apache.mina.core.write.WriteRequest;
38  import org.apache.mina.filter.util.WriteRequestFilter;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  /**
43   * TODO Add documentation
44   * 
45   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
46   * @org.apache.xbean.XBean
47   */
48  public class RequestResponseFilter extends WriteRequestFilter {
49  
50      private final AttributeKey RESPONSE_INSPECTOR = new AttributeKey(getClass(), "responseInspector");
51      private final AttributeKey REQUEST_STORE = new AttributeKey(getClass(), "requestStore");
52      private final AttributeKey UNRESPONDED_REQUEST_STORE = new AttributeKey(getClass(), "unrespondedRequestStore");
53  
54      private final ResponseInspectorFactory responseInspectorFactory;
55      private final ScheduledExecutorService timeoutScheduler;
56  
57      private final static Logger LOGGER = LoggerFactory.getLogger(RequestResponseFilter.class);
58  
59      public RequestResponseFilter(final ResponseInspector responseInspector,
60              ScheduledExecutorService timeoutScheduler) {
61          if (responseInspector == null) {
62              throw new IllegalArgumentException("responseInspector");
63          }
64          if (timeoutScheduler == null) {
65              throw new IllegalArgumentException("timeoutScheduler");
66          }
67          this.responseInspectorFactory = new ResponseInspectorFactory() {
68              public ResponseInspector getResponseInspector() {
69                  return responseInspector;
70              }
71          };
72          this.timeoutScheduler = timeoutScheduler;
73      }
74  
75      public RequestResponseFilter(
76              ResponseInspectorFactory responseInspectorFactory,
77              ScheduledExecutorService timeoutScheduler) {
78          if (responseInspectorFactory == null) {
79              throw new IllegalArgumentException("responseInspectorFactory");
80          }
81          if (timeoutScheduler == null) {
82              throw new IllegalArgumentException("timeoutScheduler");
83          }
84          this.responseInspectorFactory = responseInspectorFactory;
85          this.timeoutScheduler = timeoutScheduler;
86      }
87  
88      @Override
89      public void onPreAdd(IoFilterChain parent, String name,
90              NextFilter nextFilter) throws Exception {
91          if (parent.contains(this)) {
92              throw new IllegalArgumentException(
93                      "You can't add the same filter instance more than once.  Create another instance and add it.");
94          }
95  
96          IoSession session = parent.getSession();
97          session.setAttribute(RESPONSE_INSPECTOR, responseInspectorFactory
98                  .getResponseInspector());
99          session.setAttribute(REQUEST_STORE, createRequestStore(session));
100         session.setAttribute(UNRESPONDED_REQUEST_STORE, createUnrespondedRequestStore(session));
101     }
102 
103     @Override
104     public void onPostRemove(IoFilterChain parent, String name,
105             NextFilter nextFilter) throws Exception {
106         IoSession session = parent.getSession();
107 
108         destroyUnrespondedRequestStore(getUnrespondedRequestStore(session));
109         destroyRequestStore(getRequestStore(session));
110 
111         session.removeAttribute(UNRESPONDED_REQUEST_STORE);
112         session.removeAttribute(REQUEST_STORE);
113         session.removeAttribute(RESPONSE_INSPECTOR);
114     }
115 
116     @Override
117     public void messageReceived(NextFilter nextFilter, IoSession session,
118             Object message) throws Exception {
119         ResponseInspector responseInspector = (ResponseInspector) session
120                 .getAttribute(RESPONSE_INSPECTOR);
121         Object requestId = responseInspector.getRequestId(message);
122         if (requestId == null) {
123             // Not a response message.  Ignore.
124             nextFilter.messageReceived(session, message);
125             return;
126         }
127 
128         // Retrieve (or remove) the corresponding request.
129         ResponseType type = responseInspector.getResponseType(message);
130         if (type == null) {
131             nextFilter.exceptionCaught(session, new IllegalStateException(
132                     responseInspector.getClass().getName()
133                             + "#getResponseType() may not return null."));
134         }
135 
136         Map<Object, Request> requestStore = getRequestStore(session);
137 
138         Request request;
139         switch (type) {
140         case WHOLE:
141         case PARTIAL_LAST:
142             synchronized (requestStore) {
143                 request = requestStore.remove(requestId);
144             }
145             break;
146         case PARTIAL:
147             synchronized (requestStore) {
148                 request = requestStore.get(requestId);
149             }
150             break;
151         default:
152             throw new InternalError();
153         }
154 
155         if (request == null) {
156             // A response message without request. Swallow the event because
157             // the response might have arrived too late.
158             if (LOGGER.isWarnEnabled()) {
159                 LOGGER.warn("Unknown request ID '" + requestId
160                         + "' for the response message. Timed out already?: "
161                         + message);
162             }
163         } else {
164             // Found a matching request.
165             // Cancel the timeout task if needed.
166             if (type != ResponseType.PARTIAL) {
167                 ScheduledFuture<?> scheduledFuture = request.getTimeoutFuture();
168                 if (scheduledFuture != null) {
169                     scheduledFuture.cancel(false);
170                     Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
171                     synchronized (unrespondedRequests) {
172                         unrespondedRequests.remove(request);
173                     }
174                 }
175             }
176 
177             // And forward the event.
178             Response response = new Response(request, message, type);
179             request.signal(response);
180             nextFilter.messageReceived(session, response);
181         }
182     }
183 
184     @Override
185     protected Object doFilterWrite(
186             final NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
187         Object message = writeRequest.getMessage();
188         if (!(message instanceof Request)) {
189             return null;
190         }
191 
192         final Request request = (Request) message;
193         if (request.getTimeoutFuture() != null) {
194             throw new IllegalArgumentException("Request can not be reused.");
195         }
196 
197         Map<Object, Request> requestStore = getRequestStore(session);
198         Object oldValue = null;
199         Object requestId = request.getId();
200         synchronized (requestStore) {
201             oldValue = requestStore.get(requestId);
202             if (oldValue == null) {
203                 requestStore.put(requestId, request);
204             }
205         }
206         if (oldValue != null) {
207             throw new IllegalStateException(
208                     "Duplicate request ID: " + request.getId());
209         }
210 
211         // Schedule a task to be executed on timeout.
212         TimeoutTask timeoutTask = new TimeoutTask(
213                 nextFilter, request, session);
214         ScheduledFuture<?> timeoutFuture = timeoutScheduler.schedule(
215                 timeoutTask, request.getTimeoutMillis(),
216                 TimeUnit.MILLISECONDS);
217         request.setTimeoutTask(timeoutTask);
218         request.setTimeoutFuture(timeoutFuture);
219 
220         // Add the timeout task to the unfinished task set.
221         Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
222         synchronized (unrespondedRequests) {
223             unrespondedRequests.add(request);
224         }
225 
226         return request.getMessage();
227     }
228 
229     @Override
230     public void sessionClosed(NextFilter nextFilter, IoSession session)
231             throws Exception {
232         // Copy the unfinished task set to avoid unnecessary lock acquisition.
233         // Copying will be cheap because there won't be that many requests queued.
234         Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
235         List<Request> unrespondedRequestsCopy;
236         synchronized (unrespondedRequests) {
237             unrespondedRequestsCopy = new ArrayList<Request>(
238                     unrespondedRequests);
239             unrespondedRequests.clear();
240         }
241 
242         // Generate timeout artificially.
243         for (Request r : unrespondedRequestsCopy) {
244             if (r.getTimeoutFuture().cancel(false)) {
245                 r.getTimeoutTask().run();
246             }
247         }
248 
249         // Clear the request store just in case we missed something, though it's unlikely.
250         Map<Object, Request> requestStore = getRequestStore(session);
251         synchronized (requestStore) {
252             requestStore.clear();
253         }
254 
255         // Now tell the main subject.
256         nextFilter.sessionClosed(session);
257     }
258 
259     @SuppressWarnings("unchecked")
260     private Map<Object, Request> getRequestStore(IoSession session) {
261         return (Map<Object, Request>) session.getAttribute(REQUEST_STORE);
262     }
263 
264     @SuppressWarnings("unchecked")
265     private Set<Request> getUnrespondedRequestStore(IoSession session) {
266         return (Set<Request>) session.getAttribute(UNRESPONDED_REQUEST_STORE);
267     }
268 
269     /**
270      * Returns a {@link Map} which stores {@code messageId}-{@link Request}
271      * pairs whose {@link Response}s are not received yet.  Please override
272      * this method if you need to use other {@link Map} implementation
273      * than the default one ({@link HashMap}).
274      */
275     protected Map<Object, Request> createRequestStore(
276             IoSession session) {
277         return new ConcurrentHashMap<Object, Request>();
278     }
279 
280     /**
281      * Returns a {@link Set} which stores {@link Request} whose
282      * {@link Response}s are not received yet. Please override
283      * this method if you need to use other {@link Set} implementation
284      * than the default one ({@link LinkedHashSet}).  Please note that
285      * the {@link Iterator} of the returned {@link Set} have to iterate
286      * its elements in the insertion order to ensure that
287      * {@link RequestTimeoutException}s are thrown in the order which
288      * {@link Request}s were written.  If you don't need to guarantee
289      * the order of thrown exceptions, any {@link Set} implementation
290      * can be used.
291      */
292     protected Set<Request> createUnrespondedRequestStore(
293             IoSession session) {
294         return new LinkedHashSet<Request>();
295     }
296 
297     /**
298      * Releases any resources related with the {@link Map} created by
299      * {@link #createRequestStore(IoSession)}.  This method is useful
300      * if you override {@link #createRequestStore(IoSession)}.
301      *
302      * @param requestStore what you returned in {@link #createRequestStore(IoSession)}
303      */
304     protected void destroyRequestStore(
305             Map<Object, Request> requestStore) {
306         // Do nothing
307     }
308 
309     /**
310      * Releases any resources related with the {@link Set} created by
311      * {@link #createUnrespondedRequestStore(IoSession)}.  This method is
312      * useful if you override {@link #createUnrespondedRequestStore(IoSession)}.
313      *
314      * @param unrespondedRequestStore what you returned in {@link #createUnrespondedRequestStore(IoSession)}
315      */
316     protected void destroyUnrespondedRequestStore(
317             Set<Request> unrespondedRequestStore) {
318         // Do nothing
319     }
320 
321     private class TimeoutTask implements Runnable {
322         private final NextFilter filter;
323 
324         private final Request request;
325 
326         private final IoSession session;
327 
328         private TimeoutTask(NextFilter filter, Request request,
329                 IoSession session) {
330             this.filter = filter;
331             this.request = request;
332             this.session = session;
333         }
334 
335         public void run() {
336             Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
337             if (unrespondedRequests != null) {
338                 synchronized (unrespondedRequests) {
339                     unrespondedRequests.remove(request);
340                 }
341             }
342 
343             Map<Object, Request> requestStore = getRequestStore(session);
344             Object requestId = request.getId();
345             boolean timedOut;
346             synchronized (requestStore) {
347                 if (requestStore.get(requestId) == request) {
348                     requestStore.remove(requestId);
349                     timedOut = true;
350                 } else {
351                     timedOut = false;
352                 }
353             }
354 
355             if (timedOut) {
356                 // Throw the exception only when it's really timed out.
357                 RequestTimeoutException e = new RequestTimeoutException(request);
358                 request.signal(e);
359                 filter.exceptionCaught(session, e);
360             }
361         }
362     }
363 }