1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support.classic;
28
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.OutputStream;
32 import java.nio.ByteBuffer;
33 import java.util.Collections;
34 import java.util.List;
35 import java.util.Locale;
36 import java.util.Set;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicReference;
40
41 import org.apache.hc.core5.http.EntityDetails;
42 import org.apache.hc.core5.http.Header;
43 import org.apache.hc.core5.http.HttpException;
44 import org.apache.hc.core5.http.HttpHeaders;
45 import org.apache.hc.core5.http.HttpRequest;
46 import org.apache.hc.core5.http.HttpResponse;
47 import org.apache.hc.core5.http.HttpStatus;
48 import org.apache.hc.core5.http.ProtocolVersion;
49 import org.apache.hc.core5.http.message.BasicHttpResponse;
50 import org.apache.hc.core5.http.message.HttpResponseWrapper;
51 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
52 import org.apache.hc.core5.http.nio.CapacityChannel;
53 import org.apache.hc.core5.http.nio.DataStreamChannel;
54 import org.apache.hc.core5.http.nio.ResponseChannel;
55 import org.apache.hc.core5.http.protocol.HttpContext;
56 import org.apache.hc.core5.io.Closer;
57 import org.apache.hc.core5.util.Args;
58 import org.apache.hc.core5.util.Asserts;
59
60
61
62
63
64
65
66
67 public abstract class AbstractClassicServerExchangeHandler implements AsyncServerExchangeHandler {
68
69 private enum State { IDLE, ACTIVE, COMPLETED }
70
71 private final int initialBufferSize;
72 private final Executor executor;
73 private final AtomicReference<State> state;
74 private final AtomicReference<Exception> exception;
75
76 private volatile SharedInputBuffer inputBuffer;
77 private volatile SharedOutputBuffer outputBuffer;
78
79 public AbstractClassicServerExchangeHandler(final int initialBufferSize, final Executor executor) {
80 this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
81 this.executor = Args.notNull(executor, "Executor");
82 this.exception = new AtomicReference<>();
83 this.state = new AtomicReference<>(State.IDLE);
84 }
85
86
87
88
89
90
91
92
93
94
95
96
97 protected abstract void handle(
98 HttpRequest request, InputStream requestStream,
99 HttpResponse response, OutputStream responseStream,
100 HttpContext context) throws IOException, HttpException;
101
102 public Exception getException() {
103 return exception.get();
104 }
105
106 @Override
107 public final void handleRequest(
108 final HttpRequest request,
109 final EntityDetails entityDetails,
110 final ResponseChannel responseChannel,
111 final HttpContext context) throws HttpException, IOException {
112 final AtomicBoolean responseCommitted = new AtomicBoolean(false);
113
114 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
115 final HttpResponse responseWrapper = new HttpResponseWrapper(response){
116
117 private void ensureNotCommitted() {
118 Asserts.check(!responseCommitted.get(), "Response already committed");
119 }
120
121 @Override
122 public void addHeader(final String name, final Object value) {
123 ensureNotCommitted();
124 super.addHeader(name, value);
125 }
126
127 @Override
128 public void setHeader(final String name, final Object value) {
129 ensureNotCommitted();
130 super.setHeader(name, value);
131 }
132
133 @Override
134 public void setVersion(final ProtocolVersion version) {
135 ensureNotCommitted();
136 super.setVersion(version);
137 }
138
139 @Override
140 public void setCode(final int code) {
141 ensureNotCommitted();
142 super.setCode(code);
143 }
144
145 @Override
146 public void setReasonPhrase(final String reason) {
147 ensureNotCommitted();
148 super.setReasonPhrase(reason);
149 }
150
151 @Override
152 public void setLocale(final Locale locale) {
153 ensureNotCommitted();
154 super.setLocale(locale);
155 }
156
157 };
158
159 final InputStream inputStream;
160 if (entityDetails != null) {
161 inputBuffer = new SharedInputBuffer(initialBufferSize);
162 inputStream = new ContentInputStream(inputBuffer);
163 } else {
164 inputStream = null;
165 }
166 outputBuffer = new SharedOutputBuffer(initialBufferSize);
167
168 final OutputStream outputStream = new ContentOutputStream(outputBuffer) {
169
170 private void triggerResponse() throws IOException {
171 try {
172 if (responseCommitted.compareAndSet(false, true)) {
173 responseChannel.sendResponse(response, new EntityDetails() {
174
175 @Override
176 public long getContentLength() {
177 return -1;
178 }
179
180 @Override
181 public String getContentType() {
182 final Header h = response.getFirstHeader(HttpHeaders.CONTENT_TYPE);
183 return h != null ? h.getValue() : null;
184 }
185
186 @Override
187 public String getContentEncoding() {
188 final Header h = response.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
189 return h != null ? h.getValue() : null;
190 }
191
192 @Override
193 public boolean isChunked() {
194 return false;
195 }
196
197 @Override
198 public Set<String> getTrailerNames() {
199 return Collections.emptySet();
200 }
201
202 }, context);
203 }
204 } catch (final HttpException ex) {
205 throw new IOException(ex.getMessage(), ex);
206 }
207 }
208
209 @Override
210 public void close() throws IOException {
211 triggerResponse();
212 super.close();
213 }
214
215 @Override
216 public void write(final byte[] b, final int off, final int len) throws IOException {
217 triggerResponse();
218 super.write(b, off, len);
219 }
220
221 @Override
222 public void write(final byte[] b) throws IOException {
223 triggerResponse();
224 super.write(b);
225 }
226
227 @Override
228 public void write(final int b) throws IOException {
229 triggerResponse();
230 super.write(b);
231 }
232
233 };
234
235 if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
236 executor.execute(() -> {
237 try {
238 handle(request, inputStream, responseWrapper, outputStream, context);
239 Closer.close(inputStream);
240 outputStream.close();
241 } catch (final Exception ex) {
242 exception.compareAndSet(null, ex);
243 if (inputBuffer != null) {
244 inputBuffer.abort();
245 }
246 outputBuffer.abort();
247 } finally {
248 state.set(State.COMPLETED);
249 }
250 });
251 }
252 }
253
254 @Override
255 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
256 if (inputBuffer != null) {
257 inputBuffer.updateCapacity(capacityChannel);
258 }
259 }
260
261 @Override
262 public final void consume(final ByteBuffer src) throws IOException {
263 Asserts.notNull(inputBuffer, "Input buffer");
264 inputBuffer.fill(src);
265 }
266
267 @Override
268 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
269 Asserts.notNull(inputBuffer, "Input buffer");
270 inputBuffer.markEndStream();
271 }
272
273 @Override
274 public final int available() {
275 Asserts.notNull(outputBuffer, "Output buffer");
276 return outputBuffer.length();
277 }
278
279 @Override
280 public final void produce(final DataStreamChannel channel) throws IOException {
281 Asserts.notNull(outputBuffer, "Output buffer");
282 outputBuffer.flush(channel);
283 }
284
285 @Override
286 public final void failed(final Exception cause) {
287 exception.compareAndSet(null, cause);
288 releaseResources();
289 }
290
291 @Override
292 public void releaseResources() {
293 }
294
295 }