1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.reqres;
21
22 import java.util.NoSuchElementException;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27
28
29
30
31
32
33 public class Request {
34 private final Object id;
35
36 private final Object message;
37
38 private final long timeoutMillis;
39
40 private volatile Runnable timeoutTask;
41
42 private volatile ScheduledFuture<?> timeoutFuture;
43
44 private final BlockingQueue<Object> responses;
45
46 private volatile boolean endOfResponses;
47
48 public Request(Object id, Object message, long timeoutMillis) {
49 this(id, message, true, timeoutMillis);
50 }
51
52 public Request(Object id, Object message, boolean useResponseQueue,
53 long timeoutMillis) {
54 this(id, message, useResponseQueue, timeoutMillis,
55 TimeUnit.MILLISECONDS);
56 }
57
58 public Request(Object id, Object message, long timeout, TimeUnit unit) {
59 this(id, message, true, timeout, unit);
60 }
61
62 public Request(Object id, Object message, boolean useResponseQueue,
63 long timeout, TimeUnit unit) {
64 if (id == null) {
65 throw new NullPointerException("id");
66 }
67 if (message == null) {
68 throw new NullPointerException("message");
69 }
70 if (timeout < 0) {
71 throw new IllegalArgumentException("timeout: " + timeout
72 + " (expected: 0+)");
73 } else if (timeout == 0) {
74 timeout = Long.MAX_VALUE;
75 }
76
77 if (unit == null) {
78 throw new NullPointerException("unit");
79 }
80
81 this.id = id;
82 this.message = message;
83 this.responses = useResponseQueue ? new LinkedBlockingQueue<Object>() : null;
84 this.timeoutMillis = unit.toMillis(timeout);
85 }
86
87 public Object getId() {
88 return id;
89 }
90
91 public Object getMessage() {
92 return message;
93 }
94
95 public long getTimeoutMillis() {
96 return timeoutMillis;
97 }
98
99 public boolean isUseResponseQueue() {
100 return responses != null;
101 }
102
103 public boolean hasResponse() {
104 checkUseResponseQueue();
105 return !responses.isEmpty();
106 }
107
108 public Response awaitResponse() throws RequestTimeoutException,
109 InterruptedException {
110 checkUseResponseQueue();
111 chechEndOfResponses();
112 return convertToResponse(responses.take());
113 }
114
115 public Response awaitResponse(long timeout, TimeUnit unit)
116 throws RequestTimeoutException, InterruptedException {
117 checkUseResponseQueue();
118 chechEndOfResponses();
119 return convertToResponse(responses.poll(timeout, unit));
120 }
121
122 private Response convertToResponse(Object o) {
123 if (o instanceof Response) {
124 return (Response) o;
125 }
126
127 if (o == null) {
128 return null;
129 }
130
131 throw (RequestTimeoutException) o;
132 }
133
134 public Response awaitResponseUninterruptibly()
135 throws RequestTimeoutException {
136 for (; ;) {
137 try {
138 return awaitResponse();
139 } catch (InterruptedException e) {
140
141 }
142 }
143 }
144
145 private void chechEndOfResponses() {
146 if (responses != null && endOfResponses && responses.isEmpty()) {
147 throw new NoSuchElementException(
148 "All responses has been retrieved already.");
149 }
150 }
151
152 private void checkUseResponseQueue() {
153 if (responses == null) {
154 throw new UnsupportedOperationException(
155 "Response queue is not available; useResponseQueue is false.");
156 }
157 }
158
159 void signal(Response response) {
160 signal0(response);
161 if (response.getType() != ResponseType.PARTIAL) {
162 endOfResponses = true;
163 }
164 }
165
166 void signal(RequestTimeoutException e) {
167 signal0(e);
168 endOfResponses = true;
169 }
170
171 private void signal0(Object answer) {
172 if (responses != null) {
173 responses.add(answer);
174 }
175 }
176
177 @Override
178 public int hashCode() {
179 return getId().hashCode();
180 }
181
182 @Override
183 public boolean equals(Object o) {
184 if (o == this) {
185 return true;
186 }
187
188 if (o == null) {
189 return false;
190 }
191
192 if (!(o instanceof Request)) {
193 return false;
194 }
195
196 Request that = (Request) o;
197 return this.getId().equals(that.getId());
198 }
199
200 @Override
201 public String toString() {
202 String timeout = getTimeoutMillis() == Long.MAX_VALUE ? "max"
203 : String.valueOf(getTimeoutMillis());
204
205 return "request: { id=" + getId() + ", timeout=" + timeout
206 + ", message=" + getMessage() + " }";
207 }
208
209 Runnable getTimeoutTask() {
210 return timeoutTask;
211 }
212
213 void setTimeoutTask(Runnable timeoutTask) {
214 this.timeoutTask = timeoutTask;
215 }
216
217 ScheduledFuture<?> getTimeoutFuture() {
218 return timeoutFuture;
219 }
220
221 void setTimeoutFuture(ScheduledFuture<?> timeoutFuture) {
222 this.timeoutFuture = timeoutFuture;
223 }
224 }