View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.net.SocketException;
33  import java.nio.ByteBuffer;
34  import java.nio.channels.ByteChannel;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.SocketChannel;
37  import java.util.Deque;
38  import java.util.concurrent.ConcurrentLinkedDeque;
39  import java.util.concurrent.atomic.AtomicLong;
40  import java.util.concurrent.atomic.AtomicReference;
41  import java.util.concurrent.locks.Lock;
42  import java.util.concurrent.locks.ReentrantLock;
43  
44  import org.apache.hc.core5.io.CloseMode;
45  import org.apache.hc.core5.io.Closer;
46  import org.apache.hc.core5.util.Args;
47  import org.apache.hc.core5.util.Timeout;
48  
49  class IOSessionImpl implements IOSession {
50  
51      /** Counts instances created. */
52      private final static AtomicLong COUNT = new AtomicLong(0);
53  
54      private final SelectionKey key;
55      private final SocketChannel channel;
56      private final Deque<Command> commandQueue;
57      private final Lock lock;
58      private final String id;
59      private final AtomicReference<IOEventHandler> handlerRef;
60      private final AtomicReference<IOSession.Status> status;
61  
62      private volatile Timeout socketTimeout;
63      private volatile long lastReadTime;
64      private volatile long lastWriteTime;
65      private volatile long lastEventTime;
66  
67      public IOSessionImpl(final String type, final SelectionKey key, final SocketChannel socketChannel) {
68          super();
69          this.key = Args.notNull(key, "Selection key");
70          this.channel = Args.notNull(socketChannel, "Socket channel");
71          this.commandQueue = new ConcurrentLinkedDeque<>();
72          this.lock = new ReentrantLock();
73          this.socketTimeout = Timeout.DISABLED;
74          this.id = String.format(type + "-%08X", COUNT.getAndIncrement());
75          this.handlerRef = new AtomicReference<>();
76          this.status = new AtomicReference<>(Status.ACTIVE);
77          final long currentTimeMillis = System.currentTimeMillis();
78          this.lastReadTime = currentTimeMillis;
79          this.lastWriteTime = currentTimeMillis;
80          this.lastEventTime = currentTimeMillis;
81      }
82  
83      @Override
84      public String getId() {
85          return id;
86      }
87  
88      @Override
89      public IOEventHandler getHandler() {
90          return handlerRef.get();
91      }
92  
93      @Override
94      public void upgrade(final IOEventHandler handler) {
95          handlerRef.set(handler);
96      }
97  
98      @Override
99      public Lock getLock() {
100         return lock;
101     }
102 
103     @Override
104     public void enqueue(final Command command, final Command.Priority priority) {
105         if (priority == Command.Priority.IMMEDIATE) {
106             commandQueue.addFirst(command);
107         } else {
108             commandQueue.add(command);
109         }
110         setEvent(SelectionKey.OP_WRITE);
111     }
112 
113     @Override
114     public boolean hasCommands() {
115         return !commandQueue.isEmpty();
116     }
117 
118     @Override
119     public Command poll() {
120         return commandQueue.poll();
121     }
122 
123     @Override
124     public ByteChannel channel() {
125         return this.channel;
126     }
127 
128     @Override
129     public SocketAddress getLocalAddress() {
130         return this.channel.socket().getLocalSocketAddress();
131     }
132 
133     @Override
134     public SocketAddress getRemoteAddress() {
135         return this.channel.socket().getRemoteSocketAddress();
136     }
137 
138     @Override
139     public int getEventMask() {
140         return this.key.interestOps();
141     }
142 
143     @Override
144     public void setEventMask(final int newValue) {
145         if (isStatusClosed()) {
146             return;
147         }
148         this.key.interestOps(newValue);
149         this.key.selector().wakeup();
150     }
151 
152     @Override
153     public void setEvent(final int op) {
154         if (isStatusClosed()) {
155             return;
156         }
157         lock.lock();
158         try {
159             this.key.interestOps(this.key.interestOps() | op);
160         } finally {
161             lock.unlock();
162         }
163         this.key.selector().wakeup();
164     }
165 
166     @Override
167     public void clearEvent(final int op) {
168         if (isStatusClosed()) {
169             return;
170         }
171         lock.lock();
172         try {
173             this.key.interestOps(this.key.interestOps() & ~op);
174         } finally {
175             lock.unlock();
176         }
177         this.key.selector().wakeup();
178     }
179 
180     @Override
181     public Timeout getSocketTimeout() {
182         return this.socketTimeout;
183     }
184 
185     @Override
186     public void setSocketTimeout(final Timeout timeout) {
187         this.socketTimeout = Timeout.defaultsToDisabled(timeout);
188         this.lastEventTime = System.currentTimeMillis();
189     }
190 
191     @Override
192     public int read(final ByteBuffer dst) throws IOException {
193         return this.channel.read(dst);
194     }
195 
196     @Override
197     public int write(final ByteBuffer src) throws IOException {
198         return this.channel.write(src);
199     }
200 
201     @Override
202     public void updateReadTime() {
203         lastReadTime = System.currentTimeMillis();
204         lastEventTime = lastReadTime;
205     }
206 
207     @Override
208     public void updateWriteTime() {
209         lastWriteTime = System.currentTimeMillis();
210         lastEventTime = lastWriteTime;
211     }
212 
213     @Override
214     public long getLastReadTime() {
215         return lastReadTime;
216     }
217 
218     @Override
219     public long getLastWriteTime() {
220         return lastWriteTime;
221     }
222 
223     @Override
224     public long getLastEventTime() {
225         return lastEventTime;
226     }
227 
228     @Override
229     public Status getStatus() {
230         return this.status.get();
231     }
232 
233     private boolean isStatusClosed() {
234         return this.status.get() == Status.CLOSED;
235     }
236 
237     @Override
238     public boolean isOpen() {
239         return this.status.get() == Status.ACTIVE && this.channel.isOpen();
240     }
241 
242     @Override
243     public void close() {
244         close(CloseMode.GRACEFUL);
245     }
246 
247     @Override
248     public void close(final CloseMode closeMode) {
249         if (this.status.compareAndSet(Status.ACTIVE, Status.CLOSED)) {
250             if (closeMode == CloseMode.IMMEDIATE) {
251                 try {
252                     this.channel.socket().setSoLinger(true, 0);
253                 } catch (final SocketException e) {
254                     // Quietly ignore
255                 }
256             }
257             this.key.cancel();
258             this.key.attach(null);
259             Closer.closeQuietly(this.key.channel());
260             if (this.key.selector().isOpen()) {
261                 this.key.selector().wakeup();
262             }
263         }
264     }
265 
266     private static void formatOps(final StringBuilder buffer, final int ops) {
267         if ((ops & SelectionKey.OP_READ) > 0) {
268             buffer.append('r');
269         }
270         if ((ops & SelectionKey.OP_WRITE) > 0) {
271             buffer.append('w');
272         }
273         if ((ops & SelectionKey.OP_ACCEPT) > 0) {
274             buffer.append('a');
275         }
276         if ((ops & SelectionKey.OP_CONNECT) > 0) {
277             buffer.append('c');
278         }
279     }
280 
281     @Override
282     public String toString() {
283         final StringBuilder buffer = new StringBuilder();
284         buffer.append(id).append("[");
285         buffer.append(this.status);
286         buffer.append("][");
287         if (this.key.isValid()) {
288             formatOps(buffer, this.key.interestOps());
289             buffer.append(":");
290             formatOps(buffer, this.key.readyOps());
291         }
292         buffer.append("]");
293         return buffer.toString();
294     }
295 
296 }