1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.net.SocketException;
33 import java.nio.ByteBuffer;
34 import java.nio.channels.ByteChannel;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.SocketChannel;
37 import java.util.Deque;
38 import java.util.concurrent.ConcurrentLinkedDeque;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.concurrent.atomic.AtomicReference;
41 import java.util.concurrent.locks.Lock;
42 import java.util.concurrent.locks.ReentrantLock;
43
44 import org.apache.hc.core5.io.CloseMode;
45 import org.apache.hc.core5.io.Closer;
46 import org.apache.hc.core5.util.Args;
47 import org.apache.hc.core5.util.Timeout;
48
49 class IOSessionImpl implements IOSession {
50
51
52 private final static AtomicLong COUNT = new AtomicLong(0);
53
54 private final SelectionKey key;
55 private final SocketChannel channel;
56 private final Deque<Command> commandQueue;
57 private final Lock lock;
58 private final String id;
59 private final AtomicReference<IOEventHandler> handlerRef;
60 private final AtomicReference<IOSession.Status> status;
61
62 private volatile Timeout socketTimeout;
63 private volatile long lastReadTime;
64 private volatile long lastWriteTime;
65 private volatile long lastEventTime;
66
67 public IOSessionImpl(final String type, final SelectionKey key, final SocketChannel socketChannel) {
68 super();
69 this.key = Args.notNull(key, "Selection key");
70 this.channel = Args.notNull(socketChannel, "Socket channel");
71 this.commandQueue = new ConcurrentLinkedDeque<>();
72 this.lock = new ReentrantLock();
73 this.socketTimeout = Timeout.DISABLED;
74 this.id = String.format(type + "-%010d", COUNT.getAndIncrement());
75 this.handlerRef = new AtomicReference<>();
76 this.status = new AtomicReference<>(Status.ACTIVE);
77 final long currentTimeMillis = System.currentTimeMillis();
78 this.lastReadTime = currentTimeMillis;
79 this.lastWriteTime = currentTimeMillis;
80 this.lastEventTime = currentTimeMillis;
81 }
82
83 @Override
84 public String getId() {
85 return id;
86 }
87
88 @Override
89 public IOEventHandler getHandler() {
90 return handlerRef.get();
91 }
92
93 @Override
94 public void upgrade(final IOEventHandler handler) {
95 handlerRef.set(handler);
96 }
97
98 @Override
99 public Lock getLock() {
100 return lock;
101 }
102
103 @Override
104 public void enqueue(final Command command, final Command.Priority priority) {
105 if (priority == Command.Priority.IMMEDIATE) {
106 commandQueue.addFirst(command);
107 } else {
108 commandQueue.add(command);
109 }
110 setEvent(SelectionKey.OP_WRITE);
111 }
112
113 @Override
114 public boolean hasCommands() {
115 return !commandQueue.isEmpty();
116 }
117
118 @Override
119 public Command poll() {
120 return commandQueue.poll();
121 }
122
123 @Override
124 public ByteChannel channel() {
125 return this.channel;
126 }
127
128 @Override
129 public SocketAddress getLocalAddress() {
130 return this.channel.socket().getLocalSocketAddress();
131 }
132
133 @Override
134 public SocketAddress getRemoteAddress() {
135 return this.channel.socket().getRemoteSocketAddress();
136 }
137
138 @Override
139 public int getEventMask() {
140 return this.key.interestOps();
141 }
142
143 @Override
144 public void setEventMask(final int newValue) {
145 lock.lock();
146 try {
147 if (isStatusClosed()) {
148 return;
149 }
150 this.key.interestOps(newValue);
151 } finally {
152 lock.unlock();
153 }
154 this.key.selector().wakeup();
155 }
156
157 @Override
158 public void setEvent(final int op) {
159 lock.lock();
160 try {
161 if (isStatusClosed()) {
162 return;
163 }
164 this.key.interestOps(this.key.interestOps() | op);
165 } finally {
166 lock.unlock();
167 }
168 this.key.selector().wakeup();
169 }
170
171 @Override
172 public void clearEvent(final int op) {
173 lock.lock();
174 try {
175 if (isStatusClosed()) {
176 return;
177 }
178 this.key.interestOps(this.key.interestOps() & ~op);
179 } finally {
180 lock.unlock();
181 }
182 this.key.selector().wakeup();
183 }
184
185 @Override
186 public Timeout getSocketTimeout() {
187 return this.socketTimeout;
188 }
189
190 @Override
191 public void setSocketTimeout(final Timeout timeout) {
192 this.socketTimeout = Timeout.defaultsToDisabled(timeout);
193 this.lastEventTime = System.currentTimeMillis();
194 }
195
196 @Override
197 public int read(final ByteBuffer dst) throws IOException {
198 return this.channel.read(dst);
199 }
200
201 @Override
202 public int write(final ByteBuffer src) throws IOException {
203 return this.channel.write(src);
204 }
205
206 @Override
207 public void updateReadTime() {
208 lastReadTime = System.currentTimeMillis();
209 lastEventTime = lastReadTime;
210 }
211
212 @Override
213 public void updateWriteTime() {
214 lastWriteTime = System.currentTimeMillis();
215 lastEventTime = lastWriteTime;
216 }
217
218 @Override
219 public long getLastReadTime() {
220 return lastReadTime;
221 }
222
223 @Override
224 public long getLastWriteTime() {
225 return lastWriteTime;
226 }
227
228 @Override
229 public long getLastEventTime() {
230 return lastEventTime;
231 }
232
233 @Override
234 public Status getStatus() {
235 return this.status.get();
236 }
237
238 private boolean isStatusClosed() {
239 return this.status.get() == Status.CLOSED;
240 }
241
242 @Override
243 public boolean isOpen() {
244 return this.status.get() == Status.ACTIVE && this.channel.isOpen();
245 }
246
247 @Override
248 public void close() {
249 close(CloseMode.GRACEFUL);
250 }
251
252 @Override
253 public void close(final CloseMode closeMode) {
254 if (this.status.compareAndSet(Status.ACTIVE, Status.CLOSED)) {
255 if (closeMode == CloseMode.IMMEDIATE) {
256 try {
257 this.channel.socket().setSoLinger(true, 0);
258 } catch (final SocketException e) {
259
260 }
261 }
262 this.key.cancel();
263 this.key.attach(null);
264 Closer.closeQuietly(this.key.channel());
265 if (this.key.selector().isOpen()) {
266 this.key.selector().wakeup();
267 }
268 }
269 }
270
271 private static void formatOps(final StringBuilder buffer, final int ops) {
272 if ((ops & SelectionKey.OP_READ) > 0) {
273 buffer.append('r');
274 }
275 if ((ops & SelectionKey.OP_WRITE) > 0) {
276 buffer.append('w');
277 }
278 if ((ops & SelectionKey.OP_ACCEPT) > 0) {
279 buffer.append('a');
280 }
281 if ((ops & SelectionKey.OP_CONNECT) > 0) {
282 buffer.append('c');
283 }
284 }
285
286 @Override
287 public String toString() {
288 final StringBuilder buffer = new StringBuilder();
289 buffer.append(id).append("[");
290 buffer.append(this.status);
291 buffer.append("][");
292 if (this.key.isValid()) {
293 formatOps(buffer, this.key.interestOps());
294 buffer.append(":");
295 formatOps(buffer, this.key.readyOps());
296 }
297 buffer.append("]");
298 return buffer.toString();
299 }
300
301 }