View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.nio.channels.ByteChannel;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.SocketChannel;
37  import java.util.Collections;
38  import java.util.HashMap;
39  import java.util.Map;
40  
41  import org.apache.http.annotation.Contract;
42  import org.apache.http.annotation.ThreadingBehavior;
43  import org.apache.http.nio.reactor.IOSession;
44  import org.apache.http.nio.reactor.SessionBufferStatus;
45  import org.apache.http.nio.reactor.SocketAccessor;
46  import org.apache.http.util.Args;
47  
48  /**
49   * Default implementation of {@link IOSession}.
50   *
51   * @since 4.0
52   */
53  @Contract(threading = ThreadingBehavior.SAFE)
54  public class IOSessionImpl implements IOSession, SocketAccessor {
55  
56      private final SelectionKey key;
57      private final ByteChannel channel;
58      private final Map<String, Object> attributes;
59      private final InterestOpsCallback interestOpsCallback;
60      private final SessionClosedCallback sessionClosedCallback;
61  
62      private volatile int status;
63      private volatile int currentEventMask;
64      private volatile SessionBufferStatus bufferStatus;
65      private volatile int socketTimeout;
66  
67      private final long startedTime;
68  
69      private volatile long lastReadTime;
70      private volatile long lastWriteTime;
71      private volatile long lastAccessTime;
72  
73      /**
74       * Creates new instance of IOSessionImpl.
75       *
76       * @param key the selection key.
77       * @param interestOpsCallback interestOps callback.
78       * @param sessionClosedCallback session closed callback.
79       *
80       * @since 4.1
81       */
82      public IOSessionImpl(
83              final SelectionKey key,
84              final InterestOpsCallback interestOpsCallback,
85              final SessionClosedCallback sessionClosedCallback) {
86          super();
87          Args.notNull(key, "Selection key");
88          this.key = key;
89          this.channel = (ByteChannel) this.key.channel();
90          this.interestOpsCallback = interestOpsCallback;
91          this.sessionClosedCallback = sessionClosedCallback;
92          this.attributes = Collections.synchronizedMap(new HashMap<String, Object>());
93          this.currentEventMask = key.interestOps();
94          this.socketTimeout = 0;
95          this.status = ACTIVE;
96          final long now = System.currentTimeMillis();
97          this.startedTime = now;
98          this.lastReadTime = now;
99          this.lastWriteTime = now;
100         this.lastAccessTime = now;
101     }
102 
103     /**
104      * Creates new instance of IOSessionImpl.
105      *
106      * @param key the selection key.
107      * @param sessionClosedCallback session closed callback.
108      */
109     public IOSessionImpl(
110             final SelectionKey key,
111             final SessionClosedCallback sessionClosedCallback) {
112         this(key, null, sessionClosedCallback);
113     }
114 
115     @Override
116     public ByteChannel channel() {
117         return this.channel;
118     }
119 
120     @Override
121     public SocketAddress getLocalAddress() {
122         return this.channel instanceof SocketChannel
123                         ? ((SocketChannel) this.channel).socket().getLocalSocketAddress()
124                         : null;
125     }
126 
127     @Override
128     public SocketAddress getRemoteAddress() {
129         return this.channel instanceof SocketChannel
130                         ? ((SocketChannel) this.channel).socket().getRemoteSocketAddress()
131                         : null;
132     }
133 
134     @Override
135     public int getEventMask() {
136         return this.interestOpsCallback != null ? this.currentEventMask : this.key.interestOps();
137     }
138 
139     @Override
140     public synchronized void setEventMask(final int ops) {
141         if (this.status == CLOSED) {
142             return;
143         }
144         if (this.interestOpsCallback != null) {
145             // update the current event mask
146             this.currentEventMask = ops;
147 
148             // local variable
149             final InterestOpEntryor/InterestOpEntry.html#InterestOpEntry">InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
150 
151             // add this operation to the interestOps() queue
152             this.interestOpsCallback.addInterestOps(entry);
153         } else {
154             this.key.interestOps(ops);
155         }
156         this.key.selector().wakeup();
157     }
158 
159     @Override
160     public synchronized void setEvent(final int op) {
161         if (this.status == CLOSED) {
162             return;
163         }
164         if (this.interestOpsCallback != null) {
165             // update the current event mask
166             this.currentEventMask |= op;
167 
168             // local variable
169             final InterestOpEntryor/InterestOpEntry.html#InterestOpEntry">InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
170 
171             // add this operation to the interestOps() queue
172             this.interestOpsCallback.addInterestOps(entry);
173         } else {
174             final int ops = this.key.interestOps();
175             this.key.interestOps(ops | op);
176         }
177         this.key.selector().wakeup();
178     }
179 
180     @Override
181     public synchronized void clearEvent(final int op) {
182         if (this.status == CLOSED) {
183             return;
184         }
185         if (this.interestOpsCallback != null) {
186             // update the current event mask
187             this.currentEventMask &= ~op;
188 
189             // local variable
190             final InterestOpEntryor/InterestOpEntry.html#InterestOpEntry">InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
191 
192             // add this operation to the interestOps() queue
193             this.interestOpsCallback.addInterestOps(entry);
194         } else {
195             final int ops = this.key.interestOps();
196             this.key.interestOps(ops & ~op);
197         }
198         this.key.selector().wakeup();
199     }
200 
201     @Override
202     public int getSocketTimeout() {
203         return this.socketTimeout;
204     }
205 
206     @Override
207     public void setSocketTimeout(final int timeout) {
208         this.socketTimeout = timeout;
209         this.lastAccessTime = System.currentTimeMillis();
210     }
211 
212     @Override
213     public void close() {
214         synchronized (this) {
215             if (this.status == CLOSED) {
216                 return;
217             }
218             this.status = CLOSED;
219             this.key.cancel();
220             try {
221                 this.key.channel().close();
222             } catch (final IOException ex) {
223                 // Munching exceptions is not nice
224                 // but in this case it is justified
225             }
226             if (this.sessionClosedCallback != null) {
227                 this.sessionClosedCallback.sessionClosed(this);
228             }
229             if (this.key.selector().isOpen()) {
230                 this.key.selector().wakeup();
231             }
232         }
233     }
234 
235     @Override
236     public int getStatus() {
237         return this.status;
238     }
239 
240     @Override
241     public boolean isClosed() {
242         return this.status == CLOSED;
243     }
244 
245     @Override
246     public void shutdown() {
247         // For this type of session, a close() does exactly
248         // what we need and nothing more.
249         close();
250     }
251 
252     @Override
253     public boolean hasBufferedInput() {
254         final SessionBufferStatus buffStatus = this.bufferStatus;
255         return buffStatus != null && buffStatus.hasBufferedInput();
256     }
257 
258     @Override
259     public boolean hasBufferedOutput() {
260         final SessionBufferStatus buffStatus = this.bufferStatus;
261         return buffStatus != null && buffStatus.hasBufferedOutput();
262     }
263 
264     @Override
265     public void setBufferStatus(final SessionBufferStatus bufferStatus) {
266         this.bufferStatus = bufferStatus;
267     }
268 
269     @Override
270     public Object getAttribute(final String name) {
271         return this.attributes.get(name);
272     }
273 
274     @Override
275     public Object removeAttribute(final String name) {
276         return this.attributes.remove(name);
277     }
278 
279     @Override
280     public void setAttribute(final String name, final Object obj) {
281         this.attributes.put(name, obj);
282     }
283 
284     public long getStartedTime() {
285         return this.startedTime;
286     }
287 
288     public long getLastReadTime() {
289         return this.lastReadTime;
290     }
291 
292     public long getLastWriteTime() {
293         return this.lastWriteTime;
294     }
295 
296     public long getLastAccessTime() {
297         return this.lastAccessTime;
298     }
299 
300     void resetLastRead() {
301         final long now = System.currentTimeMillis();
302         this.lastReadTime = now;
303         this.lastAccessTime = now;
304     }
305 
306     void resetLastWrite() {
307         final long now = System.currentTimeMillis();
308         this.lastWriteTime = now;
309         this.lastAccessTime = now;
310     }
311 
312     private static void formatOps(final StringBuilder buffer, final int ops) {
313         if ((ops & SelectionKey.OP_READ) > 0) {
314             buffer.append('r');
315         }
316         if ((ops & SelectionKey.OP_WRITE) > 0) {
317             buffer.append('w');
318         }
319         if ((ops & SelectionKey.OP_ACCEPT) > 0) {
320             buffer.append('a');
321         }
322         if ((ops & SelectionKey.OP_CONNECT) > 0) {
323             buffer.append('c');
324         }
325     }
326 
327     private static void formatAddress(final StringBuilder buffer, final SocketAddress socketAddress) {
328         if (socketAddress instanceof InetSocketAddress) {
329             final InetSocketAddress addr = ((InetSocketAddress) socketAddress);
330             buffer.append(addr.getAddress() != null ? addr.getAddress().getHostAddress() :
331                 addr.getAddress())
332             .append(':')
333             .append(addr.getPort());
334         } else {
335             buffer.append(socketAddress);
336         }
337     }
338 
339     @Override
340     public String toString() {
341         final StringBuilder buffer = new StringBuilder();
342         synchronized (this.key) {
343             final SocketAddress remoteAddress = getRemoteAddress();
344             final SocketAddress localAddress = getLocalAddress();
345             if (remoteAddress != null && localAddress != null) {
346                 formatAddress(buffer, localAddress);
347                 buffer.append("<->");
348                 formatAddress(buffer, remoteAddress);
349             }
350             buffer.append('[');
351             switch (this.status) {
352             case ACTIVE:
353                 buffer.append("ACTIVE");
354                 break;
355             case CLOSING:
356                 buffer.append("CLOSING");
357                 break;
358             case CLOSED:
359                 buffer.append("CLOSED");
360                 break;
361             }
362             buffer.append("][");
363             if (this.key.isValid()) {
364                 formatOps(buffer, this.interestOpsCallback != null ?
365                         this.currentEventMask : this.key.interestOps());
366                 buffer.append(':');
367                 formatOps(buffer, this.key.readyOps());
368             }
369         }
370         buffer.append(']');
371         return new String(buffer);
372     }
373 
374     @Override
375     public Socket getSocket() {
376         return this.channel instanceof SocketChannel ? ((SocketChannel) this.channel).socket() : null;
377     }
378 
379 }