1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.ByteChannel;
34 import java.nio.channels.SelectionKey;
35 import java.util.Queue;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38 import java.util.concurrent.locks.Lock;
39
40 import javax.net.ssl.SSLContext;
41
42 import org.apache.hc.core5.function.Callback;
43 import org.apache.hc.core5.function.Decorator;
44 import org.apache.hc.core5.io.CloseMode;
45 import org.apache.hc.core5.net.NamedEndpoint;
46 import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
47 import org.apache.hc.core5.reactor.ssl.SSLIOSession;
48 import org.apache.hc.core5.reactor.ssl.SSLMode;
49 import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
50 import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
51 import org.apache.hc.core5.reactor.ssl.TlsDetails;
52 import org.apache.hc.core5.util.Asserts;
53 import org.apache.hc.core5.util.Timeout;
54
55 final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
56
57 private final IOSession ioSession;
58 private final NamedEndpoint initialEndpoint;
59 private final Decorator<IOSession> ioSessionDecorator;
60 private final IOSessionListener sessionListener;
61 private final Queue<InternalDataChannel> closedSessions;
62 private final AtomicReference<SSLIOSession> tlsSessionRef;
63 private final AtomicReference<IOSession> currentSessionRef;
64 private final AtomicBoolean closed;
65
66 InternalDataChannel(
67 final IOSession ioSession,
68 final NamedEndpoint initialEndpoint,
69 final Decorator<IOSession> ioSessionDecorator,
70 final IOSessionListener sessionListener,
71 final Queue<InternalDataChannel> closedSessions) {
72 this.ioSession = ioSession;
73 this.initialEndpoint = initialEndpoint;
74 this.closedSessions = closedSessions;
75 this.ioSessionDecorator = ioSessionDecorator;
76 this.sessionListener = sessionListener;
77 this.tlsSessionRef = new AtomicReference<>(null);
78 this.currentSessionRef = new AtomicReference<>(
79 ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
80 this.closed = new AtomicBoolean(false);
81 }
82
83 @Override
84 public String getId() {
85 return ioSession.getId();
86 }
87
88 @Override
89 public NamedEndpoint getInitialEndpoint() {
90 return initialEndpoint;
91 }
92
93 @Override
94 public IOEventHandler getHandler() {
95 final IOSession currentSession = currentSessionRef.get();
96 return currentSession.getHandler();
97 }
98
99 @Override
100 public void upgrade(final IOEventHandler handler) {
101 final IOSession currentSession = currentSessionRef.get();
102 currentSession.upgrade(handler);
103 }
104
105 private IOEventHandler ensureHandler(final IOSession session) {
106 final IOEventHandler handler = session.getHandler();
107 Asserts.notNull(handler, "IO event handler");
108 return handler;
109 }
110
111 @Override
112 void onIOEvent(final int readyOps) throws IOException {
113 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
114 final IOSession currentSession = currentSessionRef.get();
115 currentSession.clearEvent(SelectionKey.OP_CONNECT);
116 if (tlsSessionRef.get() == null) {
117 if (sessionListener != null) {
118 sessionListener.connected(currentSession);
119 }
120 final IOEventHandler handler = ensureHandler(currentSession);
121 handler.connected(currentSession);
122 }
123 }
124 if ((readyOps & SelectionKey.OP_READ) != 0) {
125 final IOSession currentSession = currentSessionRef.get();
126 currentSession.updateReadTime();
127 if (sessionListener != null) {
128 sessionListener.inputReady(currentSession);
129 }
130 final IOEventHandler handler = ensureHandler(currentSession);
131 handler.inputReady(currentSession, null);
132 }
133 if ((readyOps & SelectionKey.OP_WRITE) != 0
134 || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
135 final IOSession currentSession = currentSessionRef.get();
136 currentSession.updateWriteTime();
137 if (sessionListener != null) {
138 sessionListener.outputReady(currentSession);
139 }
140 final IOEventHandler handler = ensureHandler(currentSession);
141 handler.outputReady(currentSession);
142 }
143 }
144
145 @Override
146 Timeout getTimeout() {
147 final IOSession currentSession = currentSessionRef.get();
148 return currentSession.getSocketTimeout();
149 }
150
151 @Override
152 void onTimeout(final Timeout timeout) throws IOException {
153 final IOSession currentSession = currentSessionRef.get();
154 if (sessionListener != null) {
155 sessionListener.timeout(currentSession);
156 }
157 final IOEventHandler handler = ensureHandler(currentSession);
158 handler.timeout(currentSession, timeout);
159 }
160
161 @Override
162 void onException(final Exception cause) {
163 final IOSession currentSession = currentSessionRef.get();
164 if (sessionListener != null) {
165 sessionListener.exception(currentSession, cause);
166 }
167 final IOEventHandler handler = currentSession.getHandler();
168 if (handler != null) {
169 handler.exception(currentSession, cause);
170 }
171 }
172
173 void onTLSSessionStart(final SSLIOSession sslSession) {
174 final IOSession currentSession = currentSessionRef.get();
175 if (sessionListener != null) {
176 sessionListener.connected(currentSession);
177 }
178 }
179
180 void onTLSSessionEnd() {
181 if (closed.compareAndSet(false, true)) {
182 closedSessions.add(this);
183 }
184 }
185
186 void disconnected() {
187 final IOSession currentSession = currentSessionRef.get();
188 if (sessionListener != null) {
189 sessionListener.disconnected(currentSession);
190 }
191 final IOEventHandler handler = currentSession.getHandler();
192 if (handler != null) {
193 handler.disconnected(currentSession);
194 }
195 }
196
197 @Override
198 public void startTls(
199 final SSLContext sslContext,
200 final NamedEndpoint endpoint,
201 final SSLBufferMode sslBufferMode,
202 final SSLSessionInitializer initializer,
203 final SSLSessionVerifier verifier,
204 final Timeout handshakeTimeout) {
205 final SSLIOSessionSession.html#SSLIOSession">SSLIOSession sslioSession = new SSLIOSession(
206 endpoint != null ? endpoint : initialEndpoint,
207 ioSession,
208 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
209 sslContext,
210 sslBufferMode,
211 initializer,
212 verifier,
213 new Callback<SSLIOSession>() {
214
215 @Override
216 public void execute(final SSLIOSession sslSession) {
217 onTLSSessionStart(sslSession);
218 }
219
220 },
221 new Callback<SSLIOSession>() {
222
223 @Override
224 public void execute(final SSLIOSession sslSession) {
225 onTLSSessionEnd();
226 }
227
228 },
229 handshakeTimeout);
230 if (tlsSessionRef.compareAndSet(null, sslioSession)) {
231 currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
232 if (sessionListener != null) {
233 sessionListener.startTls(sslioSession);
234 }
235 } else {
236 throw new IllegalStateException("TLS already activated");
237 }
238 }
239
240 @SuppressWarnings("resource")
241 @Override
242 public TlsDetails getTlsDetails() {
243 final SSLIOSession sslIoSession = tlsSessionRef.get();
244 return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
245 }
246
247 @Override
248 public Lock getLock() {
249 return ioSession.getLock();
250 }
251
252 @Override
253 public void close() {
254 close(CloseMode.GRACEFUL);
255 }
256
257 @Override
258 public void close(final CloseMode closeMode) {
259 final IOSession currentSession = currentSessionRef.get();
260 if (closeMode == CloseMode.IMMEDIATE) {
261 closed.set(true);
262 currentSession.close(closeMode);
263 } else {
264 if (closed.compareAndSet(false, true)) {
265 try {
266 currentSession.close(closeMode);
267 } finally {
268 closedSessions.add(this);
269 }
270 }
271 }
272 }
273
274 @Override
275 public IOSession.Status getStatus() {
276 final IOSession currentSession = currentSessionRef.get();
277 return currentSession.getStatus();
278 }
279
280 @Override
281 public boolean isOpen() {
282 final IOSession currentSession = currentSessionRef.get();
283 return currentSession.isOpen();
284 }
285
286 @Override
287 public void enqueue(final Command command, final Command.Priority priority) {
288 final IOSession currentSession = currentSessionRef.get();
289 currentSession.enqueue(command, priority);
290 }
291
292 @Override
293 public boolean hasCommands() {
294 final IOSession currentSession = currentSessionRef.get();
295 return currentSession.hasCommands();
296 }
297
298 @Override
299 public Command poll() {
300 final IOSession currentSession = currentSessionRef.get();
301 return currentSession.poll();
302 }
303
304 @Override
305 public ByteChannel channel() {
306 final IOSession currentSession = currentSessionRef.get();
307 return currentSession.channel();
308 }
309
310 @Override
311 public SocketAddress getRemoteAddress() {
312 return ioSession.getRemoteAddress();
313 }
314
315 @Override
316 public SocketAddress getLocalAddress() {
317 return ioSession.getLocalAddress();
318 }
319
320 @Override
321 public int getEventMask() {
322 final IOSession currentSession = currentSessionRef.get();
323 return currentSession.getEventMask();
324 }
325
326 @Override
327 public void setEventMask(final int ops) {
328 final IOSession currentSession = currentSessionRef.get();
329 currentSession.setEventMask(ops);
330 }
331
332 @Override
333 public void setEvent(final int op) {
334 final IOSession currentSession = currentSessionRef.get();
335 currentSession.setEvent(op);
336 }
337
338 @Override
339 public void clearEvent(final int op) {
340 final IOSession currentSession = currentSessionRef.get();
341 currentSession.clearEvent(op);
342 }
343
344 @Override
345 public Timeout getSocketTimeout() {
346 return ioSession.getSocketTimeout();
347 }
348
349 @Override
350 public void setSocketTimeout(final Timeout timeout) {
351 ioSession.setSocketTimeout(timeout);
352 }
353
354 @Override
355 public int read(final ByteBuffer dst) throws IOException {
356 final IOSession currentSession = currentSessionRef.get();
357 return currentSession.read(dst);
358 }
359
360 @Override
361 public int write(final ByteBuffer src) throws IOException {
362 final IOSession currentSession = currentSessionRef.get();
363 return currentSession.write(src);
364 }
365
366 @Override
367 public void updateReadTime() {
368 ioSession.updateReadTime();
369 }
370
371 @Override
372 public void updateWriteTime() {
373 ioSession.updateWriteTime();
374 }
375
376 @Override
377 public long getLastReadTime() {
378 return ioSession.getLastReadTime();
379 }
380
381 @Override
382 public long getLastWriteTime() {
383 return ioSession.getLastWriteTime();
384 }
385
386 @Override
387 public long getLastEventTime() {
388 return ioSession.getLastEventTime();
389 }
390
391 @Override
392 public String toString() {
393 final IOSession currentSession = currentSessionRef.get();
394 if (currentSession != null) {
395 return currentSession.toString();
396 } else {
397 return ioSession.toString();
398 }
399 }
400
401 }