View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.nio.support.classic;
28  
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.io.OutputStream;
32  import java.nio.ByteBuffer;
33  import java.util.Collections;
34  import java.util.List;
35  import java.util.Locale;
36  import java.util.Set;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicReference;
40  
41  import org.apache.hc.core5.http.EntityDetails;
42  import org.apache.hc.core5.http.Header;
43  import org.apache.hc.core5.http.HttpException;
44  import org.apache.hc.core5.http.HttpHeaders;
45  import org.apache.hc.core5.http.HttpRequest;
46  import org.apache.hc.core5.http.HttpResponse;
47  import org.apache.hc.core5.http.HttpStatus;
48  import org.apache.hc.core5.http.ProtocolVersion;
49  import org.apache.hc.core5.http.message.BasicHttpResponse;
50  import org.apache.hc.core5.http.message.HttpResponseWrapper;
51  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
52  import org.apache.hc.core5.http.nio.CapacityChannel;
53  import org.apache.hc.core5.http.nio.DataStreamChannel;
54  import org.apache.hc.core5.http.nio.ResponseChannel;
55  import org.apache.hc.core5.http.protocol.HttpContext;
56  import org.apache.hc.core5.io.Closer;
57  import org.apache.hc.core5.util.Args;
58  import org.apache.hc.core5.util.Asserts;
59  
60  /**
61   * {@link AsyncServerExchangeHandler} implementation that acts as a compatibility
62   * layer for classic {@link InputStream} / {@link OutputStream} based interfaces.
63   * Blocking input / output processing is executed through an {@link Executor}.
64   *
65   * @since 5.0
66   */
67  public abstract class AbstractClassicServerExchangeHandler implements AsyncServerExchangeHandler {
68  
69      private enum State { IDLE, ACTIVE, COMPLETED }
70  
71      private final int initialBufferSize;
72      private final Executor executor;
73      private final AtomicReference<State> state;
74      private final AtomicReference<Exception> exception;
75  
76      private volatile SharedInputBuffer inputBuffer;
77      private volatile SharedOutputBuffer outputBuffer;
78  
79      public AbstractClassicServerExchangeHandler(final int initialBufferSize, final Executor executor) {
80          this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
81          this.executor = Args.notNull(executor, "Executor");
82          this.exception = new AtomicReference<>();
83          this.state = new AtomicReference<>(State.IDLE);
84      }
85  
86      /**
87       * Handles an incoming request optionally reading its entity content form the given input stream
88       * and generates a response optionally writing out its entity content into the given output stream.
89       *
90       * @param request the incoming request
91       * @param requestStream the request stream if the request encloses an entity,
92       *                      {@code null} otherwise.
93       * @param response the outgoing response.
94       * @param responseStream the response entity output stream.
95       * @param context the actual execution context.
96       */
97      protected abstract void handle(
98              HttpRequest request, InputStream requestStream,
99              HttpResponse response, OutputStream responseStream,
100             HttpContext context) throws IOException, HttpException;
101 
102     public Exception getException() {
103         return exception.get();
104     }
105 
106     @Override
107     public final void handleRequest(
108             final HttpRequest request,
109             final EntityDetails entityDetails,
110             final ResponseChannel responseChannel,
111             final HttpContext context) throws HttpException, IOException {
112         final AtomicBoolean responseCommitted = new AtomicBoolean(false);
113 
114         final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
115         final HttpResponse responseWrapper = new HttpResponseWrapper(response){
116 
117             private void ensureNotCommitted() {
118                 Asserts.check(!responseCommitted.get(), "Response already committed");
119             }
120 
121             @Override
122             public void addHeader(final String name, final Object value) {
123                 ensureNotCommitted();
124                 super.addHeader(name, value);
125             }
126 
127             @Override
128             public void setHeader(final String name, final Object value) {
129                 ensureNotCommitted();
130                 super.setHeader(name, value);
131             }
132 
133             @Override
134             public void setVersion(final ProtocolVersion version) {
135                 ensureNotCommitted();
136                 super.setVersion(version);
137             }
138 
139             @Override
140             public void setCode(final int code) {
141                 ensureNotCommitted();
142                 super.setCode(code);
143             }
144 
145             @Override
146             public void setReasonPhrase(final String reason) {
147                 ensureNotCommitted();
148                 super.setReasonPhrase(reason);
149             }
150 
151             @Override
152             public void setLocale(final Locale locale) {
153                 ensureNotCommitted();
154                 super.setLocale(locale);
155             }
156 
157         };
158 
159         final InputStream inputStream;
160         if (entityDetails != null) {
161             inputBuffer = new SharedInputBuffer(initialBufferSize);
162             inputStream = new ContentInputStream(inputBuffer);
163         } else {
164             inputStream = null;
165         }
166         outputBuffer = new SharedOutputBuffer(initialBufferSize);
167 
168         final OutputStream outputStream = new ContentOutputStream(outputBuffer) {
169 
170             private void triggerResponse() throws IOException {
171                 try {
172                     if (responseCommitted.compareAndSet(false, true)) {
173                         responseChannel.sendResponse(response, new EntityDetails() {
174 
175                             @Override
176                             public long getContentLength() {
177                                 return -1;
178                             }
179 
180                             @Override
181                             public String getContentType() {
182                                 final Header h = response.getFirstHeader(HttpHeaders.CONTENT_TYPE);
183                                 return h != null ? h.getValue() : null;
184                             }
185 
186                             @Override
187                             public String getContentEncoding() {
188                                 final Header h = response.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
189                                 return h != null ? h.getValue() : null;
190                             }
191 
192                             @Override
193                             public boolean isChunked() {
194                                 return false;
195                             }
196 
197                             @Override
198                             public Set<String> getTrailerNames() {
199                                 return Collections.emptySet();
200                             }
201 
202                         }, context);
203                     }
204                 } catch (final HttpException ex) {
205                     throw new IOException(ex.getMessage(), ex);
206                 }
207             }
208 
209             @Override
210             public void close() throws IOException {
211                 triggerResponse();
212                 super.close();
213             }
214 
215             @Override
216             public void write(final byte[] b, final int off, final int len) throws IOException {
217                 triggerResponse();
218                 super.write(b, off, len);
219             }
220 
221             @Override
222             public void write(final byte[] b) throws IOException {
223                 triggerResponse();
224                 super.write(b);
225             }
226 
227             @Override
228             public void write(final int b) throws IOException {
229                 triggerResponse();
230                 super.write(b);
231             }
232 
233         };
234 
235         if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
236             executor.execute(() -> {
237                 try {
238                     handle(request, inputStream, responseWrapper, outputStream, context);
239                     Closer.close(inputStream);
240                     outputStream.close();
241                 } catch (final Exception ex) {
242                     exception.compareAndSet(null, ex);
243                     if (inputBuffer != null) {
244                         inputBuffer.abort();
245                     }
246                     outputBuffer.abort();
247                 } finally {
248                     state.set(State.COMPLETED);
249                 }
250             });
251         }
252     }
253 
254     @Override
255     public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
256         if (inputBuffer != null) {
257             inputBuffer.updateCapacity(capacityChannel);
258         }
259     }
260 
261     @Override
262     public final void consume(final ByteBuffer src) throws IOException {
263         Asserts.notNull(inputBuffer, "Input buffer");
264         inputBuffer.fill(src);
265     }
266 
267     @Override
268     public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
269         Asserts.notNull(inputBuffer, "Input buffer");
270         inputBuffer.markEndStream();
271     }
272 
273     @Override
274     public final int available() {
275         Asserts.notNull(outputBuffer, "Output buffer");
276         return outputBuffer.length();
277     }
278 
279     @Override
280     public final void produce(final DataStreamChannel channel) throws IOException {
281         Asserts.notNull(outputBuffer, "Output buffer");
282         outputBuffer.flush(channel);
283     }
284 
285     @Override
286     public final void failed(final Exception cause) {
287         exception.compareAndSet(null, cause);
288         releaseResources();
289     }
290 
291     @Override
292     public void releaseResources() {
293     }
294 
295 }