1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.BindException;
32 import java.net.ServerSocket;
33 import java.net.SocketAddress;
34 import java.nio.channels.CancelledKeyException;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.ServerSocketChannel;
37 import java.nio.channels.SocketChannel;
38 import java.util.HashSet;
39 import java.util.Iterator;
40 import java.util.Queue;
41 import java.util.Set;
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentLinkedQueue;
44 import java.util.concurrent.ConcurrentMap;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.atomic.AtomicBoolean;
47
48 import org.apache.hc.core5.concurrent.BasicFuture;
49 import org.apache.hc.core5.concurrent.FutureCallback;
50 import org.apache.hc.core5.function.Callback;
51 import org.apache.hc.core5.io.Closer;
52
53 class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implements ConnectionAcceptor {
54
55 private final IOReactorConfig reactorConfig;
56 private final Callback<ChannelEntry> callback;
57 private final Queue<ListenerEndpointRequest> requestQueue;
58 private final ConcurrentMap<ListenerEndpointImpl, Boolean> endpoints;
59 private final AtomicBoolean paused;
60 private final long selectTimeoutMillis;
61
62 SingleCoreListeningIOReactor(
63 final Callback<Exception> exceptionCallback,
64 final IOReactorConfig ioReactorConfig,
65 final Callback<ChannelEntry> callback) {
66 super(exceptionCallback);
67 this.reactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
68 this.callback = callback;
69 this.requestQueue = new ConcurrentLinkedQueue<>();
70 this.endpoints = new ConcurrentHashMap<>();
71 this.paused = new AtomicBoolean(false);
72 this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
73 }
74
75 @Override
76 void doTerminate() {
77 ListenerEndpointRequest request;
78 while ((request = this.requestQueue.poll()) != null) {
79 request.cancel();
80 }
81 }
82
83 @Override
84 protected final void doExecute() throws IOException {
85 while (!Thread.currentThread().isInterrupted()) {
86 if (getStatus() != IOReactorStatus.ACTIVE) {
87 break;
88 }
89
90 final int readyCount = this.selector.select(this.selectTimeoutMillis);
91
92 if (getStatus() != IOReactorStatus.ACTIVE) {
93 break;
94 }
95
96 processEvents(readyCount);
97 }
98 }
99
100 private void processEvents(final int readyCount) throws IOException {
101 if (!this.paused.get()) {
102 processSessionRequests();
103 }
104
105 if (readyCount > 0) {
106 final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
107 for (final SelectionKey key : selectedKeys) {
108
109 processEvent(key);
110
111 }
112 selectedKeys.clear();
113 }
114 }
115
116 private void processEvent(final SelectionKey key) throws IOException {
117 try {
118
119 if (key.isAcceptable()) {
120
121 final ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
122 for (;;) {
123 final SocketChannel socketChannel = serverChannel.accept();
124 if (socketChannel == null) {
125 break;
126 }
127 final ListenerEndpointRequest endpointRequest = (ListenerEndpointRequest) key.attachment();
128 this.callback.execute(new ChannelEntry(socketChannel, endpointRequest.attachment));
129 }
130 }
131
132 } catch (final CancelledKeyException ex) {
133 final ListenerEndpointImpl endpoint = (ListenerEndpointImpl) key.attachment();
134 this.endpoints.remove(endpoint);
135 key.attach(null);
136 }
137 }
138
139 @Override
140 public Future<ListenerEndpoint> listen(
141 final SocketAddress address, final Object attachment, final FutureCallback<ListenerEndpoint> callback) {
142 if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
143 throw new IOReactorShutdownException("I/O reactor has been shut down");
144 }
145 final BasicFuture<ListenerEndpoint> future = new BasicFuture<>(callback);
146 this.requestQueue.add(new ListenerEndpointRequest(address, attachment, future));
147 this.selector.wakeup();
148 return future;
149 }
150
151 @Override
152 public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
153 return listen(address, null, callback);
154 }
155
156 private void processSessionRequests() throws IOException {
157 ListenerEndpointRequest request;
158 while ((request = this.requestQueue.poll()) != null) {
159 if (request.isCancelled()) {
160 continue;
161 }
162 final SocketAddress address = request.address;
163 final ServerSocketChannel serverChannel = ServerSocketChannel.open();
164 try {
165 final ServerSocket socket = serverChannel.socket();
166 socket.setReuseAddress(this.reactorConfig.isSoReuseAddress());
167 if (this.reactorConfig.getRcvBufSize() > 0) {
168 socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
169 }
170 serverChannel.configureBlocking(false);
171
172 try {
173 socket.bind(address, this.reactorConfig.getBacklogSize());
174 } catch (final BindException ex) {
175 final BindException detailedEx = new BindException(
176 String.format("Socket bind failure for socket %s, address=%s, BacklogSize=%d: %s", socket,
177 address, this.reactorConfig.getBacklogSize(), ex));
178 detailedEx.setStackTrace(ex.getStackTrace());
179 throw detailedEx;
180 }
181
182 final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
183 key.attach(request);
184 final ListenerEndpointImpl endpoint = new ListenerEndpointImpl(key, request.attachment, socket.getLocalSocketAddress());
185 this.endpoints.put(endpoint, Boolean.TRUE);
186 request.completed(endpoint);
187 } catch (final IOException ex) {
188 Closer.closeQuietly(serverChannel);
189 request.failed(ex);
190 }
191 }
192 }
193
194 @Override
195 public Set<ListenerEndpoint> getEndpoints() {
196 final Set<ListenerEndpoint> set = new HashSet<>();
197 final Iterator<ListenerEndpointImpl> it = this.endpoints.keySet().iterator();
198 while (it.hasNext()) {
199 final ListenerEndpoint endpoint = it.next();
200 if (!endpoint.isClosed()) {
201 set.add(endpoint);
202 } else {
203 it.remove();
204 }
205 }
206 return set;
207 }
208
209 @Override
210 public void pause() throws IOException {
211 if (paused.compareAndSet(false, true)) {
212 final Iterator<ListenerEndpointImpl> it = this.endpoints.keySet().iterator();
213 while (it.hasNext()) {
214 final ListenerEndpointImpl endpoint = it.next();
215 if (!endpoint.isClosed()) {
216 endpoint.close();
217 this.requestQueue.add(new ListenerEndpointRequest(endpoint.address, endpoint.attachment, null));
218 }
219 it.remove();
220 }
221 }
222 }
223
224 @Override
225 public void resume() throws IOException {
226 if (paused.compareAndSet(true, false)) {
227 this.selector.wakeup();
228 }
229 }
230
231 }