1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.examples;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.net.SocketException;
32 import java.nio.ByteBuffer;
33 import java.util.List;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.hc.core5.http.EntityDetails;
38 import org.apache.hc.core5.http.Header;
39 import org.apache.hc.core5.http.HttpConnection;
40 import org.apache.hc.core5.http.HttpException;
41 import org.apache.hc.core5.http.HttpRequest;
42 import org.apache.hc.core5.http.HttpResponse;
43 import org.apache.hc.core5.http.HttpStatus;
44 import org.apache.hc.core5.http.URIScheme;
45 import org.apache.hc.core5.http.impl.Http1StreamListener;
46 import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
47 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
48 import org.apache.hc.core5.http.message.BasicHttpResponse;
49 import org.apache.hc.core5.http.message.RequestLine;
50 import org.apache.hc.core5.http.message.StatusLine;
51 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
52 import org.apache.hc.core5.http.nio.CapacityChannel;
53 import org.apache.hc.core5.http.nio.DataStreamChannel;
54 import org.apache.hc.core5.http.nio.ResponseChannel;
55 import org.apache.hc.core5.http.protocol.HttpContext;
56 import org.apache.hc.core5.io.CloseMode;
57 import org.apache.hc.core5.reactor.IOReactorConfig;
58 import org.apache.hc.core5.reactor.ListenerEndpoint;
59 import org.apache.hc.core5.util.TimeValue;
60
61
62
63
64 public class AsyncFullDuplexServerExample {
65
66 public static void main(final String[] args) throws Exception {
67 int port = 8080;
68 if (args.length >= 1) {
69 port = Integer.parseInt(args[0]);
70 }
71
72 final IOReactorConfig config = IOReactorConfig.custom()
73 .setSoTimeout(15, TimeUnit.SECONDS)
74 .setTcpNoDelay(true)
75 .build();
76
77 final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
78 .setExceptionCallback(e -> e.printStackTrace())
79 .setIOReactorConfig(config)
80 .setStreamListener(new Http1StreamListener() {
81
82 @Override
83 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
84 System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
85 }
86
87 @Override
88 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
89 System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
90 }
91
92 @Override
93 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
94 if (keepAlive) {
95 System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
96 } else {
97 System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
98 }
99 }
100
101 })
102 .register("/echo", () -> new AsyncServerExchangeHandler() {
103
104 ByteBuffer buffer = ByteBuffer.allocate(2048);
105 CapacityChannel inputCapacityChannel;
106 DataStreamChannel outputDataChannel;
107 boolean endStream;
108
109 private void ensureCapacity(final int chunk) {
110 if (buffer.remaining() < chunk) {
111 final ByteBuffer oldBuffer = buffer;
112 oldBuffer.flip();
113 buffer = ByteBuffer.allocate(oldBuffer.remaining() + (chunk > 2048 ? chunk : 2048));
114 buffer.put(oldBuffer);
115 }
116 }
117
118 @Override
119 public void handleRequest(
120 final HttpRequest request,
121 final EntityDetails entityDetails,
122 final ResponseChannel responseChannel,
123 final HttpContext context) throws HttpException, IOException {
124 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
125 responseChannel.sendResponse(response, entityDetails, context);
126 }
127
128 @Override
129 public void consume(final ByteBuffer src) throws IOException {
130 if (buffer.position() == 0) {
131 if (outputDataChannel != null) {
132 outputDataChannel.write(src);
133 }
134 }
135 if (src.hasRemaining()) {
136 ensureCapacity(src.remaining());
137 buffer.put(src);
138 if (outputDataChannel != null) {
139 outputDataChannel.requestOutput();
140 }
141 }
142 }
143
144 @Override
145 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
146 if (buffer.hasRemaining()) {
147 capacityChannel.update(buffer.remaining());
148 inputCapacityChannel = null;
149 } else {
150 inputCapacityChannel = capacityChannel;
151 }
152 }
153
154 @Override
155 public void streamEnd(final List<? extends Header> trailers) throws IOException {
156 endStream = true;
157 if (buffer.position() == 0) {
158 if (outputDataChannel != null) {
159 outputDataChannel.endStream();
160 }
161 } else {
162 if (outputDataChannel != null) {
163 outputDataChannel.requestOutput();
164 }
165 }
166 }
167
168 @Override
169 public int available() {
170 return buffer.position();
171 }
172
173 @Override
174 public void produce(final DataStreamChannel channel) throws IOException {
175 outputDataChannel = channel;
176 buffer.flip();
177 if (buffer.hasRemaining()) {
178 channel.write(buffer);
179 }
180 buffer.compact();
181 if (buffer.position() == 0 && endStream) {
182 channel.endStream();
183 }
184 final CapacityChannel capacityChannel = inputCapacityChannel;
185 if (capacityChannel != null && buffer.hasRemaining()) {
186 capacityChannel.update(buffer.remaining());
187 }
188 }
189
190 @Override
191 public void failed(final Exception cause) {
192 if (!(cause instanceof SocketException)) {
193 cause.printStackTrace(System.out);
194 }
195 }
196
197 @Override
198 public void releaseResources() {
199 }
200
201 })
202 .create();
203
204 Runtime.getRuntime().addShutdownHook(new Thread(() -> {
205 System.out.println("HTTP server shutting down");
206 server.close(CloseMode.GRACEFUL);
207 }));
208
209 server.start();
210 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port), URIScheme.HTTP);
211 final ListenerEndpoint listenerEndpoint = future.get();
212 System.out.print("Listening on " + listenerEndpoint.getAddress());
213 server.awaitShutdown(TimeValue.MAX_VALUE);
214 }
215
216 }