View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ByteChannel;
34  import java.nio.channels.SelectionKey;
35  import java.util.Queue;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  import java.util.concurrent.atomic.AtomicReference;
38  import java.util.concurrent.locks.Lock;
39  
40  import javax.net.ssl.SSLContext;
41  
42  import org.apache.hc.core5.function.Callback;
43  import org.apache.hc.core5.function.Decorator;
44  import org.apache.hc.core5.io.CloseMode;
45  import org.apache.hc.core5.net.NamedEndpoint;
46  import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
47  import org.apache.hc.core5.reactor.ssl.SSLIOSession;
48  import org.apache.hc.core5.reactor.ssl.SSLMode;
49  import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
50  import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
51  import org.apache.hc.core5.reactor.ssl.TlsDetails;
52  import org.apache.hc.core5.util.Asserts;
53  import org.apache.hc.core5.util.Timeout;
54  
55  final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
56  
57      private final IOSession ioSession;
58      private final NamedEndpoint initialEndpoint;
59      private final Decorator<IOSession> ioSessionDecorator;
60      private final IOSessionListener sessionListener;
61      private final Queue<InternalDataChannel> closedSessions;
62      private final AtomicReference<SSLIOSession> tlsSessionRef;
63      private final AtomicReference<IOSession> currentSessionRef;
64      private final AtomicBoolean closed;
65  
66      InternalDataChannel(
67              final IOSession ioSession,
68              final NamedEndpoint initialEndpoint,
69              final Decorator<IOSession> ioSessionDecorator,
70              final IOSessionListener sessionListener,
71              final Queue<InternalDataChannel> closedSessions) {
72          this.ioSession = ioSession;
73          this.initialEndpoint = initialEndpoint;
74          this.closedSessions = closedSessions;
75          this.ioSessionDecorator = ioSessionDecorator;
76          this.sessionListener = sessionListener;
77          this.tlsSessionRef = new AtomicReference<>(null);
78          this.currentSessionRef = new AtomicReference<>(
79                  ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
80          this.closed = new AtomicBoolean(false);
81      }
82  
83      @Override
84      public String getId() {
85          return ioSession.getId();
86      }
87  
88      @Override
89      public NamedEndpoint getInitialEndpoint() {
90          return initialEndpoint;
91      }
92  
93      @Override
94      public IOEventHandler getHandler() {
95          final IOSession currentSession = currentSessionRef.get();
96          return currentSession.getHandler();
97      }
98  
99      @Override
100     public void upgrade(final IOEventHandler handler) {
101         final IOSession currentSession = currentSessionRef.get();
102         currentSession.upgrade(handler);
103     }
104 
105     private IOEventHandler ensureHandler(final IOSession session) {
106         final IOEventHandler handler = session.getHandler();
107         Asserts.notNull(handler, "IO event handler");
108         return handler;
109     }
110 
111     @Override
112     void onIOEvent(final int readyOps) throws IOException {
113         if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
114             final IOSession currentSession = currentSessionRef.get();
115             currentSession.clearEvent(SelectionKey.OP_CONNECT);
116             if (tlsSessionRef.get() == null) {
117                 if (sessionListener != null) {
118                     sessionListener.connected(currentSession);
119                 }
120                 final IOEventHandler handler = ensureHandler(currentSession);
121                 handler.connected(currentSession);
122             }
123         }
124         if ((readyOps & SelectionKey.OP_READ) != 0) {
125             final IOSession currentSession = currentSessionRef.get();
126             currentSession.updateReadTime();
127             if (sessionListener != null) {
128                 sessionListener.inputReady(currentSession);
129             }
130             final IOEventHandler handler = ensureHandler(currentSession);
131             handler.inputReady(currentSession, null);
132         }
133         if ((readyOps & SelectionKey.OP_WRITE) != 0
134                 || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
135             final IOSession currentSession = currentSessionRef.get();
136             currentSession.updateWriteTime();
137             if (sessionListener != null) {
138                 sessionListener.outputReady(currentSession);
139             }
140             final IOEventHandler handler = ensureHandler(currentSession);
141             handler.outputReady(currentSession);
142         }
143     }
144 
145     @Override
146     Timeout getTimeout() {
147         final IOSession currentSession = currentSessionRef.get();
148         return currentSession.getSocketTimeout();
149     }
150 
151     @Override
152     void onTimeout(final Timeout timeout) throws IOException {
153         final IOSession currentSession = currentSessionRef.get();
154         if (sessionListener != null) {
155             sessionListener.timeout(currentSession);
156         }
157         final IOEventHandler handler = ensureHandler(currentSession);
158         handler.timeout(currentSession, timeout);
159     }
160 
161     @Override
162     void onException(final Exception cause) {
163         final IOSession currentSession = currentSessionRef.get();
164         if (sessionListener != null) {
165             sessionListener.exception(currentSession, cause);
166         }
167         final IOEventHandler handler = currentSession.getHandler();
168         if (handler != null) {
169             handler.exception(currentSession, cause);
170         }
171     }
172 
173     void onTLSSessionStart(final SSLIOSession sslSession) {
174         final IOSession currentSession = currentSessionRef.get();
175         if (sessionListener != null) {
176             sessionListener.connected(currentSession);
177         }
178     }
179 
180     void onTLSSessionEnd() {
181         if (closed.compareAndSet(false, true)) {
182             closedSessions.add(this);
183         }
184     }
185 
186     void disconnected() {
187         final IOSession currentSession = currentSessionRef.get();
188         if (sessionListener != null) {
189             sessionListener.disconnected(currentSession);
190         }
191         final IOEventHandler handler = currentSession.getHandler();
192         if (handler != null) {
193             handler.disconnected(currentSession);
194         }
195     }
196 
197     @Override
198     public void startTls(
199             final SSLContext sslContext,
200             final NamedEndpoint endpoint,
201             final SSLBufferMode sslBufferMode,
202             final SSLSessionInitializer initializer,
203             final SSLSessionVerifier verifier,
204             final Timeout handshakeTimeout) {
205         final SSLIOSessionSession.html#SSLIOSession">SSLIOSession sslioSession = new SSLIOSession(
206                 endpoint != null ? endpoint : initialEndpoint,
207                 ioSession,
208                 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
209                 sslContext,
210                 sslBufferMode,
211                 initializer,
212                 verifier,
213                 new Callback<SSLIOSession>() {
214 
215                     @Override
216                     public void execute(final SSLIOSession sslSession) {
217                         onTLSSessionStart(sslSession);
218                     }
219 
220                 },
221                 new Callback<SSLIOSession>() {
222 
223                     @Override
224                     public void execute(final SSLIOSession sslSession) {
225                         onTLSSessionEnd();
226                     }
227 
228                 },
229                 handshakeTimeout);
230         if (tlsSessionRef.compareAndSet(null, sslioSession)) {
231             currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
232             if (sessionListener != null) {
233                 sessionListener.startTls(sslioSession);
234             }
235         } else {
236             throw new IllegalStateException("TLS already activated");
237         }
238     }
239 
240     @SuppressWarnings("resource")
241     @Override
242     public TlsDetails getTlsDetails() {
243         final SSLIOSession sslIoSession = tlsSessionRef.get();
244         return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
245     }
246 
247     @Override
248     public Lock getLock() {
249         return ioSession.getLock();
250     }
251 
252     @Override
253     public void close() {
254         close(CloseMode.GRACEFUL);
255     }
256 
257     @Override
258     public void close(final CloseMode closeMode) {
259         final IOSession currentSession = currentSessionRef.get();
260         if (closeMode == CloseMode.IMMEDIATE) {
261             closed.set(true);
262             currentSession.close(closeMode);
263         } else {
264             if (closed.compareAndSet(false, true)) {
265                 try {
266                     currentSession.close(closeMode);
267                 } finally {
268                     closedSessions.add(this);
269                 }
270             }
271         }
272     }
273 
274     @Override
275     public IOSession.Status getStatus() {
276         final IOSession currentSession = currentSessionRef.get();
277         return currentSession.getStatus();
278     }
279 
280     @Override
281     public boolean isOpen() {
282         final IOSession currentSession = currentSessionRef.get();
283         return currentSession.isOpen();
284     }
285 
286     @Override
287     public void enqueue(final Command command, final Command.Priority priority) {
288         final IOSession currentSession = currentSessionRef.get();
289         currentSession.enqueue(command, priority);
290     }
291 
292     @Override
293     public boolean hasCommands() {
294         final IOSession currentSession = currentSessionRef.get();
295         return currentSession.hasCommands();
296     }
297 
298     @Override
299     public Command poll() {
300         final IOSession currentSession = currentSessionRef.get();
301         return currentSession.poll();
302     }
303 
304     @Override
305     public ByteChannel channel() {
306         final IOSession currentSession = currentSessionRef.get();
307         return currentSession.channel();
308     }
309 
310     @Override
311     public SocketAddress getRemoteAddress() {
312         return ioSession.getRemoteAddress();
313     }
314 
315     @Override
316     public SocketAddress getLocalAddress() {
317         return ioSession.getLocalAddress();
318     }
319 
320     @Override
321     public int getEventMask() {
322         final IOSession currentSession = currentSessionRef.get();
323         return currentSession.getEventMask();
324     }
325 
326     @Override
327     public void setEventMask(final int ops) {
328         final IOSession currentSession = currentSessionRef.get();
329         currentSession.setEventMask(ops);
330     }
331 
332     @Override
333     public void setEvent(final int op) {
334         final IOSession currentSession = currentSessionRef.get();
335         currentSession.setEvent(op);
336     }
337 
338     @Override
339     public void clearEvent(final int op) {
340         final IOSession currentSession = currentSessionRef.get();
341         currentSession.clearEvent(op);
342     }
343 
344     @Override
345     public Timeout getSocketTimeout() {
346         return ioSession.getSocketTimeout();
347     }
348 
349     @Override
350     public void setSocketTimeout(final Timeout timeout) {
351         ioSession.setSocketTimeout(timeout);
352     }
353 
354     @Override
355     public int read(final ByteBuffer dst) throws IOException {
356         final IOSession currentSession = currentSessionRef.get();
357         return currentSession.read(dst);
358     }
359 
360     @Override
361     public int write(final ByteBuffer src) throws IOException {
362         final IOSession currentSession = currentSessionRef.get();
363         return currentSession.write(src);
364     }
365 
366     @Override
367     public void updateReadTime() {
368         ioSession.updateReadTime();
369     }
370 
371     @Override
372     public void updateWriteTime() {
373         ioSession.updateWriteTime();
374     }
375 
376     @Override
377     public long getLastReadTime() {
378         return ioSession.getLastReadTime();
379     }
380 
381     @Override
382     public long getLastWriteTime() {
383         return ioSession.getLastWriteTime();
384     }
385 
386     @Override
387     public long getLastEventTime() {
388         return ioSession.getLastEventTime();
389     }
390 
391     @Override
392     public String toString() {
393         final IOSession currentSession = currentSessionRef.get();
394         if (currentSession != null) {
395             return currentSession.toString();
396         } else {
397             return ioSession.toString();
398         }
399     }
400 
401 }