1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.hc.core5.concurrent.CallbackContribution;
35 import org.apache.hc.core5.concurrent.FutureCallback;
36 import org.apache.hc.core5.function.Supplier;
37 import org.apache.hc.core5.http.EntityDetails;
38 import org.apache.hc.core5.http.Header;
39 import org.apache.hc.core5.http.HttpException;
40 import org.apache.hc.core5.http.HttpRequest;
41 import org.apache.hc.core5.http.Message;
42 import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
43 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
44 import org.apache.hc.core5.http.nio.CapacityChannel;
45 import org.apache.hc.core5.http.protocol.HttpContext;
46 import org.apache.hc.core5.util.Args;
47
48
49
50
51
52
53
54 public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<HttpRequest, T>> {
55
56 private final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier;
57 private final AtomicReference<AsyncEntityConsumer<T>> dataConsumerRef;
58
59 public BasicRequestConsumer(final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier) {
60 this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
61 this.dataConsumerRef = new AtomicReference<>();
62 }
63
64 public BasicRequestConsumer(final AsyncEntityConsumer<T> dataConsumer) {
65 this(() -> dataConsumer);
66 }
67
68 @Override
69 public void consumeRequest(
70 final HttpRequest request,
71 final EntityDetails entityDetails,
72 final HttpContext httpContext,
73 final FutureCallback<Message<HttpRequest, T>> resultCallback) throws HttpException, IOException {
74 Args.notNull(request, "Request");
75 if (entityDetails != null) {
76 final AsyncEntityConsumer<T> dataConsumer = dataConsumerSupplier.get();
77 if (dataConsumer == null) {
78 throw new HttpException("Supplied data consumer is null");
79 }
80 dataConsumerRef.set(dataConsumer);
81
82 dataConsumer.streamStart(entityDetails, new CallbackContribution<T>(resultCallback) {
83
84 @Override
85 public void completed(final T body) {
86 final Message<HttpRequest, T> result = new Message<>(request, body);
87 if (resultCallback != null) {
88 resultCallback.completed(result);
89 }
90 }
91
92 });
93 } else {
94 final Message<HttpRequest, T> result = new Message<>(request, null);
95 if (resultCallback != null) {
96 resultCallback.completed(result);
97 }
98 }
99 }
100
101 @Override
102 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
103 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
104 dataConsumer.updateCapacity(capacityChannel);
105 }
106
107 @Override
108 public void consume(final ByteBuffer src) throws IOException {
109 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
110 dataConsumer.consume(src);
111 }
112
113 @Override
114 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
115 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
116 dataConsumer.streamEnd(trailers);
117 }
118
119 @Override
120 public void failed(final Exception cause) {
121 releaseResources();
122 }
123
124 @Override
125 public void releaseResources() {
126 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.getAndSet(null);
127 if (dataConsumer != null) {
128 dataConsumer.releaseResources();
129 }
130 }
131
132 }