1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.ArrayDeque;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.core5.annotation.Contract;
36 import org.apache.hc.core5.annotation.ThreadingBehavior;
37 import org.apache.hc.core5.http.HttpStreamResetException;
38 import org.apache.hc.core5.http.nio.AsyncDataProducer;
39 import org.apache.hc.core5.http.nio.DataStreamChannel;
40 import org.apache.hc.core5.util.Args;
41 import org.reactivestreams.Publisher;
42 import org.reactivestreams.Subscriber;
43 import org.reactivestreams.Subscription;
44
45
46
47
48
49
50 @Contract(threading = ThreadingBehavior.SAFE)
51 final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
52
53 private static final int BUFFER_WINDOW_SIZE = 5;
54
55 private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
56 private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
57 private final AtomicBoolean complete = new AtomicBoolean(false);
58 private final Publisher<ByteBuffer> publisher;
59 private final AtomicReference<Subscription> subscription = new AtomicReference<>(null);
60 private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>();
61
62 public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
63 this.publisher = Args.notNull(publisher, "publisher");
64 }
65
66 void setChannel(final DataStreamChannel channel) {
67 requestChannel.set(channel);
68 }
69
70 @Override
71 public void onSubscribe(final Subscription subscription) {
72 if (this.subscription.getAndSet(subscription) != null) {
73 throw new IllegalStateException("Already subscribed");
74 }
75
76 subscription.request(BUFFER_WINDOW_SIZE);
77 }
78
79 @Override
80 public void onNext(final ByteBuffer byteBuffer) {
81 final byte[] copy = new byte[byteBuffer.remaining()];
82 byteBuffer.get(copy);
83 synchronized (buffers) {
84 buffers.add(ByteBuffer.wrap(copy));
85 }
86 signalReadiness();
87 }
88
89 @Override
90 public void onError(final Throwable throwable) {
91 subscription.set(null);
92 exception.set(throwable);
93 signalReadiness();
94 }
95
96 @Override
97 public void onComplete() {
98 subscription.set(null);
99 complete.set(true);
100 signalReadiness();
101 }
102
103 private void signalReadiness() {
104 final DataStreamChannel channel = requestChannel.get();
105 if (channel == null) {
106 throw new IllegalStateException("Output channel is not set");
107 }
108 channel.requestOutput();
109 }
110
111 @Override
112 public int available() {
113 if (exception.get() != null || complete.get()) {
114 return 1;
115 } else {
116 synchronized (buffers) {
117 int sum = 0;
118 for (final ByteBuffer buffer : buffers) {
119 sum += buffer.remaining();
120 }
121 return sum;
122 }
123 }
124 }
125
126 @Override
127 public void produce(final DataStreamChannel channel) throws IOException {
128 if (requestChannel.get() == null) {
129 requestChannel.set(channel);
130 publisher.subscribe(this);
131 }
132
133 final Throwable t = exception.get();
134 final Subscription s = subscription.get();
135 int buffersToReplenish = 0;
136 try {
137 synchronized (buffers) {
138 if (t != null) {
139 throw new HttpStreamResetException(t.getMessage(), t);
140 } else if (this.complete.get() && buffers.isEmpty()) {
141 channel.endStream();
142 } else {
143 while (!buffers.isEmpty()) {
144 final ByteBuffer nextBuffer = buffers.remove();
145 channel.write(nextBuffer);
146 if (nextBuffer.remaining() > 0) {
147 buffers.push(nextBuffer);
148 break;
149 } else if (s != null) {
150
151 buffersToReplenish++;
152 }
153 }
154 }
155 }
156 } finally {
157 if (s != null && buffersToReplenish > 0) {
158 s.request(buffersToReplenish);
159 }
160 }
161 }
162
163 @Override
164 public void releaseResources() {
165 final Subscription s = subscription.getAndSet(null);
166 if (s != null) {
167 s.cancel();
168 }
169 }
170 }