1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.net.ServerSocket;
32 import java.net.SocketAddress;
33 import java.nio.channels.CancelledKeyException;
34 import java.nio.channels.SelectionKey;
35 import java.nio.channels.ServerSocketChannel;
36 import java.nio.channels.SocketChannel;
37 import java.util.Collections;
38 import java.util.HashSet;
39 import java.util.Iterator;
40 import java.util.Queue;
41 import java.util.Set;
42 import java.util.concurrent.ConcurrentLinkedQueue;
43 import java.util.concurrent.ThreadFactory;
44
45 import org.apache.http.nio.reactor.IOReactorException;
46 import org.apache.http.nio.reactor.IOReactorStatus;
47 import org.apache.http.nio.reactor.ListenerEndpoint;
48 import org.apache.http.nio.reactor.ListeningIOReactor;
49 import org.apache.http.params.HttpParams;
50 import org.apache.http.util.Asserts;
51
52
53
54
55
56
57
58
59 @SuppressWarnings("deprecation")
60 public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
61 implements ListeningIOReactor {
62
63 private final Queue<ListenerEndpointImpl> requestQueue;
64 private final Set<ListenerEndpointImpl> endpoints;
65 private final Set<SocketAddress> pausedEndpoints;
66
67 private volatile boolean paused;
68
69
70
71
72
73
74
75
76
77
78
79 public DefaultListeningIOReactor(
80 final IOReactorConfig config,
81 final ThreadFactory threadFactory) throws IOReactorException {
82 super(config, threadFactory);
83 this.requestQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
84 this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
85 this.pausedEndpoints = new HashSet<SocketAddress>();
86 }
87
88
89
90
91
92
93
94
95
96
97 public DefaultListeningIOReactor(final IOReactorConfig config) throws IOReactorException {
98 this(config, null);
99 }
100
101
102
103
104
105
106
107
108 public DefaultListeningIOReactor() throws IOReactorException {
109 this(null, null);
110 }
111
112
113
114
115 @Deprecated
116 public DefaultListeningIOReactor(
117 final int workerCount,
118 final ThreadFactory threadFactory,
119 final HttpParams params) throws IOReactorException {
120 this(convert(workerCount, params), threadFactory);
121 }
122
123
124
125
126 @Deprecated
127 public DefaultListeningIOReactor(
128 final int workerCount,
129 final HttpParams params) throws IOReactorException {
130 this(convert(workerCount, params), null);
131 }
132
133 @Override
134 protected void cancelRequests() throws IOReactorException {
135 ListenerEndpointImpl request;
136 while ((request = this.requestQueue.poll()) != null) {
137 request.cancel();
138 }
139 }
140
141 @Override
142 protected void processEvents(final int readyCount) throws IOReactorException {
143 if (!this.paused) {
144 processSessionRequests();
145 }
146
147 if (readyCount > 0) {
148 final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
149 for (final SelectionKey key : selectedKeys) {
150
151 processEvent(key);
152
153 }
154 selectedKeys.clear();
155 }
156 }
157
158 private void processEvent(final SelectionKey key)
159 throws IOReactorException {
160 try {
161
162 if (key.isAcceptable()) {
163
164 final ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
165 for (;;) {
166 SocketChannel socketChannel = null;
167 try {
168 socketChannel = serverChannel.accept();
169 } catch (final IOException ex) {
170 if (this.exceptionHandler == null ||
171 !this.exceptionHandler.handle(ex)) {
172 throw new IOReactorException(
173 "Failure accepting connection", ex);
174 }
175 }
176 if (socketChannel == null) {
177 break;
178 }
179 try {
180 prepareSocket(socketChannel.socket());
181 } catch (final IOException ex) {
182 if (this.exceptionHandler == null ||
183 !this.exceptionHandler.handle(ex)) {
184 throw new IOReactorException(
185 "Failure initalizing socket", ex);
186 }
187 }
188 final ChannelEntryactor/ChannelEntry.html#ChannelEntry">ChannelEntry entry = new ChannelEntry(socketChannel);
189 addChannel(entry);
190 }
191 }
192
193 } catch (final CancelledKeyException ex) {
194 final ListenerEndpoint../../org/apache/http/nio/reactor/ListenerEndpoint.html#ListenerEndpoint">ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
195 this.endpoints.remove(endpoint);
196 key.attach(null);
197 }
198 }
199
200 private ListenerEndpointImpl createEndpoint(final SocketAddress address) {
201 return new ListenerEndpointImpl(
202 address,
203 new ListenerEndpointClosedCallback() {
204
205 @Override
206 public void endpointClosed(final ListenerEndpoint endpoint) {
207 endpoints.remove(endpoint);
208 }
209
210 });
211 }
212
213 @Override
214 public ListenerEndpoint listen(final SocketAddress address) {
215 Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
216 "I/O reactor has been shut down");
217 final ListenerEndpointImpl request = createEndpoint(address);
218 this.requestQueue.add(request);
219 this.selector.wakeup();
220 return request;
221 }
222
223 private void processSessionRequests() throws IOReactorException {
224 ListenerEndpointImpl request;
225 while ((request = this.requestQueue.poll()) != null) {
226 final SocketAddress address = request.getAddress();
227 final ServerSocketChannel serverChannel;
228 try {
229 serverChannel = ServerSocketChannel.open();
230 } catch (final IOException ex) {
231 throw new IOReactorException("Failure opening server socket", ex);
232 }
233 try {
234 final ServerSocket socket = serverChannel.socket();
235 socket.setReuseAddress(this.config.isSoReuseAddress());
236 if (this.config.getSoTimeout() > 0) {
237 socket.setSoTimeout(this.config.getSoTimeout());
238 }
239 if (this.config.getRcvBufSize() > 0) {
240 socket.setReceiveBufferSize(this.config.getRcvBufSize());
241 }
242 serverChannel.configureBlocking(false);
243 socket.bind(address, this.config.getBacklogSize());
244 } catch (final IOException ex) {
245 closeChannel(serverChannel);
246 request.failed(ex);
247 if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
248 throw new IOReactorException("Failure binding socket to address "
249 + address, ex);
250 }
251 return;
252 }
253 try {
254 final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
255 key.attach(request);
256 request.setKey(key);
257 } catch (final IOException ex) {
258 closeChannel(serverChannel);
259 throw new IOReactorException("Failure registering channel " +
260 "with the selector", ex);
261 }
262
263 this.endpoints.add(request);
264 request.completed(serverChannel.socket().getLocalSocketAddress());
265 }
266 }
267
268 @Override
269 public Set<ListenerEndpoint> getEndpoints() {
270 final Set<ListenerEndpoint> set = new HashSet<ListenerEndpoint>();
271 synchronized (this.endpoints) {
272 final Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
273 while (it.hasNext()) {
274 final ListenerEndpoint endpoint = it.next();
275 if (!endpoint.isClosed()) {
276 set.add(endpoint);
277 } else {
278 it.remove();
279 }
280 }
281 }
282 return set;
283 }
284
285 @Override
286 public void pause() throws IOException {
287 if (this.paused) {
288 return;
289 }
290 this.paused = true;
291 synchronized (this.endpoints) {
292 for (final ListenerEndpointImpl endpoint : this.endpoints) {
293 if (!endpoint.isClosed()) {
294 endpoint.close();
295 this.pausedEndpoints.add(endpoint.getAddress());
296 }
297 }
298 this.endpoints.clear();
299 }
300 }
301
302 @Override
303 public void resume() throws IOException {
304 if (!this.paused) {
305 return;
306 }
307 this.paused = false;
308 for (final SocketAddress address: this.pausedEndpoints) {
309 final ListenerEndpointImpl request = createEndpoint(address);
310 this.requestQueue.add(request);
311 }
312 this.pausedEndpoints.clear();
313 this.selector.wakeup();
314 }
315
316 }