View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *  http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  
20  package org.apache.geronimo.gshell.whisper.request;
21  
22  import java.util.HashMap;
23  import java.util.Map;
24  import java.util.concurrent.ScheduledExecutorService;
25  import java.util.concurrent.ScheduledFuture;
26  import java.util.concurrent.ScheduledThreadPoolExecutor;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.locks.Lock;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import org.apache.geronimo.gshell.common.Duration;
32  import org.apache.geronimo.gshell.common.NamedThreadFactory;
33  import org.apache.geronimo.gshell.common.tostring.ToStringBuilder;
34  import org.apache.geronimo.gshell.common.tostring.ToStringStyle;
35  import org.apache.geronimo.gshell.whisper.message.Message;
36  import org.apache.geronimo.gshell.whisper.session.SessionAttributeBinder;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  
40  /**
41   * Manages request state on a per-session basis, handles timeouts and signalling responses.
42   *
43   * @version $Rev: 580691 $ $Date: 2007-09-30 03:36:37 -0700 (Sun, 30 Sep 2007) $
44   */
45  public class RequestManager
46  {
47      public static final SessionAttributeBinder<RequestManager> BINDER = new SessionAttributeBinder<RequestManager>(RequestManager.class);
48  
49      private final Logger log = LoggerFactory.getLogger(getClass());
50  
51      private final Map<Message.ID,Registration> registrations = new HashMap<Message.ID, Registration>();
52  
53      private final ScheduledExecutorService scheduler;
54  
55      //
56      // TODO: Use a better locking scheme...
57      //
58      
59      private final Lock lock = new ReentrantLock();
60  
61      public RequestManager() {
62          ThreadFactory tf = new NamedThreadFactory(getClass());
63          
64          scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1, tf);
65      }
66  
67      private Registration get(final Message.ID id) {
68          assert id != null;
69  
70          Registration reg = registrations.get(id);
71  
72          if (reg == null) {
73              throw new NotRegisteredException(id);
74          }
75  
76          return reg;
77      }
78  
79      private Registration remove(final Message.ID id) {
80          assert id != null;
81  
82          Registration reg = registrations.remove(id);
83  
84          if (reg == null) {
85              throw new NotRegisteredException(id);
86          }
87  
88          return reg;
89      }
90  
91      public boolean contains(final Message.ID id) {
92          lock.lock();
93  
94          try {
95              return registrations.containsKey(id);
96          }
97          finally {
98              lock.unlock();
99          }
100     }
101 
102     public void register(final RequestHandle request) {
103         assert request != null;
104 
105         lock.lock();
106 
107         try {
108             Message.ID id = request.getId();
109 
110             if (registrations.containsKey(id)) {
111                 throw new DuplicateRegistrationException(id);
112             }
113 
114             Registration reg = new Registration(request);
115 
116             registrations.put(id, reg);
117 
118             log.debug("Registered: {}", reg);
119         }
120         finally {
121             lock.unlock();
122         }
123     }
124 
125     public RequestHandle lookup(final Message.ID id) {
126         assert id != null;
127 
128         lock.lock();
129 
130         try {
131             Registration reg = get(id);
132 
133             return reg.request;
134         }
135         finally {
136             lock.unlock();
137         }
138     }
139 
140     public RequestHandle deregister(final Message.ID id) {
141         assert id != null;
142 
143         lock.lock();
144 
145         try {
146             Registration reg = remove(id);
147 
148             reg.deactivate();
149 
150             log.debug("Deregistered: {}", reg);
151 
152             return reg.request;
153         }
154         finally {
155             lock.unlock();
156         }
157     }
158 
159     public void activate(final Message.ID id) {
160         assert id != null;
161 
162         lock.lock();
163 
164         try {
165             Registration reg = get(id);
166             
167             reg.activate();
168 
169             log.debug("Activated: {}", reg);
170         }
171         catch (NotRegisteredException e) {
172             // Sometimes we receive responses to requests faster than we can register them
173             log.debug("Ignoring activation; request not registered: {}", id);
174         }
175         finally {
176             lock.unlock();
177         }
178     }
179 
180     public void deactivate(final Message.ID id) {
181         assert id != null;
182 
183         lock.lock();
184 
185         try {
186             Registration reg = get(id);
187 
188             reg.deactivate();
189 
190             log.debug("Deactivated: {}", reg);
191         }
192         catch (NotRegisteredException e) {
193             log.debug("Ignoring deactivation; request not registered: {}", id);
194         }
195         finally {
196             lock.unlock();
197         }
198     }
199 
200     private void timeout(final Message.ID id) {
201         assert id != null;
202 
203         lock.lock();
204 
205         try {
206             Registration reg = remove(id);
207 
208             reg.timeout();
209 
210             log.debug("Timed out: {}", reg);
211         }
212         catch (NotRegisteredException e) {
213             log.debug("Ignoring timeout; request not registered: {}", id);
214         }
215         catch (TimeoutAbortedException e) {
216             log.debug("Timeout aborted: " + e.getMessage());
217         }
218         finally {
219             lock.unlock();
220         }
221     }
222 
223     public void close() {
224         lock.lock();
225 
226         try {
227             if (!registrations.isEmpty()) {
228                 log.warn("Timing out remaining {} registrations", registrations.size());
229 
230                 for (Registration reg : registrations.values()) {
231                     timeout(reg.request.getId());
232                 }
233             }
234 
235             //
236             // FIXME: This causes some problems when a rsh client closes, like:
237             //
238             //        java.security.AccessControlException: access denied (java.lang.RuntimePermission modifyThread)
239             //
240             // scheduler.shutdown();
241         }
242         finally {
243             lock.unlock();
244         }
245     }
246 
247     private enum RegistrationState
248     {
249         PENDING,
250         ACTIVE,
251         DEACTIVE,
252         TIMEDOUT
253     }
254 
255     private class Registration
256     {
257         public final RequestHandle request;
258 
259         public RegistrationState state = RegistrationState.PENDING;
260 
261         private ScheduledFuture<?> timeoutFuture;
262 
263         public Registration(final RequestHandle request) {
264             assert request != null;
265 
266             this.request = request;
267         }
268 
269         public void activate() {
270             if (state != RegistrationState.PENDING) {
271                 log.debug("Can not activate, state is not PENDING, found: {}", state);
272             }
273             else {
274                 Runnable task = new Runnable() {
275                     public void run() {
276                         RequestManager.this.timeout(request.getId());
277                     }
278                 };
279 
280                 Duration timeout = request.getTimeout();
281 
282                 log.debug("Scheduling timeout to trigger in: {}", timeout);
283 
284                 timeoutFuture = scheduler.schedule(task, timeout.getValue(), timeout.getUnit());
285 
286                 state = RegistrationState.ACTIVE;
287             }
288         }
289 
290         public void deactivate() {
291             if (state != RegistrationState.ACTIVE) {
292                 log.debug("Can not deactivate; state is not ACTIVE, found: {}", state);
293             }
294             else if (timeoutFuture.cancel(false)) {
295                 timeoutFuture = null;
296 
297                 state = RegistrationState.DEACTIVE;
298             }
299             else {
300                 log.warn("Unable to cancel registration timeout: {}", this);
301             }
302         }
303 
304         public void timeout() {
305             Message.ID id = request.getId();
306 
307             if (timeoutFuture.isCancelled()) {
308                 throw new TimeoutAbortedException("Timeout has been canceled: " + id);
309             }
310             else if (request.isSignaled()) {
311                 throw new TimeoutAbortedException("Request has been singled: " + id);
312             }
313             else {
314                 request.timeout();
315 
316                 state = RegistrationState.TIMEDOUT;
317             }
318         }
319 
320         public String toString() {
321             return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
322                     .append("id", request.getId())
323                     .append("state", state)
324                     .toString();
325         }
326     }
327 
328     public class NotRegisteredException
329         extends RequestException
330     {
331         public NotRegisteredException(final Message.ID id) {
332             super(id);
333         }
334     }
335 
336     public class DuplicateRegistrationException
337         extends RequestException
338     {
339         public DuplicateRegistrationException(final Message.ID id) {
340             super(id);
341         }
342     }
343 
344     public class TimeoutAbortedException
345         extends RequestException
346     {
347         public TimeoutAbortedException(final String msg) {
348             super(msg);
349         }
350     }
351 }