1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.BindException;
32 import java.net.ServerSocket;
33 import java.net.SocketAddress;
34 import java.nio.channels.CancelledKeyException;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.ServerSocketChannel;
37 import java.nio.channels.SocketChannel;
38 import java.util.HashSet;
39 import java.util.Iterator;
40 import java.util.Queue;
41 import java.util.Set;
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentLinkedQueue;
44 import java.util.concurrent.ConcurrentMap;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.atomic.AtomicBoolean;
47
48 import org.apache.hc.core5.concurrent.BasicFuture;
49 import org.apache.hc.core5.concurrent.FutureCallback;
50 import org.apache.hc.core5.function.Callback;
51 import org.apache.hc.core5.io.Closer;
52
53 class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implements ConnectionAcceptor {
54
55 private final IOReactorConfig reactorConfig;
56 private final Callback<SocketChannel> callback;
57 private final Queue<ListenerEndpointRequest> requestQueue;
58 private final ConcurrentMap<ListenerEndpoint, Boolean> endpoints;
59 private final AtomicBoolean paused;
60 private final long selectTimeoutMillis;
61
62 SingleCoreListeningIOReactor(
63 final Callback<Exception> exceptionCallback,
64 final IOReactorConfig ioReactorConfig,
65 final Callback<SocketChannel> callback) {
66 super(exceptionCallback);
67 this.reactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
68 this.callback = callback;
69 this.requestQueue = new ConcurrentLinkedQueue<>();
70 this.endpoints = new ConcurrentHashMap<>();
71 this.paused = new AtomicBoolean(false);
72 this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
73 }
74
75 @Override
76 void doTerminate() {
77 ListenerEndpointRequest request;
78 while ((request = this.requestQueue.poll()) != null) {
79 request.cancel();
80 }
81 }
82
83 @Override
84 protected final void doExecute() throws IOException {
85 while (!Thread.currentThread().isInterrupted()) {
86 if (getStatus() != IOReactorStatus.ACTIVE) {
87 break;
88 }
89
90 final int readyCount = this.selector.select(this.selectTimeoutMillis);
91
92 if (getStatus() != IOReactorStatus.ACTIVE) {
93 break;
94 }
95
96 processEvents(readyCount);
97 }
98 }
99
100 private void processEvents(final int readyCount) throws IOException {
101 if (!this.paused.get()) {
102 processSessionRequests();
103 }
104
105 if (readyCount > 0) {
106 final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
107 for (final SelectionKey key : selectedKeys) {
108
109 processEvent(key);
110
111 }
112 selectedKeys.clear();
113 }
114 }
115
116 private void processEvent(final SelectionKey key) throws IOException {
117 try {
118
119 if (key.isAcceptable()) {
120
121 final ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
122 for (;;) {
123 final SocketChannel socketChannel = serverChannel.accept();
124 if (socketChannel == null) {
125 break;
126 }
127 this.callback.execute(socketChannel);
128 }
129 }
130
131 } catch (final CancelledKeyException ex) {
132 final ListenerEndpoint../org/apache/hc/core5/reactor/ListenerEndpoint.html#ListenerEndpoint">ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
133 this.endpoints.remove(endpoint);
134 key.attach(null);
135 }
136 }
137
138 @Override
139 public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
140 if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
141 throw new IOReactorShutdownException("I/O reactor has been shut down");
142 }
143 final BasicFuture<ListenerEndpoint> future = new BasicFuture<>(callback);
144 this.requestQueue.add(new ListenerEndpointRequest(address, future));
145 this.selector.wakeup();
146 return future;
147 }
148
149 private void processSessionRequests() throws IOException {
150 ListenerEndpointRequest request;
151 while ((request = this.requestQueue.poll()) != null) {
152 if (request.isCancelled()) {
153 continue;
154 }
155 final SocketAddress address = request.address;
156 final ServerSocketChannel serverChannel = ServerSocketChannel.open();
157 try {
158 final ServerSocket socket = serverChannel.socket();
159 socket.setReuseAddress(this.reactorConfig.isSoReuseAddress());
160 if (this.reactorConfig.getRcvBufSize() > 0) {
161 socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
162 }
163 serverChannel.configureBlocking(false);
164
165 try {
166 socket.bind(address, this.reactorConfig.getBacklogSize());
167 } catch (final BindException ex) {
168 final BindException detailedEx = new BindException(
169 String.format("Socket bind failure for socket %s, address=%s, BacklogSize=%d: %s", socket,
170 address, this.reactorConfig.getBacklogSize(), ex));
171 detailedEx.setStackTrace(ex.getStackTrace());
172 throw detailedEx;
173 }
174
175 final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
176 key.attach(request);
177 final ListenerEndpoint endpoint = new ListenerEndpointImpl(key, socket.getLocalSocketAddress());
178 this.endpoints.put(endpoint, Boolean.TRUE);
179 request.completed(endpoint);
180 } catch (final IOException ex) {
181 Closer.closeQuietly(serverChannel);
182 request.failed(ex);
183 }
184 }
185 }
186
187 @Override
188 public Set<ListenerEndpoint> getEndpoints() {
189 final Set<ListenerEndpoint> set = new HashSet<>();
190 final Iterator<ListenerEndpoint> it = this.endpoints.keySet().iterator();
191 while (it.hasNext()) {
192 final ListenerEndpoint endpoint = it.next();
193 if (!endpoint.isClosed()) {
194 set.add(endpoint);
195 } else {
196 it.remove();
197 }
198 }
199 return set;
200 }
201
202 @Override
203 public void pause() throws IOException {
204 if (paused.compareAndSet(false, true)) {
205 final Iterator<ListenerEndpoint> it = this.endpoints.keySet().iterator();
206 while (it.hasNext()) {
207 final ListenerEndpoint endpoint = it.next();
208 if (!endpoint.isClosed()) {
209 endpoint.close();
210 this.requestQueue.add(new ListenerEndpointRequest(endpoint.getAddress(), null));
211 }
212 it.remove();
213 }
214 }
215 }
216
217 @Override
218 public void resume() throws IOException {
219 if (paused.compareAndSet(true, false)) {
220 this.selector.wakeup();
221 }
222 }
223
224 }