1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.concurrent.LinkedBlockingQueue;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.concurrent.locks.ReentrantLock;
38
39 import org.apache.hc.core5.annotation.Contract;
40 import org.apache.hc.core5.annotation.ThreadingBehavior;
41 import org.apache.hc.core5.http.Header;
42 import org.apache.hc.core5.http.HttpStreamResetException;
43 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
44 import org.apache.hc.core5.http.nio.CapacityChannel;
45 import org.apache.hc.core5.util.Args;
46 import org.reactivestreams.Publisher;
47 import org.reactivestreams.Subscriber;
48 import org.reactivestreams.Subscription;
49
50
51
52
53
54
55 @Contract(threading = ThreadingBehavior.SAFE)
56 final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuffer> {
57
58 private final AtomicLong requests = new AtomicLong(0);
59
60 private final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<>();
61 private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
62 private final AtomicInteger windowScalingIncrement = new AtomicInteger(0);
63 private volatile boolean cancelled;
64 private volatile boolean completed;
65 private volatile Exception exception;
66 private volatile CapacityChannel capacityChannel;
67 private volatile Subscriber<? super ByteBuffer> subscriber;
68
69 private final ReentrantLock lock = new ReentrantLock();
70
71 public void failed(final Exception cause) {
72 if (!completed) {
73 exception = cause;
74 flushToSubscriber();
75 }
76 }
77
78 @Override
79 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
80 throwIfCancelled();
81 this.capacityChannel = capacityChannel;
82 signalCapacity(capacityChannel);
83 }
84
85 private void signalCapacity(final CapacityChannel channel) throws IOException {
86 final int increment = windowScalingIncrement.getAndSet(0);
87 if (increment > 0) {
88 channel.update(increment);
89 }
90 }
91
92 private void throwIfCancelled() throws IOException {
93 if (cancelled) {
94 throw new HttpStreamResetException("Downstream subscriber to ReactiveDataConsumer cancelled");
95 }
96 }
97
98 @Override
99 public void consume(final ByteBuffer byteBuffer) throws IOException {
100 if (completed) {
101 throw new IllegalStateException("Received data past end of stream");
102 }
103 throwIfCancelled();
104
105 final byte[] copy = new byte[byteBuffer.remaining()];
106 byteBuffer.get(copy);
107 buffers.add(ByteBuffer.wrap(copy));
108
109 flushToSubscriber();
110 }
111
112 @Override
113 public void streamEnd(final List<? extends Header> trailers) {
114 completed = true;
115 flushToSubscriber();
116 }
117
118 @Override
119 public void releaseResources() {
120 this.capacityChannel = null;
121 }
122
123 private void flushToSubscriber() {
124 lock.lock();
125 try {
126 final Subscriber<? super ByteBuffer> s = subscriber;
127 if (flushInProgress.getAndSet(true)) {
128 return;
129 }
130 try {
131 if (s == null) {
132 return;
133 }
134 if (exception != null) {
135 subscriber = null;
136 s.onError(exception);
137 return;
138 }
139 ByteBuffer next;
140 while (requests.get() > 0 && ((next = buffers.poll()) != null)) {
141 final int bytesFreed = next.remaining();
142 s.onNext(next);
143 requests.decrementAndGet();
144 windowScalingIncrement.addAndGet(bytesFreed);
145 }
146 final CapacityChannel localChannel = capacityChannel;
147 if (localChannel != null) {
148 try {
149 signalCapacity(localChannel);
150 } catch (final IOException e) {
151 exception = e;
152 s.onError(e);
153 return;
154 }
155 }
156 if (completed && buffers.isEmpty()) {
157 subscriber = null;
158 s.onComplete();
159 }
160 } finally {
161 flushInProgress.set(false);
162 }
163 } finally {
164 lock.unlock();
165 }
166 }
167
168 @Override
169 public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
170 this.subscriber = Args.notNull(subscriber, "subscriber");
171 subscriber.onSubscribe(new Subscription() {
172 @Override
173 public void request(final long increment) {
174 if (increment <= 0) {
175 failed(new IllegalArgumentException("The number of elements requested must be strictly positive"));
176 return;
177 }
178 requests.addAndGet(increment);
179 flushToSubscriber();
180 }
181
182 @Override
183 public void cancel() {
184 ReactiveDataConsumer.this.cancelled = true;
185 ReactiveDataConsumer.this.subscriber = null;
186 }
187 });
188 }
189 }