1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.geronimo.gshell.whisper.request;
21
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.locks.Lock;
29 import java.util.concurrent.locks.ReentrantLock;
30
31 import org.apache.geronimo.gshell.common.Duration;
32 import org.apache.geronimo.gshell.common.NamedThreadFactory;
33 import org.apache.geronimo.gshell.common.tostring.ToStringBuilder;
34 import org.apache.geronimo.gshell.common.tostring.ToStringStyle;
35 import org.apache.geronimo.gshell.whisper.message.Message;
36 import org.apache.geronimo.gshell.whisper.session.SessionAttributeBinder;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41 * Manages request state on a per-session basis, handles timeouts and signalling responses.
42 *
43 * @version $Rev: 580691 $ $Date: 2007-09-30 03:36:37 -0700 (Sun, 30 Sep 2007) $
44 */
45 public class RequestManager
46 {
47 public static final SessionAttributeBinder<RequestManager> BINDER = new SessionAttributeBinder<RequestManager>(RequestManager.class);
48
49 private final Logger log = LoggerFactory.getLogger(getClass());
50
51 private final Map<Message.ID,Registration> registrations = new HashMap<Message.ID, Registration>();
52
53 private final ScheduledExecutorService scheduler;
54
55
56
57
58
59 private final Lock lock = new ReentrantLock();
60
61 public RequestManager() {
62 ThreadFactory tf = new NamedThreadFactory(getClass());
63
64 scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1, tf);
65 }
66
67 private Registration get(final Message.ID id) {
68 assert id != null;
69
70 Registration reg = registrations.get(id);
71
72 if (reg == null) {
73 throw new NotRegisteredException(id);
74 }
75
76 return reg;
77 }
78
79 private Registration remove(final Message.ID id) {
80 assert id != null;
81
82 Registration reg = registrations.remove(id);
83
84 if (reg == null) {
85 throw new NotRegisteredException(id);
86 }
87
88 return reg;
89 }
90
91 public boolean contains(final Message.ID id) {
92 lock.lock();
93
94 try {
95 return registrations.containsKey(id);
96 }
97 finally {
98 lock.unlock();
99 }
100 }
101
102 public void register(final RequestHandle request) {
103 assert request != null;
104
105 lock.lock();
106
107 try {
108 Message.ID id = request.getId();
109
110 if (registrations.containsKey(id)) {
111 throw new DuplicateRegistrationException(id);
112 }
113
114 Registration reg = new Registration(request);
115
116 registrations.put(id, reg);
117
118 log.debug("Registered: {}", reg);
119 }
120 finally {
121 lock.unlock();
122 }
123 }
124
125 public RequestHandle lookup(final Message.ID id) {
126 assert id != null;
127
128 lock.lock();
129
130 try {
131 Registration reg = get(id);
132
133 return reg.request;
134 }
135 finally {
136 lock.unlock();
137 }
138 }
139
140 public RequestHandle deregister(final Message.ID id) {
141 assert id != null;
142
143 lock.lock();
144
145 try {
146 Registration reg = remove(id);
147
148 reg.deactivate();
149
150 log.debug("Deregistered: {}", reg);
151
152 return reg.request;
153 }
154 finally {
155 lock.unlock();
156 }
157 }
158
159 public void activate(final Message.ID id) {
160 assert id != null;
161
162 lock.lock();
163
164 try {
165 Registration reg = get(id);
166
167 reg.activate();
168
169 log.debug("Activated: {}", reg);
170 }
171 catch (NotRegisteredException e) {
172
173 log.debug("Ignoring activation; request not registered: {}", id);
174 }
175 finally {
176 lock.unlock();
177 }
178 }
179
180 public void deactivate(final Message.ID id) {
181 assert id != null;
182
183 lock.lock();
184
185 try {
186 Registration reg = get(id);
187
188 reg.deactivate();
189
190 log.debug("Deactivated: {}", reg);
191 }
192 catch (NotRegisteredException e) {
193 log.debug("Ignoring deactivation; request not registered: {}", id);
194 }
195 finally {
196 lock.unlock();
197 }
198 }
199
200 private void timeout(final Message.ID id) {
201 assert id != null;
202
203 lock.lock();
204
205 try {
206 Registration reg = remove(id);
207
208 reg.timeout();
209
210 log.debug("Timed out: {}", reg);
211 }
212 catch (NotRegisteredException e) {
213 log.debug("Ignoring timeout; request not registered: {}", id);
214 }
215 catch (TimeoutAbortedException e) {
216 log.debug("Timeout aborted: " + e.getMessage());
217 }
218 finally {
219 lock.unlock();
220 }
221 }
222
223 public void close() {
224 lock.lock();
225
226 try {
227 if (!registrations.isEmpty()) {
228 log.warn("Timing out remaining {} registrations", registrations.size());
229
230 for (Registration reg : registrations.values()) {
231 timeout(reg.request.getId());
232 }
233 }
234
235
236
237
238
239
240
241 }
242 finally {
243 lock.unlock();
244 }
245 }
246
247 private enum RegistrationState
248 {
249 PENDING,
250 ACTIVE,
251 DEACTIVE,
252 TIMEDOUT
253 }
254
255 private class Registration
256 {
257 public final RequestHandle request;
258
259 public RegistrationState state = RegistrationState.PENDING;
260
261 private ScheduledFuture<?> timeoutFuture;
262
263 public Registration(final RequestHandle request) {
264 assert request != null;
265
266 this.request = request;
267 }
268
269 public void activate() {
270 if (state != RegistrationState.PENDING) {
271 log.debug("Can not activate, state is not PENDING, found: {}", state);
272 }
273 else {
274 Runnable task = new Runnable() {
275 public void run() {
276 RequestManager.this.timeout(request.getId());
277 }
278 };
279
280 Duration timeout = request.getTimeout();
281
282 log.debug("Scheduling timeout to trigger in: {}", timeout);
283
284 timeoutFuture = scheduler.schedule(task, timeout.getValue(), timeout.getUnit());
285
286 state = RegistrationState.ACTIVE;
287 }
288 }
289
290 public void deactivate() {
291 if (state != RegistrationState.ACTIVE) {
292 log.debug("Can not deactivate; state is not ACTIVE, found: {}", state);
293 }
294 else if (timeoutFuture.cancel(false)) {
295 timeoutFuture = null;
296
297 state = RegistrationState.DEACTIVE;
298 }
299 else {
300 log.warn("Unable to cancel registration timeout: {}", this);
301 }
302 }
303
304 public void timeout() {
305 Message.ID id = request.getId();
306
307 if (timeoutFuture.isCancelled()) {
308 throw new TimeoutAbortedException("Timeout has been canceled: " + id);
309 }
310 else if (request.isSignaled()) {
311 throw new TimeoutAbortedException("Request has been singled: " + id);
312 }
313 else {
314 request.timeout();
315
316 state = RegistrationState.TIMEDOUT;
317 }
318 }
319
320 public String toString() {
321 return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
322 .append("id", request.getId())
323 .append("state", state)
324 .toString();
325 }
326 }
327
328 public class NotRegisteredException
329 extends RequestException
330 {
331 public NotRegisteredException(final Message.ID id) {
332 super(id);
333 }
334 }
335
336 public class DuplicateRegistrationException
337 extends RequestException
338 {
339 public DuplicateRegistrationException(final Message.ID id) {
340 super(id);
341 }
342 }
343
344 public class TimeoutAbortedException
345 extends RequestException
346 {
347 public TimeoutAbortedException(final String msg) {
348 super(msg);
349 }
350 }
351 }