View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactive;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.BlockingQueue;
33  import java.util.concurrent.LinkedBlockingQueue;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  import java.util.concurrent.atomic.AtomicInteger;
36  import java.util.concurrent.atomic.AtomicLong;
37  import java.util.concurrent.locks.ReentrantLock;
38  
39  import org.apache.hc.core5.annotation.Contract;
40  import org.apache.hc.core5.annotation.ThreadingBehavior;
41  import org.apache.hc.core5.http.Header;
42  import org.apache.hc.core5.http.HttpStreamResetException;
43  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
44  import org.apache.hc.core5.http.nio.CapacityChannel;
45  import org.apache.hc.core5.util.Args;
46  import org.reactivestreams.Publisher;
47  import org.reactivestreams.Subscriber;
48  import org.reactivestreams.Subscription;
49  
50  /**
51   * An asynchronous data consumer that supports Reactive Streams.
52   *
53   * @since 5.0
54   */
55  @Contract(threading = ThreadingBehavior.SAFE)
56  final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuffer> {
57  
58      private final AtomicLong requests = new AtomicLong(0);
59  
60      private final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<>();
61      private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
62      private final AtomicInteger windowScalingIncrement = new AtomicInteger(0);
63      private volatile boolean cancelled;
64      private volatile boolean completed;
65      private volatile Exception exception;
66      private volatile CapacityChannel capacityChannel;
67      private volatile Subscriber<? super ByteBuffer> subscriber;
68  
69      private final ReentrantLock lock = new ReentrantLock();
70  
71      public void failed(final Exception cause) {
72          if (!completed) {
73              exception = cause;
74              flushToSubscriber();
75          }
76      }
77  
78      @Override
79      public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
80          throwIfCancelled();
81          this.capacityChannel = capacityChannel;
82          signalCapacity(capacityChannel);
83      }
84  
85      private void signalCapacity(final CapacityChannel channel) throws IOException {
86          final int increment = windowScalingIncrement.getAndSet(0);
87          if (increment > 0) {
88              channel.update(increment);
89          }
90      }
91  
92      private void throwIfCancelled() throws IOException {
93          if (cancelled) {
94              throw new HttpStreamResetException("Downstream subscriber to ReactiveDataConsumer cancelled");
95          }
96      }
97  
98      @Override
99      public void consume(final ByteBuffer byteBuffer) throws IOException {
100         if (completed) {
101             throw new IllegalStateException("Received data past end of stream");
102         }
103         throwIfCancelled();
104 
105         final byte[] copy = new byte[byteBuffer.remaining()];
106         byteBuffer.get(copy);
107         buffers.add(ByteBuffer.wrap(copy));
108 
109         flushToSubscriber();
110     }
111 
112     @Override
113     public void streamEnd(final List<? extends Header> trailers) {
114         completed = true;
115         flushToSubscriber();
116     }
117 
118     @Override
119     public void releaseResources() {
120         this.capacityChannel = null;
121     }
122 
123     private void flushToSubscriber() {
124         lock.lock();
125         try {
126             final Subscriber<? super ByteBuffer> s = subscriber;
127             if (flushInProgress.getAndSet(true)) {
128                 return;
129             }
130             try {
131                 if (s == null) {
132                     return;
133                 }
134                 if (exception != null) {
135                     subscriber = null;
136                     s.onError(exception);
137                     return;
138                 }
139                 ByteBuffer next;
140                 while (requests.get() > 0 && ((next = buffers.poll()) != null)) {
141                     final int bytesFreed = next.remaining();
142                     s.onNext(next);
143                     requests.decrementAndGet();
144                     windowScalingIncrement.addAndGet(bytesFreed);
145                 }
146                 final CapacityChannel localChannel = capacityChannel;
147                 if (localChannel != null) {
148                     try {
149                         signalCapacity(localChannel);
150                     } catch (final IOException e) {
151                         exception = e;
152                         s.onError(e);
153                         return;
154                     }
155                 }
156                 if (completed && buffers.isEmpty()) {
157                     subscriber = null;
158                     s.onComplete();
159                 }
160             } finally {
161                 flushInProgress.set(false);
162             }
163         } finally {
164             lock.unlock();
165         }
166     }
167 
168     @Override
169     public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
170         this.subscriber = Args.notNull(subscriber, "subscriber");
171         subscriber.onSubscribe(new Subscription() {
172             @Override
173             public void request(final long increment) {
174                 if (increment <= 0) {
175                     failed(new IllegalArgumentException("The number of elements requested must be strictly positive"));
176                     return;
177                 }
178                 requests.addAndGet(increment);
179                 flushToSubscriber();
180             }
181 
182             @Override
183             public void cancel() {
184                 ReactiveDataConsumer.this.cancelled = true;
185                 ReactiveDataConsumer.this.subscriber = null;
186             }
187         });
188     }
189 }