View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ByteChannel;
34  import java.nio.channels.SelectionKey;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.ConcurrentMap;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicReference;
40  import java.util.concurrent.locks.Lock;
41  
42  import javax.net.ssl.SSLContext;
43  import javax.net.ssl.SSLSession;
44  
45  import org.apache.hc.core5.concurrent.CallbackContribution;
46  import org.apache.hc.core5.concurrent.FutureCallback;
47  import org.apache.hc.core5.function.Decorator;
48  import org.apache.hc.core5.io.CloseMode;
49  import org.apache.hc.core5.net.NamedEndpoint;
50  import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
51  import org.apache.hc.core5.reactor.ssl.SSLIOSession;
52  import org.apache.hc.core5.reactor.ssl.SSLMode;
53  import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
54  import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
55  import org.apache.hc.core5.reactor.ssl.TlsDetails;
56  import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
57  import org.apache.hc.core5.util.Args;
58  import org.apache.hc.core5.util.Asserts;
59  import org.apache.hc.core5.util.TextUtils;
60  import org.apache.hc.core5.util.Timeout;
61  
62  final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
63  
64      private final IOSession ioSession;
65      private final NamedEndpoint initialEndpoint;
66      private final Decorator<IOSession> ioSessionDecorator;
67      private final IOSessionListener sessionListener;
68      private final Queue<InternalDataChannel> closedSessions;
69      private final AtomicReference<SSLIOSession> tlsSessionRef;
70      private final AtomicReference<IOSession> currentSessionRef;
71      private final AtomicReference<IOEventHandler> eventHandlerRef;
72      private final ConcurrentMap<String, ProtocolUpgradeHandler> protocolUpgradeHandlerMap;
73      private final AtomicBoolean closed;
74  
75      InternalDataChannel(
76              final IOSession ioSession,
77              final NamedEndpoint initialEndpoint,
78              final Decorator<IOSession> ioSessionDecorator,
79              final IOSessionListener sessionListener,
80              final Queue<InternalDataChannel> closedSessions) {
81          this.ioSession = ioSession;
82          this.initialEndpoint = initialEndpoint;
83          this.closedSessions = closedSessions;
84          this.ioSessionDecorator = ioSessionDecorator;
85          this.sessionListener = sessionListener;
86          this.tlsSessionRef = new AtomicReference<>();
87          this.currentSessionRef = new AtomicReference<>(
88                  ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
89          this.eventHandlerRef = new AtomicReference<>();
90          this.protocolUpgradeHandlerMap = new ConcurrentHashMap<>();
91          this.closed = new AtomicBoolean(false);
92      }
93  
94      @Override
95      public String getId() {
96          return ioSession.getId();
97      }
98  
99      @Override
100     public NamedEndpoint getInitialEndpoint() {
101         return initialEndpoint;
102     }
103 
104     @Override
105     public IOEventHandler getHandler() {
106         return eventHandlerRef.get();
107     }
108 
109     @Override
110     public void upgrade(final IOEventHandler handler) {
111         final IOSession currentSession = currentSessionRef.get();
112         currentSession.upgrade(handler);
113         eventHandlerRef.set(handler);
114     }
115 
116     private IOEventHandler ensureHandler(final IOSession session) {
117         final IOEventHandler handler = session.getHandler();
118         Asserts.notNull(handler, "IO event handler");
119         return handler;
120     }
121 
122     @Override
123     void onIOEvent(final int readyOps) throws IOException {
124         if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
125             final IOSession currentSession = currentSessionRef.get();
126             currentSession.clearEvent(SelectionKey.OP_CONNECT);
127             if (tlsSessionRef.get() == null) {
128                 if (sessionListener != null) {
129                     sessionListener.connected(currentSession);
130                 }
131                 final IOEventHandler handler = ensureHandler(currentSession);
132                 handler.connected(currentSession);
133             }
134         }
135         if ((readyOps & SelectionKey.OP_READ) != 0) {
136             final IOSession currentSession = currentSessionRef.get();
137             currentSession.updateReadTime();
138             if (sessionListener != null) {
139                 sessionListener.inputReady(currentSession);
140             }
141             final IOEventHandler handler = ensureHandler(currentSession);
142             handler.inputReady(currentSession, null);
143         }
144         if ((readyOps & SelectionKey.OP_WRITE) != 0
145                 || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
146             final IOSession currentSession = currentSessionRef.get();
147             currentSession.updateWriteTime();
148             if (sessionListener != null) {
149                 sessionListener.outputReady(currentSession);
150             }
151             final IOEventHandler handler = ensureHandler(currentSession);
152             handler.outputReady(currentSession);
153         }
154     }
155 
156     @Override
157     Timeout getTimeout() {
158         final IOSession currentSession = currentSessionRef.get();
159         return currentSession.getSocketTimeout();
160     }
161 
162     @Override
163     void onTimeout(final Timeout timeout) throws IOException {
164         final IOSession currentSession = currentSessionRef.get();
165         if (sessionListener != null) {
166             sessionListener.timeout(currentSession);
167         }
168         final IOEventHandler handler = ensureHandler(currentSession);
169         handler.timeout(currentSession, timeout);
170     }
171 
172     @Override
173     void onException(final Exception cause) {
174         final IOSession currentSession = currentSessionRef.get();
175         if (sessionListener != null) {
176             sessionListener.exception(currentSession, cause);
177         }
178         final IOEventHandler handler = currentSession.getHandler();
179         if (handler != null) {
180             handler.exception(currentSession, cause);
181         }
182     }
183 
184     void onTLSSessionStart(final SSLIOSession sslSession) {
185         final IOSession currentSession = currentSessionRef.get();
186         if (sessionListener != null) {
187             sessionListener.connected(currentSession);
188         }
189     }
190 
191     void onTLSSessionEnd(final SSLIOSession sslSession) {
192         if (closed.compareAndSet(false, true)) {
193             closedSessions.add(this);
194         }
195     }
196 
197     void disconnected() {
198         final IOSession currentSession = currentSessionRef.get();
199         if (sessionListener != null) {
200             sessionListener.disconnected(currentSession);
201         }
202         final IOEventHandler handler = currentSession.getHandler();
203         if (handler != null) {
204             handler.disconnected(currentSession);
205         }
206     }
207 
208     @Override
209     public void startTls(
210             final SSLContext sslContext,
211             final NamedEndpoint endpoint,
212             final SSLBufferMode sslBufferMode,
213             final SSLSessionInitializer initializer,
214             final SSLSessionVerifier verifier,
215             final Timeout handshakeTimeout) {
216         startTls(sslContext, endpoint, sslBufferMode, initializer, verifier, handshakeTimeout, null);
217     }
218 
219     @Override
220     public void startTls(
221             final SSLContext sslContext,
222             final NamedEndpoint endpoint,
223             final SSLBufferMode sslBufferMode,
224             final SSLSessionInitializer initializer,
225             final SSLSessionVerifier verifier,
226             final Timeout handshakeTimeout,
227             final FutureCallback<TransportSecurityLayer> callback) {
228         final SSLIOSession sslioSession = new SSLIOSession(
229                 endpoint != null ? endpoint : initialEndpoint,
230                 ioSession,
231                 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
232                 sslContext,
233                 sslBufferMode,
234                 initializer,
235                 verifier,
236                 handshakeTimeout,
237                 this::onTLSSessionStart,
238                 this::onTLSSessionEnd,
239                 new CallbackContribution<SSLSession>(callback) {
240 
241                     @Override
242                     public void completed(final SSLSession sslSession) {
243                         if (callback != null) {
244                             callback.completed(InternalDataChannel.this);
245                         }
246                     }
247 
248                 });
249         if (tlsSessionRef.compareAndSet(null, sslioSession)) {
250             currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
251         } else {
252             throw new IllegalStateException("TLS already activated");
253         }
254         try {
255             if (sessionListener != null) {
256                 sessionListener.startTls(sslioSession);
257             }
258             sslioSession.beginHandshake(this);
259         } catch (final Exception ex) {
260             onException(ex);
261         }
262     }
263 
264     @SuppressWarnings("resource")
265     @Override
266     public TlsDetails getTlsDetails() {
267         final SSLIOSession sslIoSession = tlsSessionRef.get();
268         return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
269     }
270 
271     @Override
272     public Lock getLock() {
273         return ioSession.getLock();
274     }
275 
276     @Override
277     public void close() {
278         close(CloseMode.GRACEFUL);
279     }
280 
281     @Override
282     public void close(final CloseMode closeMode) {
283         final IOSession currentSession = currentSessionRef.get();
284         if (closeMode == CloseMode.IMMEDIATE) {
285             closed.set(true);
286             currentSession.close(closeMode);
287         } else {
288             if (closed.compareAndSet(false, true)) {
289                 try {
290                     currentSession.close(closeMode);
291                 } finally {
292                     closedSessions.add(this);
293                 }
294             }
295         }
296     }
297 
298     @Override
299     public IOSession.Status getStatus() {
300         final IOSession currentSession = currentSessionRef.get();
301         return currentSession.getStatus();
302     }
303 
304     @Override
305     public boolean isOpen() {
306         final IOSession currentSession = currentSessionRef.get();
307         return currentSession.isOpen();
308     }
309 
310     @Override
311     public void enqueue(final Command command, final Command.Priority priority) {
312         final IOSession currentSession = currentSessionRef.get();
313         currentSession.enqueue(command, priority);
314     }
315 
316     @Override
317     public boolean hasCommands() {
318         final IOSession currentSession = currentSessionRef.get();
319         return currentSession.hasCommands();
320     }
321 
322     @Override
323     public Command poll() {
324         final IOSession currentSession = currentSessionRef.get();
325         return currentSession.poll();
326     }
327 
328     @Override
329     public ByteChannel channel() {
330         final IOSession currentSession = currentSessionRef.get();
331         return currentSession.channel();
332     }
333 
334     @Override
335     public SocketAddress getRemoteAddress() {
336         return ioSession.getRemoteAddress();
337     }
338 
339     @Override
340     public SocketAddress getLocalAddress() {
341         return ioSession.getLocalAddress();
342     }
343 
344     @Override
345     public int getEventMask() {
346         final IOSession currentSession = currentSessionRef.get();
347         return currentSession.getEventMask();
348     }
349 
350     @Override
351     public void setEventMask(final int ops) {
352         final IOSession currentSession = currentSessionRef.get();
353         currentSession.setEventMask(ops);
354     }
355 
356     @Override
357     public void setEvent(final int op) {
358         final IOSession currentSession = currentSessionRef.get();
359         currentSession.setEvent(op);
360     }
361 
362     @Override
363     public void clearEvent(final int op) {
364         final IOSession currentSession = currentSessionRef.get();
365         currentSession.clearEvent(op);
366     }
367 
368     @Override
369     public Timeout getSocketTimeout() {
370         return ioSession.getSocketTimeout();
371     }
372 
373     @Override
374     public void setSocketTimeout(final Timeout timeout) {
375         ioSession.setSocketTimeout(timeout);
376     }
377 
378     @Override
379     public int read(final ByteBuffer dst) throws IOException {
380         final IOSession currentSession = currentSessionRef.get();
381         return currentSession.read(dst);
382     }
383 
384     @Override
385     public int write(final ByteBuffer src) throws IOException {
386         final IOSession currentSession = currentSessionRef.get();
387         return currentSession.write(src);
388     }
389 
390     @Override
391     public void updateReadTime() {
392         ioSession.updateReadTime();
393     }
394 
395     @Override
396     public void updateWriteTime() {
397         ioSession.updateWriteTime();
398     }
399 
400     @Override
401     public long getLastReadTime() {
402         return ioSession.getLastReadTime();
403     }
404 
405     @Override
406     public long getLastWriteTime() {
407         return ioSession.getLastWriteTime();
408     }
409 
410     @Override
411     public long getLastEventTime() {
412         return ioSession.getLastEventTime();
413     }
414 
415     @Override
416     public void switchProtocol(final String protocolId, final FutureCallback<ProtocolIOSession> callback) {
417         Args.notEmpty(protocolId, "Application protocol ID");
418         final ProtocolUpgradeHandler upgradeHandler = protocolUpgradeHandlerMap.get(TextUtils.toLowerCase(protocolId));
419         if (upgradeHandler != null) {
420             upgradeHandler.upgrade(this, callback);
421         } else {
422             throw new IllegalStateException("Unsupported protocol: " + protocolId);
423         }
424     }
425 
426     @Override
427     public void registerProtocol(final String protocolId, final ProtocolUpgradeHandler upgradeHandler) {
428         Args.notEmpty(protocolId, "Application protocol ID");
429         Args.notNull(upgradeHandler, "Protocol upgrade handler");
430         protocolUpgradeHandlerMap.put(TextUtils.toLowerCase(protocolId), upgradeHandler);
431     }
432 
433     @Override
434     public String toString() {
435         final IOSession currentSession = currentSessionRef.get();
436         if (currentSession != null) {
437             return currentSession.toString();
438         } else {
439             return ioSession.toString();
440         }
441     }
442 
443 }