1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.BindException;
32 import java.net.ServerSocket;
33 import java.net.SocketAddress;
34 import java.nio.channels.CancelledKeyException;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.ServerSocketChannel;
37 import java.nio.channels.SocketChannel;
38 import java.util.HashSet;
39 import java.util.Iterator;
40 import java.util.Queue;
41 import java.util.Set;
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentLinkedQueue;
44 import java.util.concurrent.ConcurrentMap;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.atomic.AtomicBoolean;
47
48 import org.apache.hc.core5.concurrent.BasicFuture;
49 import org.apache.hc.core5.concurrent.FutureCallback;
50 import org.apache.hc.core5.function.Callback;
51 import org.apache.hc.core5.io.Closer;
52
53 class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implements ConnectionAcceptor {
54
55 private final IOReactorConfig reactorConfig;
56 private final Callback<ChannelEntry> callback;
57 private final Queue<ListenerEndpointRequest> requestQueue;
58 private final ConcurrentMap<ListenerEndpointImpl, Boolean> endpoints;
59 private final AtomicBoolean paused;
60 private final long selectTimeoutMillis;
61
62 SingleCoreListeningIOReactor(
63 final Callback<Exception> exceptionCallback,
64 final IOReactorConfig ioReactorConfig,
65 final Callback<ChannelEntry> callback) {
66 super(exceptionCallback);
67 this.reactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
68 this.callback = callback;
69 this.requestQueue = new ConcurrentLinkedQueue<>();
70 this.endpoints = new ConcurrentHashMap<>();
71 this.paused = new AtomicBoolean(false);
72 this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
73 }
74
75 @Override
76 void doTerminate() {
77 ListenerEndpointRequest request;
78 while ((request = this.requestQueue.poll()) != null) {
79 request.cancel();
80 }
81 }
82
83 @Override
84 protected final void doExecute() throws IOException {
85 while (!Thread.currentThread().isInterrupted()) {
86 if (getStatus() != IOReactorStatus.ACTIVE) {
87 break;
88 }
89
90 final int readyCount = this.selector.select(this.selectTimeoutMillis);
91
92 if (getStatus() != IOReactorStatus.ACTIVE) {
93 break;
94 }
95
96 processEvents(readyCount);
97 }
98 }
99
100 private void processEvents(final int readyCount) throws IOException {
101 if (!this.paused.get()) {
102 processSessionRequests();
103 }
104
105 if (readyCount > 0) {
106 final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
107 for (final SelectionKey key : selectedKeys) {
108
109 processEvent(key);
110
111 }
112 selectedKeys.clear();
113 }
114 }
115
116 private void processEvent(final SelectionKey key) throws IOException {
117 try {
118
119 if (key.isAcceptable()) {
120
121 final ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
122 for (;;) {
123 final SocketChannel socketChannel = serverChannel.accept();
124 if (socketChannel == null) {
125 break;
126 }
127 final ListenerEndpointRequesthc/core5/reactor/ListenerEndpointRequest.html#ListenerEndpointRequest">ListenerEndpointRequest endpointRequest = (ListenerEndpointRequest) key.attachment();
128 this.callback.execute(new ChannelEntry(socketChannel, endpointRequest.attachment));
129 }
130 }
131
132 } catch (final CancelledKeyException ex) {
133 final ListenerEndpointImplrg/apache/hc/core5/reactor/ListenerEndpointImpl.html#ListenerEndpointImpl">ListenerEndpointImpl endpoint = (ListenerEndpointImpl) key.attachment();
134 this.endpoints.remove(endpoint);
135 key.attach(null);
136 }
137 }
138
139 public Future<ListenerEndpoint> listen(
140 final SocketAddress address, final Object attachment, final FutureCallback<ListenerEndpoint> callback) {
141 if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
142 throw new IOReactorShutdownException("I/O reactor has been shut down");
143 }
144 final BasicFuture<ListenerEndpoint> future = new BasicFuture<>(callback);
145 this.requestQueue.add(new ListenerEndpointRequest(address, attachment, future));
146 this.selector.wakeup();
147 return future;
148 }
149
150 @Override
151 public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
152 return listen(address, null, callback);
153 }
154
155 private void processSessionRequests() throws IOException {
156 ListenerEndpointRequest request;
157 while ((request = this.requestQueue.poll()) != null) {
158 if (request.isCancelled()) {
159 continue;
160 }
161 final SocketAddress address = request.address;
162 final ServerSocketChannel serverChannel = ServerSocketChannel.open();
163 try {
164 final ServerSocket socket = serverChannel.socket();
165 socket.setReuseAddress(this.reactorConfig.isSoReuseAddress());
166 if (this.reactorConfig.getRcvBufSize() > 0) {
167 socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
168 }
169 serverChannel.configureBlocking(false);
170
171 try {
172 socket.bind(address, this.reactorConfig.getBacklogSize());
173 } catch (final BindException ex) {
174 final BindException detailedEx = new BindException(
175 String.format("Socket bind failure for socket %s, address=%s, BacklogSize=%d: %s", socket,
176 address, this.reactorConfig.getBacklogSize(), ex));
177 detailedEx.setStackTrace(ex.getStackTrace());
178 throw detailedEx;
179 }
180
181 final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
182 key.attach(request);
183 final ListenerEndpointImplintImpl.html#ListenerEndpointImpl">ListenerEndpointImpl endpoint = new ListenerEndpointImpl(key, request.attachment, socket.getLocalSocketAddress());
184 this.endpoints.put(endpoint, Boolean.TRUE);
185 request.completed(endpoint);
186 } catch (final IOException ex) {
187 Closer.closeQuietly(serverChannel);
188 request.failed(ex);
189 }
190 }
191 }
192
193 @Override
194 public Set<ListenerEndpoint> getEndpoints() {
195 final Set<ListenerEndpoint> set = new HashSet<>();
196 final Iterator<ListenerEndpointImpl> it = this.endpoints.keySet().iterator();
197 while (it.hasNext()) {
198 final ListenerEndpoint endpoint = it.next();
199 if (!endpoint.isClosed()) {
200 set.add(endpoint);
201 } else {
202 it.remove();
203 }
204 }
205 return set;
206 }
207
208 @Override
209 public void pause() throws IOException {
210 if (paused.compareAndSet(false, true)) {
211 final Iterator<ListenerEndpointImpl> it = this.endpoints.keySet().iterator();
212 while (it.hasNext()) {
213 final ListenerEndpointImpl endpoint = it.next();
214 if (!endpoint.isClosed()) {
215 endpoint.close();
216 this.requestQueue.add(new ListenerEndpointRequest(endpoint.address, endpoint.attachment, null));
217 }
218 it.remove();
219 }
220 }
221 }
222
223 @Override
224 public void resume() throws IOException {
225 if (paused.compareAndSet(true, false)) {
226 this.selector.wakeup();
227 }
228 }
229
230 }