1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.util.Set;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35
36 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
37 import org.apache.hc.core5.concurrent.FutureCallback;
38 import org.apache.hc.core5.function.Callback;
39 import org.apache.hc.core5.function.Decorator;
40 import org.apache.hc.core5.io.CloseMode;
41 import org.apache.hc.core5.util.Args;
42 import org.apache.hc.core5.util.TimeValue;
43
44
45
46
47
48
49
50
51
52
53 public class DefaultListeningIOReactor extends AbstractIOReactorBase implements ConnectionAcceptor {
54
55 private final static ThreadFactory DISPATCH_THREAD_FACTORY = new DefaultThreadFactory("I/O server dispatch", true);
56 private final static ThreadFactory LISTENER_THREAD_FACTORY = new DefaultThreadFactory("I/O listener", true);
57
58 private final int workerCount;
59 private final SingleCoreIOReactor[] workers;
60 private final SingleCoreListeningIOReactor listener;
61 private final MultiCoreIOReactor ioReactor;
62 private final IOWorkers.Selector workerSelector;
63
64
65
66
67
68
69
70
71
72
73
74 public DefaultListeningIOReactor(
75 final IOEventHandlerFactory eventHandlerFactory,
76 final IOReactorConfig ioReactorConfig,
77 final ThreadFactory dispatchThreadFactory,
78 final ThreadFactory listenerThreadFactory,
79 final Decorator<IOSession> ioSessionDecorator,
80 final Callback<Exception> exceptionCallback,
81 final IOSessionListener sessionListener,
82 final Callback<IOSession> sessionShutdownCallback) {
83 Args.notNull(eventHandlerFactory, "Event handler factory");
84 this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
85 this.workers = new SingleCoreIOReactor[workerCount];
86 final Thread[] threads = new Thread[workerCount + 1];
87 for (int i = 0; i < this.workers.length; i++) {
88 final SingleCoreIOReactoractor.html#SingleCoreIOReactor">SingleCoreIOReactor dispatcher = new SingleCoreIOReactor(
89 exceptionCallback,
90 eventHandlerFactory,
91 ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT,
92 ioSessionDecorator,
93 sessionListener,
94 sessionShutdownCallback);
95 this.workers[i] = dispatcher;
96 threads[i + 1] = (dispatchThreadFactory != null ? dispatchThreadFactory : DISPATCH_THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher));
97 }
98 final IOReactortor.html#IOReactor">IOReactor[] ioReactors = new IOReactor[this.workerCount + 1];
99 System.arraycopy(this.workers, 0, ioReactors, 1, this.workerCount);
100 this.listener = new SingleCoreListeningIOReactor(exceptionCallback, ioReactorConfig, new Callback<ChannelEntry>() {
101
102 @Override
103 public void execute(final ChannelEntry entry) {
104 enqueueChannel(entry);
105 }
106
107 });
108 ioReactors[0] = this.listener;
109 threads[0] = (listenerThreadFactory != null ? listenerThreadFactory : LISTENER_THREAD_FACTORY).newThread(new IOReactorWorker(listener));
110
111 this.ioReactor = new MultiCoreIOReactor(ioReactors, threads);
112
113 workerSelector = IOWorkers.newSelector(workers);
114 }
115
116
117
118
119
120
121
122
123
124
125 public DefaultListeningIOReactor(
126 final IOEventHandlerFactory eventHandlerFactory,
127 final IOReactorConfig config,
128 final Callback<IOSession> sessionShutdownCallback) {
129 this(eventHandlerFactory, config, null, null, null, null, null, sessionShutdownCallback);
130 }
131
132
133
134
135
136
137
138
139 public DefaultListeningIOReactor(final IOEventHandlerFactory eventHandlerFactory) {
140 this(eventHandlerFactory, null, null);
141 }
142
143 @Override
144 public void start() {
145 ioReactor.start();
146 }
147
148 public Future<ListenerEndpoint> listen(
149 final SocketAddress address, final Object attachment, final FutureCallback<ListenerEndpoint> callback) {
150 return listener.listen(address, attachment, callback);
151 }
152
153 @Override
154 public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
155 return listen(address, null, callback);
156 }
157
158 public Future<ListenerEndpoint> listen(final SocketAddress address) {
159 return listen(address, null);
160 }
161
162 @Override
163 public Set<ListenerEndpoint> getEndpoints() {
164 return listener.getEndpoints();
165 }
166
167 @Override
168 public void pause() throws IOException {
169 listener.pause();
170 }
171
172 @Override
173 public void resume() throws IOException {
174 listener.resume();
175 }
176
177 @Override
178 public IOReactorStatus getStatus() {
179 return ioReactor.getStatus();
180 }
181
182 @Override
183 IOWorkers.Selector getWorkerSelector() {
184 return workerSelector;
185 }
186
187 private void enqueueChannel(final ChannelEntry entry) {
188 try {
189 workerSelector.next().enqueueChannel(entry);
190 } catch (final IOReactorShutdownException ex) {
191 initiateShutdown();
192 }
193 }
194
195
196 @Override
197 public void initiateShutdown() {
198 ioReactor.initiateShutdown();
199 }
200
201 @Override
202 public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
203 ioReactor.awaitShutdown(waitTime);
204 }
205
206 @Override
207 public void close(final CloseMode closeMode) {
208 ioReactor.close(closeMode);
209 }
210
211 @Override
212 public void close() throws IOException {
213 ioReactor.close();
214 }
215
216 }