1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.net.BindException;
32 import java.net.InetSocketAddress;
33 import java.util.Set;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.http.config.ConnectionConfig;
38 import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
39 import org.apache.http.nio.protocol.HttpAsyncService;
40 import org.apache.http.nio.protocol.UriHttpAsyncRequestHandlerMapper;
41 import org.apache.http.nio.reactor.IOEventDispatch;
42 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
43 import org.apache.http.nio.reactor.IOReactorStatus;
44 import org.apache.http.nio.reactor.ListenerEndpoint;
45 import org.apache.http.nio.reactor.ListeningIOReactor;
46 import org.apache.http.protocol.HttpProcessor;
47 import org.apache.http.protocol.ImmutableHttpProcessor;
48 import org.apache.http.protocol.ResponseConnControl;
49 import org.apache.http.protocol.ResponseContent;
50 import org.apache.http.protocol.ResponseDate;
51 import org.apache.http.protocol.ResponseServer;
52 import org.junit.Assert;
53 import org.junit.Test;
54
55
56
57
58 public class TestDefaultListeningIOReactor {
59
60 private static IOEventDispatch createIOEventDispatch() {
61 final HttpProcessor httpproc = new ImmutableHttpProcessor(
62 new ResponseDate(),
63 new ResponseServer(),
64 new ResponseContent(),
65 new ResponseConnControl());
66 final HttpAsyncService serviceHandler = new HttpAsyncService(httpproc,
67 new UriHttpAsyncRequestHandlerMapper());
68 return new DefaultHttpServerIODispatch(serviceHandler, ConnectionConfig.DEFAULT);
69 }
70
71 @Test
72 public void testEndpointUpAndDown() throws Exception {
73 final IOEventDispatch eventDispatch = createIOEventDispatch();
74 final IOReactorConfig config = IOReactorConfig.custom().setIoThreadCount(1).build();
75 final ListeningIOReactor ioReactor = new DefaultListeningIOReactor(config);
76
77 final Thread t = new Thread(new Runnable() {
78
79 @Override
80 public void run() {
81 try {
82 ioReactor.execute(eventDispatch);
83 } catch (final IOException ex) {
84 }
85 }
86
87 });
88
89 t.start();
90
91 Set<ListenerEndpoint> endpoints = ioReactor.getEndpoints();
92 Assert.assertNotNull(endpoints);
93 Assert.assertEquals(0, endpoints.size());
94
95 final ListenerEndpoint endpoint1 = ioReactor.listen(new InetSocketAddress(0));
96 endpoint1.waitFor();
97
98 final ListenerEndpoint endpoint2 = ioReactor.listen(new InetSocketAddress(0));
99 endpoint2.waitFor();
100 final int port = ((InetSocketAddress) endpoint2.getAddress()).getPort();
101
102 endpoints = ioReactor.getEndpoints();
103 Assert.assertNotNull(endpoints);
104 Assert.assertEquals(2, endpoints.size());
105
106 endpoint1.close();
107
108 endpoints = ioReactor.getEndpoints();
109 Assert.assertNotNull(endpoints);
110 Assert.assertEquals(1, endpoints.size());
111
112 final ListenerEndpoint endpoint = endpoints.iterator().next();
113
114 Assert.assertEquals(port, ((InetSocketAddress) endpoint.getAddress()).getPort());
115
116 ioReactor.shutdown(1000);
117 t.join(1000);
118
119 Assert.assertEquals(IOReactorStatus.SHUT_DOWN, ioReactor.getStatus());
120 }
121
122 @Test
123 public void testEndpointAlreadyBoundFatal() throws Exception {
124 final IOEventDispatch eventDispatch = createIOEventDispatch();
125 final IOReactorConfig config = IOReactorConfig.custom().setIoThreadCount(1).build();
126 final ListeningIOReactor ioReactor = new DefaultListeningIOReactor(config);
127
128 final CountDownLatch latch = new CountDownLatch(1);
129
130 final Thread t = new Thread(new Runnable() {
131
132 @Override
133 public void run() {
134 try {
135 ioReactor.execute(eventDispatch);
136 Assert.fail("IOException should have been thrown");
137 } catch (final IOException ex) {
138 latch.countDown();
139 }
140 }
141
142 });
143
144 t.start();
145
146 final ListenerEndpoint endpoint1 = ioReactor.listen(new InetSocketAddress(0));
147 endpoint1.waitFor();
148 final int port = ((InetSocketAddress) endpoint1.getAddress()).getPort();
149
150 final ListenerEndpoint endpoint2 = ioReactor.listen(new InetSocketAddress(port));
151 endpoint2.waitFor();
152 Assert.assertNotNull(endpoint2.getException());
153
154
155 latch.await(2000, TimeUnit.MILLISECONDS);
156 Assert.assertTrue(ioReactor.getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0);
157
158 final Set<ListenerEndpoint> endpoints = ioReactor.getEndpoints();
159 Assert.assertNotNull(endpoints);
160 Assert.assertEquals(0, endpoints.size());
161
162 ioReactor.shutdown(1000);
163 t.join(1000);
164
165 Assert.assertEquals(IOReactorStatus.SHUT_DOWN, ioReactor.getStatus());
166 }
167
168 @Test
169 public void testEndpointAlreadyBoundNonFatal() throws Exception {
170 final IOEventDispatch eventDispatch = createIOEventDispatch();
171 final IOReactorConfig config = IOReactorConfig.custom().setIoThreadCount(1).build();
172 final DefaultListeningIOReactor ioReactor = new DefaultListeningIOReactor(config);
173
174 ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
175
176 @Override
177 public boolean handle(final IOException ex) {
178 return (ex instanceof BindException);
179 }
180
181 @Override
182 public boolean handle(final RuntimeException ex) {
183 return false;
184 }
185
186 });
187
188 final Thread t = new Thread(new Runnable() {
189
190 @Override
191 public void run() {
192 try {
193 ioReactor.execute(eventDispatch);
194 } catch (final IOException ex) {
195 }
196 }
197
198 });
199
200 t.start();
201
202 final ListenerEndpoint endpoint1 = ioReactor.listen(new InetSocketAddress(9999));
203 endpoint1.waitFor();
204
205 final ListenerEndpoint endpoint2 = ioReactor.listen(new InetSocketAddress(9999));
206 endpoint2.waitFor();
207 Assert.assertNotNull(endpoint2.getException());
208
209
210 Thread.sleep(500);
211
212 Assert.assertEquals(IOReactorStatus.ACTIVE, ioReactor.getStatus());
213
214 ioReactor.shutdown(1000);
215 t.join(1000);
216
217 Assert.assertEquals(IOReactorStatus.SHUT_DOWN, ioReactor.getStatus());
218 }
219
220 }