1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.nio.charset.UnsupportedCharsetException;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.core5.concurrent.CallbackContribution;
36 import org.apache.hc.core5.concurrent.FutureCallback;
37 import org.apache.hc.core5.function.Supplier;
38 import org.apache.hc.core5.http.ContentType;
39 import org.apache.hc.core5.http.EntityDetails;
40 import org.apache.hc.core5.http.Header;
41 import org.apache.hc.core5.http.HttpException;
42 import org.apache.hc.core5.http.HttpResponse;
43 import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
44 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
45 import org.apache.hc.core5.http.nio.CapacityChannel;
46 import org.apache.hc.core5.http.protocol.HttpContext;
47 import org.apache.hc.core5.util.Args;
48
49
50
51
52
53
54
55
56
57
58 public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncResponseConsumer<T> {
59
60 private final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier;
61 private final AtomicReference<AsyncEntityConsumer<E>> dataConsumerRef;
62
63 public AbstractAsyncResponseConsumer(final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier) {
64 this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
65 this.dataConsumerRef = new AtomicReference<>(null);
66 }
67
68 public AbstractAsyncResponseConsumer(final AsyncEntityConsumer<E> dataConsumer) {
69 this(new Supplier<AsyncEntityConsumer<E>>() {
70
71 @Override
72 public AsyncEntityConsumer<E> get() {
73 return dataConsumer;
74 }
75
76 });
77 }
78
79
80
81
82
83
84
85
86 protected abstract T buildResult(HttpResponse response, E entity, ContentType contentType);
87
88 @Override
89 public final void consumeResponse(
90 final HttpResponse response,
91 final EntityDetails entityDetails,
92 final HttpContext httpContext, final FutureCallback<T> resultCallback) throws HttpException, IOException {
93 if (entityDetails != null) {
94 final AsyncEntityConsumer<E> dataConsumer = dataConsumerSupplier.get();
95 if (dataConsumer == null) {
96 throw new HttpException("Supplied data consumer is null");
97 }
98 dataConsumerRef.set(dataConsumer);
99 dataConsumer.streamStart(entityDetails, new CallbackContribution<E>(resultCallback) {
100
101 @Override
102 public void completed(final E entity) {
103 final ContentType contentType;
104 try {
105 contentType = ContentType.parse(entityDetails.getContentType());
106 final T result = buildResult(response, entity, contentType);
107 if (resultCallback != null) {
108 resultCallback.completed(result);
109 }
110 } catch (final UnsupportedCharsetException ex) {
111 if (resultCallback != null) {
112 resultCallback.failed(ex);
113 }
114 }
115 }
116
117 });
118 } else {
119 final T result = buildResult(response, null, null);
120 if (resultCallback != null) {
121 resultCallback.completed(result);
122 }
123 }
124
125 }
126
127 @Override
128 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
129 final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
130 if (dataConsumer != null) {
131 dataConsumer.updateCapacity(capacityChannel);
132 } else {
133 capacityChannel.update(Integer.MAX_VALUE);
134 }
135 }
136
137 @Override
138 public final void consume(final ByteBuffer src) throws IOException {
139 final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
140 if (dataConsumer != null) {
141 dataConsumer.consume(src);
142 }
143 }
144
145 @Override
146 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
147 final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
148 if (dataConsumer != null) {
149 dataConsumer.streamEnd(trailers);
150 }
151 }
152
153 @Override
154 public final void failed(final Exception cause) {
155 releaseResources();
156 }
157
158 @Override
159 public final void releaseResources() {
160 final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.getAndSet(null);
161 if (dataConsumer != null) {
162 dataConsumer.releaseResources();
163 }
164 }
165
166 }