1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.channels.SocketChannel;
33 import java.util.Set;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.ThreadFactory;
36
37 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
38 import org.apache.hc.core5.concurrent.FutureCallback;
39 import org.apache.hc.core5.function.Callback;
40 import org.apache.hc.core5.function.Decorator;
41 import org.apache.hc.core5.io.CloseMode;
42 import org.apache.hc.core5.util.Args;
43 import org.apache.hc.core5.util.TimeValue;
44
45
46
47
48
49
50
51
52
53
54 public class DefaultListeningIOReactor extends AbstractIOReactorBase implements ConnectionAcceptor {
55
56 private final static ThreadFactory DISPATCH_THREAD_FACTORY = new DefaultThreadFactory("I/O server dispatch", true);
57 private final static ThreadFactory LISTENER_THREAD_FACTORY = new DefaultThreadFactory("I/O listener", true);
58
59 private final int workerCount;
60 private final SingleCoreIOReactor[] workers;
61 private final SingleCoreListeningIOReactor listener;
62 private final MultiCoreIOReactor ioReactor;
63 private final IOWorkers.Selector workerSelector;
64
65
66
67
68
69
70
71
72
73
74
75 public DefaultListeningIOReactor(
76 final IOEventHandlerFactory eventHandlerFactory,
77 final IOReactorConfig ioReactorConfig,
78 final ThreadFactory dispatchThreadFactory,
79 final ThreadFactory listenerThreadFactory,
80 final Decorator<IOSession> ioSessionDecorator,
81 final Callback<Exception> exceptionCallback,
82 final IOSessionListener sessionListener,
83 final Callback<IOSession> sessionShutdownCallback) {
84 Args.notNull(eventHandlerFactory, "Event handler factory");
85 this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
86 this.workers = new SingleCoreIOReactor[workerCount];
87 final Thread[] threads = new Thread[workerCount + 1];
88 for (int i = 0; i < this.workers.length; i++) {
89 final SingleCoreIOReactoractor.html#SingleCoreIOReactor">SingleCoreIOReactor dispatcher = new SingleCoreIOReactor(
90 exceptionCallback,
91 eventHandlerFactory,
92 ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT,
93 ioSessionDecorator,
94 sessionListener,
95 sessionShutdownCallback);
96 this.workers[i] = dispatcher;
97 threads[i + 1] = (dispatchThreadFactory != null ? dispatchThreadFactory : DISPATCH_THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher));
98 }
99 final IOReactortor.html#IOReactor">IOReactor[] ioReactors = new IOReactor[this.workerCount + 1];
100 System.arraycopy(this.workers, 0, ioReactors, 1, this.workerCount);
101 this.listener = new SingleCoreListeningIOReactor(exceptionCallback, ioReactorConfig, new Callback<SocketChannel>() {
102
103 @Override
104 public void execute(final SocketChannel channel) {
105 enqueueChannel(channel);
106 }
107
108 });
109 ioReactors[0] = this.listener;
110 threads[0] = (listenerThreadFactory != null ? listenerThreadFactory : LISTENER_THREAD_FACTORY).newThread(new IOReactorWorker(listener));
111
112 this.ioReactor = new MultiCoreIOReactor(ioReactors, threads);
113
114 workerSelector = IOWorkers.newSelector(workers);
115 }
116
117
118
119
120
121
122
123
124
125
126 public DefaultListeningIOReactor(
127 final IOEventHandlerFactory eventHandlerFactory,
128 final IOReactorConfig config,
129 final Callback<IOSession> sessionShutdownCallback) {
130 this(eventHandlerFactory, config, null, null, null, null, null, sessionShutdownCallback);
131 }
132
133
134
135
136
137
138
139
140 public DefaultListeningIOReactor(final IOEventHandlerFactory eventHandlerFactory) {
141 this(eventHandlerFactory, null, null);
142 }
143
144 @Override
145 public void start() {
146 ioReactor.start();
147 }
148
149 @Override
150 public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
151 return listener.listen(address, callback);
152 }
153
154 public Future<ListenerEndpoint> listen(final SocketAddress address) {
155 return listen(address, null);
156 }
157
158 @Override
159 public Set<ListenerEndpoint> getEndpoints() {
160 return listener.getEndpoints();
161 }
162
163 @Override
164 public void pause() throws IOException {
165 listener.pause();
166 }
167
168 @Override
169 public void resume() throws IOException {
170 listener.resume();
171 }
172
173 @Override
174 public IOReactorStatus getStatus() {
175 return ioReactor.getStatus();
176 }
177
178 @Override
179 IOWorkers.Selector getWorkerSelector() {
180 return workerSelector;
181 }
182
183 private void enqueueChannel(final SocketChannel socketChannel) {
184 try {
185 workerSelector.next().enqueueChannel(socketChannel);
186 } catch (final IOReactorShutdownException ex) {
187 initiateShutdown();
188 }
189 }
190
191
192 @Override
193 public void initiateShutdown() {
194 ioReactor.initiateShutdown();
195 }
196
197 @Override
198 public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
199 ioReactor.awaitShutdown(waitTime);
200 }
201
202 @Override
203 public void close(final CloseMode closeMode) {
204 ioReactor.close(closeMode);
205 }
206
207 @Override
208 public void close() throws IOException {
209 ioReactor.close();
210 }
211
212 }