View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.net.SocketException;
33  import java.nio.ByteBuffer;
34  import java.nio.channels.ByteChannel;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.SocketChannel;
37  import java.util.Deque;
38  import java.util.concurrent.ConcurrentLinkedDeque;
39  import java.util.concurrent.atomic.AtomicLong;
40  import java.util.concurrent.atomic.AtomicReference;
41  import java.util.concurrent.locks.Lock;
42  import java.util.concurrent.locks.ReentrantLock;
43  
44  import org.apache.hc.core5.io.CloseMode;
45  import org.apache.hc.core5.io.Closer;
46  import org.apache.hc.core5.util.Args;
47  import org.apache.hc.core5.util.Timeout;
48  
49  class IOSessionImpl implements IOSession {
50  
51      /** Counts instances created. */
52      private final static AtomicLong COUNT = new AtomicLong(0);
53  
54      private final SelectionKey key;
55      private final SocketChannel channel;
56      private final Deque<Command> commandQueue;
57      private final Lock lock;
58      private final String id;
59      private final AtomicReference<IOEventHandler> handlerRef;
60      private final AtomicReference<IOSession.Status> status;
61  
62      private volatile Timeout socketTimeout;
63      private volatile long lastReadTime;
64      private volatile long lastWriteTime;
65      private volatile long lastEventTime;
66  
67      public IOSessionImpl(final String type, final SelectionKey key, final SocketChannel socketChannel) {
68          super();
69          this.key = Args.notNull(key, "Selection key");
70          this.channel = Args.notNull(socketChannel, "Socket channel");
71          this.commandQueue = new ConcurrentLinkedDeque<>();
72          this.lock = new ReentrantLock();
73          this.socketTimeout = Timeout.DISABLED;
74          this.id = String.format(type + "-%010d", COUNT.getAndIncrement());
75          this.handlerRef = new AtomicReference<>();
76          this.status = new AtomicReference<>(Status.ACTIVE);
77          final long currentTimeMillis = System.currentTimeMillis();
78          this.lastReadTime = currentTimeMillis;
79          this.lastWriteTime = currentTimeMillis;
80          this.lastEventTime = currentTimeMillis;
81      }
82  
83      @Override
84      public String getId() {
85          return id;
86      }
87  
88      @Override
89      public IOEventHandler getHandler() {
90          return handlerRef.get();
91      }
92  
93      @Override
94      public void upgrade(final IOEventHandler handler) {
95          handlerRef.set(handler);
96      }
97  
98      @Override
99      public Lock getLock() {
100         return lock;
101     }
102 
103     @Override
104     public void enqueue(final Command command, final Command.Priority priority) {
105         if (priority == Command.Priority.IMMEDIATE) {
106             commandQueue.addFirst(command);
107         } else {
108             commandQueue.add(command);
109         }
110         setEvent(SelectionKey.OP_WRITE);
111     }
112 
113     @Override
114     public boolean hasCommands() {
115         return !commandQueue.isEmpty();
116     }
117 
118     @Override
119     public Command poll() {
120         return commandQueue.poll();
121     }
122 
123     @Override
124     public ByteChannel channel() {
125         return this.channel;
126     }
127 
128     @Override
129     public SocketAddress getLocalAddress() {
130         return this.channel.socket().getLocalSocketAddress();
131     }
132 
133     @Override
134     public SocketAddress getRemoteAddress() {
135         return this.channel.socket().getRemoteSocketAddress();
136     }
137 
138     @Override
139     public int getEventMask() {
140         return this.key.interestOps();
141     }
142 
143     @Override
144     public void setEventMask(final int newValue) {
145         lock.lock();
146         try {
147             if (isStatusClosed()) {
148                 return;
149             }
150             this.key.interestOps(newValue);
151         } finally {
152             lock.unlock();
153         }
154         this.key.selector().wakeup();
155     }
156 
157     @Override
158     public void setEvent(final int op) {
159         lock.lock();
160         try {
161             if (isStatusClosed()) {
162                 return;
163             }
164             this.key.interestOps(this.key.interestOps() | op);
165         } finally {
166             lock.unlock();
167         }
168         this.key.selector().wakeup();
169     }
170 
171     @Override
172     public void clearEvent(final int op) {
173         lock.lock();
174         try {
175             if (isStatusClosed()) {
176                 return;
177             }
178             this.key.interestOps(this.key.interestOps() & ~op);
179         } finally {
180             lock.unlock();
181         }
182         this.key.selector().wakeup();
183     }
184 
185     @Override
186     public Timeout getSocketTimeout() {
187         return this.socketTimeout;
188     }
189 
190     @Override
191     public void setSocketTimeout(final Timeout timeout) {
192         this.socketTimeout = Timeout.defaultsToDisabled(timeout);
193         this.lastEventTime = System.currentTimeMillis();
194     }
195 
196     @Override
197     public int read(final ByteBuffer dst) throws IOException {
198         return this.channel.read(dst);
199     }
200 
201     @Override
202     public int write(final ByteBuffer src) throws IOException {
203         return this.channel.write(src);
204     }
205 
206     @Override
207     public void updateReadTime() {
208         lastReadTime = System.currentTimeMillis();
209         lastEventTime = lastReadTime;
210     }
211 
212     @Override
213     public void updateWriteTime() {
214         lastWriteTime = System.currentTimeMillis();
215         lastEventTime = lastWriteTime;
216     }
217 
218     @Override
219     public long getLastReadTime() {
220         return lastReadTime;
221     }
222 
223     @Override
224     public long getLastWriteTime() {
225         return lastWriteTime;
226     }
227 
228     @Override
229     public long getLastEventTime() {
230         return lastEventTime;
231     }
232 
233     @Override
234     public Status getStatus() {
235         return this.status.get();
236     }
237 
238     private boolean isStatusClosed() {
239         return this.status.get() == Status.CLOSED;
240     }
241 
242     @Override
243     public boolean isOpen() {
244         return this.status.get() == Status.ACTIVE && this.channel.isOpen();
245     }
246 
247     @Override
248     public void close() {
249         close(CloseMode.GRACEFUL);
250     }
251 
252     @Override
253     public void close(final CloseMode closeMode) {
254         if (this.status.compareAndSet(Status.ACTIVE, Status.CLOSED)) {
255             if (closeMode == CloseMode.IMMEDIATE) {
256                 try {
257                     this.channel.socket().setSoLinger(true, 0);
258                 } catch (final SocketException e) {
259                     // Quietly ignore
260                 }
261             }
262             this.key.cancel();
263             this.key.attach(null);
264             Closer.closeQuietly(this.key.channel());
265             if (this.key.selector().isOpen()) {
266                 this.key.selector().wakeup();
267             }
268         }
269     }
270 
271     private static void formatOps(final StringBuilder buffer, final int ops) {
272         if ((ops & SelectionKey.OP_READ) > 0) {
273             buffer.append('r');
274         }
275         if ((ops & SelectionKey.OP_WRITE) > 0) {
276             buffer.append('w');
277         }
278         if ((ops & SelectionKey.OP_ACCEPT) > 0) {
279             buffer.append('a');
280         }
281         if ((ops & SelectionKey.OP_CONNECT) > 0) {
282             buffer.append('c');
283         }
284     }
285 
286     @Override
287     public String toString() {
288         final StringBuilder buffer = new StringBuilder();
289         buffer.append(id).append("[");
290         buffer.append(this.status);
291         buffer.append("][");
292         if (this.key.isValid()) {
293             formatOps(buffer, this.key.interestOps());
294             buffer.append(":");
295             formatOps(buffer, this.key.readyOps());
296         }
297         buffer.append("]");
298         return buffer.toString();
299     }
300 
301 }