1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.net.SocketException;
33 import java.nio.ByteBuffer;
34 import java.nio.channels.ByteChannel;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.SocketChannel;
37 import java.util.Deque;
38 import java.util.concurrent.ConcurrentLinkedDeque;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.concurrent.atomic.AtomicReference;
41 import java.util.concurrent.locks.Lock;
42 import java.util.concurrent.locks.ReentrantLock;
43
44 import org.apache.hc.core5.io.CloseMode;
45 import org.apache.hc.core5.io.Closer;
46 import org.apache.hc.core5.util.Args;
47 import org.apache.hc.core5.util.Timeout;
48
49 class IOSessionImpl implements IOSession {
50
51
52 private final static AtomicLong COUNT = new AtomicLong(0);
53
54 private final SelectionKey key;
55 private final SocketChannel channel;
56 private final Deque<Command> commandQueue;
57 private final Lock lock;
58 private final String id;
59 private final AtomicReference<IOEventHandler> handlerRef;
60 private final AtomicReference<IOSession.Status> status;
61
62 private volatile Timeout socketTimeout;
63 private volatile long lastReadTime;
64 private volatile long lastWriteTime;
65 private volatile long lastEventTime;
66
67 public IOSessionImpl(final String type, final SelectionKey key, final SocketChannel socketChannel) {
68 super();
69 this.key = Args.notNull(key, "Selection key");
70 this.channel = Args.notNull(socketChannel, "Socket channel");
71 this.commandQueue = new ConcurrentLinkedDeque<>();
72 this.lock = new ReentrantLock();
73 this.socketTimeout = Timeout.DISABLED;
74 this.id = String.format(type + "-%010d", COUNT.getAndIncrement());
75 this.handlerRef = new AtomicReference<>();
76 this.status = new AtomicReference<>(Status.ACTIVE);
77 final long currentTimeMillis = System.currentTimeMillis();
78 this.lastReadTime = currentTimeMillis;
79 this.lastWriteTime = currentTimeMillis;
80 this.lastEventTime = currentTimeMillis;
81 }
82
83 @Override
84 public String getId() {
85 return id;
86 }
87
88 @Override
89 public IOEventHandler getHandler() {
90 return handlerRef.get();
91 }
92
93 @Override
94 public void upgrade(final IOEventHandler handler) {
95 handlerRef.set(handler);
96 }
97
98 @Override
99 public Lock getLock() {
100 return lock;
101 }
102
103 @Override
104 public void enqueue(final Command command, final Command.Priority priority) {
105 if (priority == Command.Priority.IMMEDIATE) {
106 commandQueue.addFirst(command);
107 } else {
108 commandQueue.add(command);
109 }
110 setEvent(SelectionKey.OP_WRITE);
111
112 if (isStatusClosed()) {
113 command.cancel();
114 }
115 }
116
117 @Override
118 public boolean hasCommands() {
119 return !commandQueue.isEmpty();
120 }
121
122 @Override
123 public Command poll() {
124 return commandQueue.poll();
125 }
126
127 @Override
128 public ByteChannel channel() {
129 return this.channel;
130 }
131
132 @Override
133 public SocketAddress getLocalAddress() {
134 return this.channel.socket().getLocalSocketAddress();
135 }
136
137 @Override
138 public SocketAddress getRemoteAddress() {
139 return this.channel.socket().getRemoteSocketAddress();
140 }
141
142 @Override
143 public int getEventMask() {
144 return this.key.interestOps();
145 }
146
147 @Override
148 public void setEventMask(final int newValue) {
149 lock.lock();
150 try {
151 if (isStatusClosed()) {
152 return;
153 }
154 this.key.interestOps(newValue);
155 } finally {
156 lock.unlock();
157 }
158 this.key.selector().wakeup();
159 }
160
161 @Override
162 public void setEvent(final int op) {
163 lock.lock();
164 try {
165 if (isStatusClosed()) {
166 return;
167 }
168 this.key.interestOps(this.key.interestOps() | op);
169 } finally {
170 lock.unlock();
171 }
172 this.key.selector().wakeup();
173 }
174
175 @Override
176 public void clearEvent(final int op) {
177 lock.lock();
178 try {
179 if (isStatusClosed()) {
180 return;
181 }
182 this.key.interestOps(this.key.interestOps() & ~op);
183 } finally {
184 lock.unlock();
185 }
186 this.key.selector().wakeup();
187 }
188
189 @Override
190 public Timeout getSocketTimeout() {
191 return this.socketTimeout;
192 }
193
194 @Override
195 public void setSocketTimeout(final Timeout timeout) {
196 this.socketTimeout = Timeout.defaultsToDisabled(timeout);
197 this.lastEventTime = System.currentTimeMillis();
198 }
199
200 @Override
201 public int read(final ByteBuffer dst) throws IOException {
202 return this.channel.read(dst);
203 }
204
205 @Override
206 public int write(final ByteBuffer src) throws IOException {
207 return this.channel.write(src);
208 }
209
210 @Override
211 public void updateReadTime() {
212 lastReadTime = System.currentTimeMillis();
213 lastEventTime = lastReadTime;
214 }
215
216 @Override
217 public void updateWriteTime() {
218 lastWriteTime = System.currentTimeMillis();
219 lastEventTime = lastWriteTime;
220 }
221
222 @Override
223 public long getLastReadTime() {
224 return lastReadTime;
225 }
226
227 @Override
228 public long getLastWriteTime() {
229 return lastWriteTime;
230 }
231
232 @Override
233 public long getLastEventTime() {
234 return lastEventTime;
235 }
236
237 @Override
238 public Status getStatus() {
239 return this.status.get();
240 }
241
242 private boolean isStatusClosed() {
243 return this.status.get() == Status.CLOSED;
244 }
245
246 @Override
247 public boolean isOpen() {
248 return this.status.get() == Status.ACTIVE && this.channel.isOpen();
249 }
250
251 @Override
252 public void close() {
253 close(CloseMode.GRACEFUL);
254 }
255
256 @Override
257 public void close(final CloseMode closeMode) {
258 if (this.status.compareAndSet(Status.ACTIVE, Status.CLOSED)) {
259 if (closeMode == CloseMode.IMMEDIATE) {
260 try {
261 this.channel.socket().setSoLinger(true, 0);
262 } catch (final SocketException e) {
263
264 }
265 }
266 this.key.cancel();
267 this.key.attach(null);
268 Closer.closeQuietly(this.key.channel());
269 if (this.key.selector().isOpen()) {
270 this.key.selector().wakeup();
271 }
272 }
273 }
274
275 private static void formatOps(final StringBuilder buffer, final int ops) {
276 if ((ops & SelectionKey.OP_READ) > 0) {
277 buffer.append('r');
278 }
279 if ((ops & SelectionKey.OP_WRITE) > 0) {
280 buffer.append('w');
281 }
282 if ((ops & SelectionKey.OP_ACCEPT) > 0) {
283 buffer.append('a');
284 }
285 if ((ops & SelectionKey.OP_CONNECT) > 0) {
286 buffer.append('c');
287 }
288 }
289
290 @Override
291 public String toString() {
292 final StringBuilder buffer = new StringBuilder();
293 buffer.append(id).append("[");
294 buffer.append(this.status);
295 buffer.append("][");
296 if (this.key.isValid()) {
297 formatOps(buffer, this.key.interestOps());
298 buffer.append(":");
299 formatOps(buffer, this.key.readyOps());
300 }
301 buffer.append("]");
302 return buffer.toString();
303 }
304
305 }