View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.reqres;
21  
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.LinkedHashSet;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.concurrent.ScheduledExecutorService;
30  import java.util.concurrent.ScheduledFuture;
31  import java.util.concurrent.TimeUnit;
32  
33  import org.apache.mina.core.filterchain.IoFilterChain;
34  import org.apache.mina.core.session.AttributeKey;
35  import org.apache.mina.core.session.IoSession;
36  import org.apache.mina.core.write.WriteRequest;
37  import org.apache.mina.filter.util.WriteRequestFilter;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  /**
42   * TODO Add documentation
43   * 
44   * @author The Apache MINA Project (dev@mina.apache.org)
45   * @version $Rev: 672583 $, $Date: 2008-06-28 23:29:15 +0200 (sam, 28 jun 2008) $
46   */
47  public class RequestResponseFilter extends WriteRequestFilter {
48  
49      private final AttributeKey RESPONSE_INSPECTOR = new AttributeKey(getClass(), "responseInspector");
50      private final AttributeKey REQUEST_STORE = new AttributeKey(getClass(), "requestStore");
51      private final AttributeKey UNRESPONDED_REQUEST_STORE = new AttributeKey(getClass(), "unrespondedRequestStore");
52  
53      private final ResponseInspectorFactory responseInspectorFactory;
54      private final ScheduledExecutorService timeoutScheduler;
55  
56      private final Logger logger = LoggerFactory.getLogger(getClass());
57  
58      public RequestResponseFilter(final ResponseInspector responseInspector,
59              ScheduledExecutorService timeoutScheduler) {
60          if (responseInspector == null) {
61              throw new NullPointerException("responseInspector");
62          }
63          if (timeoutScheduler == null) {
64              throw new NullPointerException("timeoutScheduler");
65          }
66          this.responseInspectorFactory = new ResponseInspectorFactory() {
67              public ResponseInspector getResponseInspector() {
68                  return responseInspector;
69              }
70          };
71          this.timeoutScheduler = timeoutScheduler;
72      }
73  
74      public RequestResponseFilter(
75              ResponseInspectorFactory responseInspectorFactory,
76              ScheduledExecutorService timeoutScheduler) {
77          if (responseInspectorFactory == null) {
78              throw new NullPointerException("responseInspectorFactory");
79          }
80          if (timeoutScheduler == null) {
81              throw new NullPointerException("timeoutScheduler");
82          }
83          this.responseInspectorFactory = responseInspectorFactory;
84          this.timeoutScheduler = timeoutScheduler;
85      }
86  
87      @Override
88      public void onPreAdd(IoFilterChain parent, String name,
89              NextFilter nextFilter) throws Exception {
90          if (parent.contains(this)) {
91              throw new IllegalArgumentException(
92                      "You can't add the same filter instance more than once.  Create another instance and add it.");
93          }
94  
95          IoSession session = parent.getSession();
96          session.setAttribute(RESPONSE_INSPECTOR, responseInspectorFactory
97                  .getResponseInspector());
98          session.setAttribute(REQUEST_STORE, createRequestStore(session));
99          session.setAttribute(UNRESPONDED_REQUEST_STORE, createUnrespondedRequestStore(session));
100     }
101 
102     @Override
103     public void onPostRemove(IoFilterChain parent, String name,
104             NextFilter nextFilter) throws Exception {
105         IoSession session = parent.getSession();
106 
107         destroyUnrespondedRequestStore(getUnrespondedRequestStore(session));
108         destroyRequestStore(getRequestStore(session));
109 
110         session.removeAttribute(UNRESPONDED_REQUEST_STORE);
111         session.removeAttribute(REQUEST_STORE);
112         session.removeAttribute(RESPONSE_INSPECTOR);
113     }
114 
115     @Override
116     public void messageReceived(NextFilter nextFilter, IoSession session,
117             Object message) throws Exception {
118         ResponseInspector responseInspector = (ResponseInspector) session
119                 .getAttribute(RESPONSE_INSPECTOR);
120         Object requestId = responseInspector.getRequestId(message);
121         if (requestId == null) {
122             // Not a response message.  Ignore.
123             nextFilter.messageReceived(session, message);
124             return;
125         }
126 
127         // Retrieve (or remove) the corresponding request.
128         ResponseType type = responseInspector.getResponseType(message);
129         if (type == null) {
130             nextFilter.exceptionCaught(session, new IllegalStateException(
131                     responseInspector.getClass().getName()
132                             + "#getResponseType() may not return null."));
133         }
134 
135         Map<Object, Request> requestStore = getRequestStore(session);
136 
137         Request request;
138         switch (type) {
139         case WHOLE:
140         case PARTIAL_LAST:
141             synchronized (requestStore) {
142                 request = requestStore.remove(requestId);
143             }
144             break;
145         case PARTIAL:
146             synchronized (requestStore) {
147                 request = requestStore.get(requestId);
148             }
149             break;
150         default:
151             throw new InternalError();
152         }
153 
154         if (request == null) {
155             // A response message without request. Swallow the event because
156             // the response might have arrived too late.
157             if (logger.isWarnEnabled()) {
158                 logger.warn("Unknown request ID '" + requestId
159                         + "' for the response message. Timed out already?: "
160                         + message);
161             }
162         } else {
163             // Found a matching request.
164             // Cancel the timeout task if needed.
165             if (type != ResponseType.PARTIAL) {
166                 ScheduledFuture<?> scheduledFuture = request.getTimeoutFuture();
167                 if (scheduledFuture != null) {
168                     scheduledFuture.cancel(false);
169                     Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
170                     synchronized (unrespondedRequests) {
171                         unrespondedRequests.remove(request);
172                     }
173                 }
174             }
175 
176             // And forward the event.
177             Response response = new Response(request, message, type);
178             request.signal(response);
179             nextFilter.messageReceived(session, response);
180         }
181     }
182 
183     @Override
184     protected Object doFilterWrite(
185             final NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
186         Object message = writeRequest.getMessage();
187         if (!(message instanceof Request)) {
188             return null;
189         }
190 
191         final Request request = (Request) message;
192         if (request.getTimeoutFuture() != null) {
193             throw new IllegalArgumentException("Request can not be reused.");
194         }
195 
196         Map<Object, Request> requestStore = getRequestStore(session);
197         Object oldValue = null;
198         Object requestId = request.getId();
199         synchronized (requestStore) {
200             oldValue = requestStore.get(requestId);
201             if (oldValue == null) {
202                 requestStore.put(requestId, request);
203             }
204         }
205         if (oldValue != null) {
206             throw new IllegalStateException(
207                     "Duplicate request ID: " + request.getId());
208         }
209 
210         // Schedule a task to be executed on timeout.
211         TimeoutTask timeoutTask = new TimeoutTask(
212                 nextFilter, request, session);
213         ScheduledFuture<?> timeoutFuture = timeoutScheduler.schedule(
214                 timeoutTask, request.getTimeoutMillis(),
215                 TimeUnit.MILLISECONDS);
216         request.setTimeoutTask(timeoutTask);
217         request.setTimeoutFuture(timeoutFuture);
218 
219         // Add the timeout task to the unfinished task set.
220         Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
221         synchronized (unrespondedRequests) {
222             unrespondedRequests.add(request);
223         }
224 
225         return request.getMessage();
226     }
227 
228     @Override
229     public void sessionClosed(NextFilter nextFilter, IoSession session)
230             throws Exception {
231         // Copy the unfinished task set to avoid unnecessary lock acquisition.
232         // Copying will be cheap because there won't be that many requests queued.
233         Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
234         List<Request> unrespondedRequestsCopy;
235         synchronized (unrespondedRequests) {
236             unrespondedRequestsCopy = new ArrayList<Request>(
237                     unrespondedRequests);
238             unrespondedRequests.clear();
239         }
240 
241         // Generate timeout artificially.
242         for (Request r : unrespondedRequestsCopy) {
243             if (r.getTimeoutFuture().cancel(false)) {
244                 r.getTimeoutTask().run();
245             }
246         }
247 
248         // Clear the request store just in case we missed something, though it's unlikely.
249         Map<Object, Request> requestStore = getRequestStore(session);
250         synchronized (requestStore) {
251             requestStore.clear();
252         }
253 
254         // Now tell the main subject.
255         nextFilter.sessionClosed(session);
256     }
257 
258     @SuppressWarnings("unchecked")
259     private Map<Object, Request> getRequestStore(IoSession session) {
260         return (Map<Object, Request>) session.getAttribute(REQUEST_STORE);
261     }
262 
263     @SuppressWarnings("unchecked")
264     private Set<Request> getUnrespondedRequestStore(IoSession session) {
265         return (Set<Request>) session.getAttribute(UNRESPONDED_REQUEST_STORE);
266     }
267 
268     /**
269      * Returns a {@link Map} which stores {@code messageId}-{@link Request}
270      * pairs whose {@link Response}s are not received yet.  Please override
271      * this method if you need to use other {@link Map} implementation
272      * than the default one ({@link HashMap}).
273      */
274     protected Map<Object, Request> createRequestStore(
275             IoSession session) {
276         return new HashMap<Object, Request>();
277     }
278 
279     /**
280      * Returns a {@link Set} which stores {@link Request} whose
281      * {@link Response}s are not received yet. Please override
282      * this method if you need to use other {@link Set} implementation
283      * than the default one ({@link LinkedHashSet}).  Please note that
284      * the {@link Iterator} of the returned {@link Set} have to iterate
285      * its elements in the insertion order to ensure that
286      * {@link RequestTimeoutException}s are thrown in the order which
287      * {@link Request}s were written.  If you don't need to guarantee
288      * the order of thrown exceptions, any {@link Set} implementation
289      * can be used.
290      */
291     protected Set<Request> createUnrespondedRequestStore(
292             IoSession session) {
293         return new LinkedHashSet<Request>();
294     }
295 
296     /**
297      * Releases any resources related with the {@link Map} created by
298      * {@link #createRequestStore(IoSession)}.  This method is useful
299      * if you override {@link #createRequestStore(IoSession)}.
300      *
301      * @param requestStore what you returned in {@link #createRequestStore(IoSession)}
302      */
303     protected void destroyRequestStore(
304             Map<Object, Request> requestStore) {
305     }
306 
307     /**
308      * Releases any resources related with the {@link Set} created by
309      * {@link #createUnrespondedRequestStore(IoSession)}.  This method is
310      * useful if you override {@link #createUnrespondedRequestStore(IoSession)}.
311      *
312      * @param unrespondedRequestStore what you returned in {@link #createUnrespondedRequestStore(IoSession)}
313      */
314     protected void destroyUnrespondedRequestStore(
315             Set<Request> unrespondedRequestStore) {
316     }
317 
318     private class TimeoutTask implements Runnable {
319         private final NextFilter filter;
320 
321         private final Request request;
322 
323         private final IoSession session;
324 
325         private TimeoutTask(NextFilter filter, Request request,
326                 IoSession session) {
327             this.filter = filter;
328             this.request = request;
329             this.session = session;
330         }
331 
332         public void run() {
333             Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
334             if (unrespondedRequests != null) {
335                 synchronized (unrespondedRequests) {
336                     unrespondedRequests.remove(request);
337                 }
338             }
339 
340             Map<Object, Request> requestStore = getRequestStore(session);
341             Object requestId = request.getId();
342             boolean timedOut;
343             synchronized (requestStore) {
344                 if (requestStore.get(requestId) == request) {
345                     requestStore.remove(requestId);
346                     timedOut = true;
347                 } else {
348                     timedOut = false;
349                 }
350             }
351 
352             if (timedOut) {
353                 // Throw the exception only when it's really timed out.
354                 RequestTimeoutException e = new RequestTimeoutException(request);
355                 request.signal(e);
356                 filter.exceptionCaught(session, e);
357             }
358         }
359     }
360 }