1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.nio.channels.ByteChannel;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.SocketChannel;
37 import java.util.Collections;
38 import java.util.HashMap;
39 import java.util.Map;
40
41 import org.apache.http.annotation.Contract;
42 import org.apache.http.annotation.ThreadingBehavior;
43 import org.apache.http.nio.reactor.IOSession;
44 import org.apache.http.nio.reactor.SessionBufferStatus;
45 import org.apache.http.nio.reactor.SocketAccessor;
46 import org.apache.http.util.Args;
47
48
49
50
51
52
53 @Contract(threading = ThreadingBehavior.SAFE)
54 public class IOSessionImpl implements IOSession, SocketAccessor {
55
56 private final SelectionKey key;
57 private final ByteChannel channel;
58 private final Map<String, Object> attributes;
59 private final InterestOpsCallback interestOpsCallback;
60 private final SessionClosedCallback sessionClosedCallback;
61
62 private volatile int status;
63 private volatile int currentEventMask;
64 private volatile SessionBufferStatus bufferStatus;
65 private volatile int socketTimeout;
66
67 private final long startedTime;
68
69 private volatile long lastReadTime;
70 private volatile long lastWriteTime;
71 private volatile long lastAccessTime;
72
73
74
75
76
77
78
79
80
81
82 public IOSessionImpl(
83 final SelectionKey key,
84 final InterestOpsCallback interestOpsCallback,
85 final SessionClosedCallback sessionClosedCallback) {
86 super();
87 Args.notNull(key, "Selection key");
88 this.key = key;
89 this.channel = (ByteChannel) this.key.channel();
90 this.interestOpsCallback = interestOpsCallback;
91 this.sessionClosedCallback = sessionClosedCallback;
92 this.attributes = Collections.synchronizedMap(new HashMap<String, Object>());
93 this.currentEventMask = key.interestOps();
94 this.socketTimeout = 0;
95 this.status = ACTIVE;
96 final long now = System.currentTimeMillis();
97 this.startedTime = now;
98 this.lastReadTime = now;
99 this.lastWriteTime = now;
100 this.lastAccessTime = now;
101 }
102
103
104
105
106
107
108
109 public IOSessionImpl(
110 final SelectionKey key,
111 final SessionClosedCallback sessionClosedCallback) {
112 this(key, null, sessionClosedCallback);
113 }
114
115 @Override
116 public ByteChannel channel() {
117 return this.channel;
118 }
119
120 @Override
121 public SocketAddress getLocalAddress() {
122 return this.channel instanceof SocketChannel
123 ? ((SocketChannel) this.channel).socket().getLocalSocketAddress()
124 : null;
125 }
126
127 @Override
128 public SocketAddress getRemoteAddress() {
129 return this.channel instanceof SocketChannel
130 ? ((SocketChannel) this.channel).socket().getRemoteSocketAddress()
131 : null;
132 }
133
134 @Override
135 public int getEventMask() {
136 return this.interestOpsCallback != null ? this.currentEventMask : this.key.interestOps();
137 }
138
139 @Override
140 public synchronized void setEventMask(final int ops) {
141 if (this.status == CLOSED) {
142 return;
143 }
144 if (this.interestOpsCallback != null) {
145
146 this.currentEventMask = ops;
147
148
149 final InterestOpEntryor/InterestOpEntry.html#InterestOpEntry">InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
150
151
152 this.interestOpsCallback.addInterestOps(entry);
153 } else {
154 this.key.interestOps(ops);
155 }
156 this.key.selector().wakeup();
157 }
158
159 @Override
160 public synchronized void setEvent(final int op) {
161 if (this.status == CLOSED) {
162 return;
163 }
164 if (this.interestOpsCallback != null) {
165
166 this.currentEventMask |= op;
167
168
169 final InterestOpEntryor/InterestOpEntry.html#InterestOpEntry">InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
170
171
172 this.interestOpsCallback.addInterestOps(entry);
173 } else {
174 final int ops = this.key.interestOps();
175 this.key.interestOps(ops | op);
176 }
177 this.key.selector().wakeup();
178 }
179
180 @Override
181 public synchronized void clearEvent(final int op) {
182 if (this.status == CLOSED) {
183 return;
184 }
185 if (this.interestOpsCallback != null) {
186
187 this.currentEventMask &= ~op;
188
189
190 final InterestOpEntryor/InterestOpEntry.html#InterestOpEntry">InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
191
192
193 this.interestOpsCallback.addInterestOps(entry);
194 } else {
195 final int ops = this.key.interestOps();
196 this.key.interestOps(ops & ~op);
197 }
198 this.key.selector().wakeup();
199 }
200
201 @Override
202 public int getSocketTimeout() {
203 return this.socketTimeout;
204 }
205
206 @Override
207 public void setSocketTimeout(final int timeout) {
208 this.socketTimeout = timeout;
209 this.lastAccessTime = System.currentTimeMillis();
210 }
211
212 @Override
213 public void close() {
214 synchronized (this) {
215 if (this.status == CLOSED) {
216 return;
217 }
218 this.status = CLOSED;
219 this.key.cancel();
220 try {
221 this.key.channel().close();
222 } catch (final IOException ex) {
223
224
225 }
226 if (this.sessionClosedCallback != null) {
227 this.sessionClosedCallback.sessionClosed(this);
228 }
229 if (this.key.selector().isOpen()) {
230 this.key.selector().wakeup();
231 }
232 }
233 }
234
235 @Override
236 public int getStatus() {
237 return this.status;
238 }
239
240 @Override
241 public boolean isClosed() {
242 return this.status == CLOSED;
243 }
244
245 @Override
246 public void shutdown() {
247
248
249 close();
250 }
251
252 @Override
253 public boolean hasBufferedInput() {
254 final SessionBufferStatus buffStatus = this.bufferStatus;
255 return buffStatus != null && buffStatus.hasBufferedInput();
256 }
257
258 @Override
259 public boolean hasBufferedOutput() {
260 final SessionBufferStatus buffStatus = this.bufferStatus;
261 return buffStatus != null && buffStatus.hasBufferedOutput();
262 }
263
264 @Override
265 public void setBufferStatus(final SessionBufferStatus bufferStatus) {
266 this.bufferStatus = bufferStatus;
267 }
268
269 @Override
270 public Object getAttribute(final String name) {
271 return this.attributes.get(name);
272 }
273
274 @Override
275 public Object removeAttribute(final String name) {
276 return this.attributes.remove(name);
277 }
278
279 @Override
280 public void setAttribute(final String name, final Object obj) {
281 this.attributes.put(name, obj);
282 }
283
284 public long getStartedTime() {
285 return this.startedTime;
286 }
287
288 public long getLastReadTime() {
289 return this.lastReadTime;
290 }
291
292 public long getLastWriteTime() {
293 return this.lastWriteTime;
294 }
295
296 public long getLastAccessTime() {
297 return this.lastAccessTime;
298 }
299
300 void resetLastRead() {
301 final long now = System.currentTimeMillis();
302 this.lastReadTime = now;
303 this.lastAccessTime = now;
304 }
305
306 void resetLastWrite() {
307 final long now = System.currentTimeMillis();
308 this.lastWriteTime = now;
309 this.lastAccessTime = now;
310 }
311
312 private static void formatOps(final StringBuilder buffer, final int ops) {
313 if ((ops & SelectionKey.OP_READ) > 0) {
314 buffer.append('r');
315 }
316 if ((ops & SelectionKey.OP_WRITE) > 0) {
317 buffer.append('w');
318 }
319 if ((ops & SelectionKey.OP_ACCEPT) > 0) {
320 buffer.append('a');
321 }
322 if ((ops & SelectionKey.OP_CONNECT) > 0) {
323 buffer.append('c');
324 }
325 }
326
327 private static void formatAddress(final StringBuilder buffer, final SocketAddress socketAddress) {
328 if (socketAddress instanceof InetSocketAddress) {
329 final InetSocketAddress addr = ((InetSocketAddress) socketAddress);
330 buffer.append(addr.getAddress() != null ? addr.getAddress().getHostAddress() :
331 addr.getAddress())
332 .append(':')
333 .append(addr.getPort());
334 } else {
335 buffer.append(socketAddress);
336 }
337 }
338
339 @Override
340 public String toString() {
341 final StringBuilder buffer = new StringBuilder();
342 synchronized (this.key) {
343 final SocketAddress remoteAddress = getRemoteAddress();
344 final SocketAddress localAddress = getLocalAddress();
345 if (remoteAddress != null && localAddress != null) {
346 formatAddress(buffer, localAddress);
347 buffer.append("<->");
348 formatAddress(buffer, remoteAddress);
349 }
350 buffer.append('[');
351 switch (this.status) {
352 case ACTIVE:
353 buffer.append("ACTIVE");
354 break;
355 case CLOSING:
356 buffer.append("CLOSING");
357 break;
358 case CLOSED:
359 buffer.append("CLOSED");
360 break;
361 }
362 buffer.append("][");
363 if (this.key.isValid()) {
364 formatOps(buffer, this.interestOpsCallback != null ?
365 this.currentEventMask : this.key.interestOps());
366 buffer.append(':');
367 formatOps(buffer, this.key.readyOps());
368 }
369 }
370 buffer.append(']');
371 return new String(buffer);
372 }
373
374 @Override
375 public Socket getSocket() {
376 return this.channel instanceof SocketChannel ? ((SocketChannel) this.channel).socket() : null;
377 }
378
379 }