1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.util.Set;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35
36 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
37 import org.apache.hc.core5.concurrent.FutureCallback;
38 import org.apache.hc.core5.function.Callback;
39 import org.apache.hc.core5.function.Decorator;
40 import org.apache.hc.core5.io.CloseMode;
41 import org.apache.hc.core5.util.Args;
42 import org.apache.hc.core5.util.TimeValue;
43
44
45
46
47
48
49
50
51
52
53 public class DefaultListeningIOReactor extends AbstractIOReactorBase implements ConnectionAcceptor {
54
55 private final static ThreadFactory DISPATCH_THREAD_FACTORY = new DefaultThreadFactory("I/O server dispatch", true);
56 private final static ThreadFactory LISTENER_THREAD_FACTORY = new DefaultThreadFactory("I/O listener", true);
57
58 private final int workerCount;
59 private final SingleCoreIOReactor[] workers;
60 private final SingleCoreListeningIOReactor listener;
61 private final MultiCoreIOReactor ioReactor;
62 private final IOWorkers.Selector workerSelector;
63
64
65
66
67
68
69
70
71
72
73
74 public DefaultListeningIOReactor(
75 final IOEventHandlerFactory eventHandlerFactory,
76 final IOReactorConfig ioReactorConfig,
77 final ThreadFactory dispatchThreadFactory,
78 final ThreadFactory listenerThreadFactory,
79 final Decorator<IOSession> ioSessionDecorator,
80 final Callback<Exception> exceptionCallback,
81 final IOSessionListener sessionListener,
82 final Callback<IOSession> sessionShutdownCallback) {
83 Args.notNull(eventHandlerFactory, "Event handler factory");
84 this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
85 this.workers = new SingleCoreIOReactor[workerCount];
86 final Thread[] threads = new Thread[workerCount + 1];
87 for (int i = 0; i < this.workers.length; i++) {
88 final SingleCoreIOReactor dispatcher = new SingleCoreIOReactor(
89 exceptionCallback,
90 eventHandlerFactory,
91 ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT,
92 ioSessionDecorator,
93 sessionListener,
94 sessionShutdownCallback);
95 this.workers[i] = dispatcher;
96 threads[i + 1] = (dispatchThreadFactory != null ? dispatchThreadFactory : DISPATCH_THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher));
97 }
98 final IOReactor[] ioReactors = new IOReactor[this.workerCount + 1];
99 System.arraycopy(this.workers, 0, ioReactors, 1, this.workerCount);
100 this.listener = new SingleCoreListeningIOReactor(exceptionCallback, ioReactorConfig, this::enqueueChannel);
101 ioReactors[0] = this.listener;
102 threads[0] = (listenerThreadFactory != null ? listenerThreadFactory : LISTENER_THREAD_FACTORY).newThread(new IOReactorWorker(listener));
103
104 this.ioReactor = new MultiCoreIOReactor(ioReactors, threads);
105
106 workerSelector = IOWorkers.newSelector(workers);
107 }
108
109
110
111
112
113
114
115
116
117
118 public DefaultListeningIOReactor(
119 final IOEventHandlerFactory eventHandlerFactory,
120 final IOReactorConfig config,
121 final Callback<IOSession> sessionShutdownCallback) {
122 this(eventHandlerFactory, config, null, null, null, null, null, sessionShutdownCallback);
123 }
124
125
126
127
128
129
130
131
132 public DefaultListeningIOReactor(final IOEventHandlerFactory eventHandlerFactory) {
133 this(eventHandlerFactory, null, null);
134 }
135
136 @Override
137 public void start() {
138 ioReactor.start();
139 }
140
141 @Override
142 public Future<ListenerEndpoint> listen(
143 final SocketAddress address, final Object attachment, final FutureCallback<ListenerEndpoint> callback) {
144 return listener.listen(address, attachment, callback);
145 }
146
147 @Override
148 public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
149 return listen(address, null, callback);
150 }
151
152 public Future<ListenerEndpoint> listen(final SocketAddress address) {
153 return listen(address, null);
154 }
155
156 @Override
157 public Set<ListenerEndpoint> getEndpoints() {
158 return listener.getEndpoints();
159 }
160
161 @Override
162 public void pause() throws IOException {
163 listener.pause();
164 }
165
166 @Override
167 public void resume() throws IOException {
168 listener.resume();
169 }
170
171 @Override
172 public IOReactorStatus getStatus() {
173 return ioReactor.getStatus();
174 }
175
176 @Override
177 IOWorkers.Selector getWorkerSelector() {
178 return workerSelector;
179 }
180
181 private void enqueueChannel(final ChannelEntry entry) {
182 try {
183 workerSelector.next().enqueueChannel(entry);
184 } catch (final IOReactorShutdownException ex) {
185 initiateShutdown();
186 }
187 }
188
189
190 @Override
191 public void initiateShutdown() {
192 ioReactor.initiateShutdown();
193 }
194
195 @Override
196 public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
197 ioReactor.awaitShutdown(waitTime);
198 }
199
200 @Override
201 public void close(final CloseMode closeMode) {
202 ioReactor.close(closeMode);
203 }
204
205 @Override
206 public void close() throws IOException {
207 ioReactor.close();
208 }
209
210 }