1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.reqres;
21
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.LinkedHashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.ScheduledFuture;
32 import java.util.concurrent.TimeUnit;
33
34 import org.apache.mina.core.filterchain.IoFilterChain;
35 import org.apache.mina.core.session.AttributeKey;
36 import org.apache.mina.core.session.IoSession;
37 import org.apache.mina.core.write.WriteRequest;
38 import org.apache.mina.filter.util.WriteRequestFilter;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42
43
44
45
46
47
48 public class RequestResponseFilter extends WriteRequestFilter {
49
50 private final AttributeKey RESPONSE_INSPECTOR = new AttributeKey(getClass(), "responseInspector");
51 private final AttributeKey REQUEST_STORE = new AttributeKey(getClass(), "requestStore");
52 private final AttributeKey UNRESPONDED_REQUEST_STORE = new AttributeKey(getClass(), "unrespondedRequestStore");
53
54 private final ResponseInspectorFactory responseInspectorFactory;
55 private final ScheduledExecutorService timeoutScheduler;
56
57 private final static Logger LOGGER = LoggerFactory.getLogger(RequestResponseFilter.class);
58
59 public RequestResponseFilter(final ResponseInspector responseInspector,
60 ScheduledExecutorService timeoutScheduler) {
61 if (responseInspector == null) {
62 throw new IllegalArgumentException("responseInspector");
63 }
64 if (timeoutScheduler == null) {
65 throw new IllegalArgumentException("timeoutScheduler");
66 }
67 this.responseInspectorFactory = new ResponseInspectorFactory() {
68 public ResponseInspector getResponseInspector() {
69 return responseInspector;
70 }
71 };
72 this.timeoutScheduler = timeoutScheduler;
73 }
74
75 public RequestResponseFilter(
76 ResponseInspectorFactory responseInspectorFactory,
77 ScheduledExecutorService timeoutScheduler) {
78 if (responseInspectorFactory == null) {
79 throw new IllegalArgumentException("responseInspectorFactory");
80 }
81 if (timeoutScheduler == null) {
82 throw new IllegalArgumentException("timeoutScheduler");
83 }
84 this.responseInspectorFactory = responseInspectorFactory;
85 this.timeoutScheduler = timeoutScheduler;
86 }
87
88 @Override
89 public void onPreAdd(IoFilterChain parent, String name,
90 NextFilter nextFilter) throws Exception {
91 if (parent.contains(this)) {
92 throw new IllegalArgumentException(
93 "You can't add the same filter instance more than once. Create another instance and add it.");
94 }
95
96 IoSession session = parent.getSession();
97 session.setAttribute(RESPONSE_INSPECTOR, responseInspectorFactory
98 .getResponseInspector());
99 session.setAttribute(REQUEST_STORE, createRequestStore(session));
100 session.setAttribute(UNRESPONDED_REQUEST_STORE, createUnrespondedRequestStore(session));
101 }
102
103 @Override
104 public void onPostRemove(IoFilterChain parent, String name,
105 NextFilter nextFilter) throws Exception {
106 IoSession session = parent.getSession();
107
108 destroyUnrespondedRequestStore(getUnrespondedRequestStore(session));
109 destroyRequestStore(getRequestStore(session));
110
111 session.removeAttribute(UNRESPONDED_REQUEST_STORE);
112 session.removeAttribute(REQUEST_STORE);
113 session.removeAttribute(RESPONSE_INSPECTOR);
114 }
115
116 @Override
117 public void messageReceived(NextFilter nextFilter, IoSession session,
118 Object message) throws Exception {
119 ResponseInspector responseInspector = (ResponseInspector) session
120 .getAttribute(RESPONSE_INSPECTOR);
121 Object requestId = responseInspector.getRequestId(message);
122 if (requestId == null) {
123
124 nextFilter.messageReceived(session, message);
125 return;
126 }
127
128
129 ResponseType type = responseInspector.getResponseType(message);
130 if (type == null) {
131 nextFilter.exceptionCaught(session, new IllegalStateException(
132 responseInspector.getClass().getName()
133 + "#getResponseType() may not return null."));
134 }
135
136 Map<Object, Request> requestStore = getRequestStore(session);
137
138 Request request;
139 switch (type) {
140 case WHOLE:
141 case PARTIAL_LAST:
142 synchronized (requestStore) {
143 request = requestStore.remove(requestId);
144 }
145 break;
146 case PARTIAL:
147 synchronized (requestStore) {
148 request = requestStore.get(requestId);
149 }
150 break;
151 default:
152 throw new InternalError();
153 }
154
155 if (request == null) {
156
157
158 if (LOGGER.isWarnEnabled()) {
159 LOGGER.warn("Unknown request ID '" + requestId
160 + "' for the response message. Timed out already?: "
161 + message);
162 }
163 } else {
164
165
166 if (type != ResponseType.PARTIAL) {
167 ScheduledFuture<?> scheduledFuture = request.getTimeoutFuture();
168 if (scheduledFuture != null) {
169 scheduledFuture.cancel(false);
170 Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
171 synchronized (unrespondedRequests) {
172 unrespondedRequests.remove(request);
173 }
174 }
175 }
176
177
178 Response response = new Response(request, message, type);
179 request.signal(response);
180 nextFilter.messageReceived(session, response);
181 }
182 }
183
184 @Override
185 protected Object doFilterWrite(
186 final NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
187 Object message = writeRequest.getMessage();
188 if (!(message instanceof Request)) {
189 return null;
190 }
191
192 final Request request = (Request) message;
193 if (request.getTimeoutFuture() != null) {
194 throw new IllegalArgumentException("Request can not be reused.");
195 }
196
197 Map<Object, Request> requestStore = getRequestStore(session);
198 Object oldValue = null;
199 Object requestId = request.getId();
200 synchronized (requestStore) {
201 oldValue = requestStore.get(requestId);
202 if (oldValue == null) {
203 requestStore.put(requestId, request);
204 }
205 }
206 if (oldValue != null) {
207 throw new IllegalStateException(
208 "Duplicate request ID: " + request.getId());
209 }
210
211
212 TimeoutTask timeoutTask = new TimeoutTask(
213 nextFilter, request, session);
214 ScheduledFuture<?> timeoutFuture = timeoutScheduler.schedule(
215 timeoutTask, request.getTimeoutMillis(),
216 TimeUnit.MILLISECONDS);
217 request.setTimeoutTask(timeoutTask);
218 request.setTimeoutFuture(timeoutFuture);
219
220
221 Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
222 synchronized (unrespondedRequests) {
223 unrespondedRequests.add(request);
224 }
225
226 return request.getMessage();
227 }
228
229 @Override
230 public void sessionClosed(NextFilter nextFilter, IoSession session)
231 throws Exception {
232
233
234 Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
235 List<Request> unrespondedRequestsCopy;
236 synchronized (unrespondedRequests) {
237 unrespondedRequestsCopy = new ArrayList<Request>(
238 unrespondedRequests);
239 unrespondedRequests.clear();
240 }
241
242
243 for (Request r : unrespondedRequestsCopy) {
244 if (r.getTimeoutFuture().cancel(false)) {
245 r.getTimeoutTask().run();
246 }
247 }
248
249
250 Map<Object, Request> requestStore = getRequestStore(session);
251 synchronized (requestStore) {
252 requestStore.clear();
253 }
254
255
256 nextFilter.sessionClosed(session);
257 }
258
259 @SuppressWarnings("unchecked")
260 private Map<Object, Request> getRequestStore(IoSession session) {
261 return (Map<Object, Request>) session.getAttribute(REQUEST_STORE);
262 }
263
264 @SuppressWarnings("unchecked")
265 private Set<Request> getUnrespondedRequestStore(IoSession session) {
266 return (Set<Request>) session.getAttribute(UNRESPONDED_REQUEST_STORE);
267 }
268
269
270
271
272
273
274
275 protected Map<Object, Request> createRequestStore(
276 IoSession session) {
277 return new ConcurrentHashMap<Object, Request>();
278 }
279
280
281
282
283
284
285
286
287
288
289
290
291
292 protected Set<Request> createUnrespondedRequestStore(
293 IoSession session) {
294 return new LinkedHashSet<Request>();
295 }
296
297
298
299
300
301
302
303
304 protected void destroyRequestStore(
305 Map<Object, Request> requestStore) {
306
307 }
308
309
310
311
312
313
314
315
316 protected void destroyUnrespondedRequestStore(
317 Set<Request> unrespondedRequestStore) {
318
319 }
320
321 private class TimeoutTask implements Runnable {
322 private final NextFilter filter;
323
324 private final Request request;
325
326 private final IoSession session;
327
328 private TimeoutTask(NextFilter filter, Request request,
329 IoSession session) {
330 this.filter = filter;
331 this.request = request;
332 this.session = session;
333 }
334
335 public void run() {
336 Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
337 if (unrespondedRequests != null) {
338 synchronized (unrespondedRequests) {
339 unrespondedRequests.remove(request);
340 }
341 }
342
343 Map<Object, Request> requestStore = getRequestStore(session);
344 Object requestId = request.getId();
345 boolean timedOut;
346 synchronized (requestStore) {
347 if (requestStore.get(requestId) == request) {
348 requestStore.remove(requestId);
349 timedOut = true;
350 } else {
351 timedOut = false;
352 }
353 }
354
355 if (timedOut) {
356
357 RequestTimeoutException e = new RequestTimeoutException(request);
358 request.signal(e);
359 filter.exceptionCaught(session, e);
360 }
361 }
362 }
363 }