View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.nio.entity;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.nio.CharBuffer;
32  import java.nio.charset.Charset;
33  import java.nio.charset.CharsetEncoder;
34  import java.nio.charset.CoderResult;
35  import java.nio.charset.StandardCharsets;
36  import java.util.Objects;
37  import java.util.Set;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.hc.core5.annotation.Contract;
41  import org.apache.hc.core5.annotation.ThreadingBehavior;
42  import org.apache.hc.core5.http.ContentType;
43  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
44  import org.apache.hc.core5.http.nio.DataStreamChannel;
45  import org.apache.hc.core5.http.nio.StreamChannel;
46  import org.apache.hc.core5.util.Args;
47  
48  /**
49   * Abstract text entity content producer.
50   *
51   * @since 5.0
52   */
53  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
54  public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProducer {
55  
56      private static final CharBuffer EMPTY = CharBuffer.wrap(new char[0]);
57  
58      enum State { ACTIVE, FLUSHING, END_STREAM }
59  
60      private final ByteBuffer bytebuf;
61      private final int fragmentSizeHint;
62      private final ContentType contentType;
63      private final CharsetEncoder charsetEncoder;
64      private final ReentrantLock lock;
65  
66      private volatile State state;
67  
68      public AbstractCharAsyncEntityProducer(
69              final int bufferSize,
70              final int fragmentSizeHint,
71              final ContentType contentType) {
72          Args.positive(bufferSize, "Buffer size");
73          this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : 0;
74          this.bytebuf = ByteBuffer.allocate(bufferSize);
75          this.contentType = contentType;
76          final Charset charset = ContentType.getCharset(contentType, StandardCharsets.UTF_8);
77          this.charsetEncoder = charset.newEncoder();
78          this.state = State.ACTIVE;
79          this.lock = new ReentrantLock();
80      }
81  
82      private void flush(final StreamChannel<ByteBuffer> channel) throws IOException {
83          if (bytebuf.position() > 0) {
84              bytebuf.flip();
85              channel.write(bytebuf);
86              bytebuf.compact();
87          }
88      }
89  
90      final int writeData(final StreamChannel<ByteBuffer> channel, final CharBuffer src) throws IOException {
91  
92          final int chunk = src.remaining();
93          if (chunk == 0) {
94              return 0;
95          }
96  
97          final int p = src.position();
98          final CoderResult result = charsetEncoder.encode(src, bytebuf, false);
99          if (result.isError()) {
100             result.throwException();
101         }
102 
103         if (!bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
104             flush(channel);
105         }
106 
107         return src.position() - p;
108     }
109 
110     final void streamEnd(final StreamChannel<ByteBuffer> channel) throws IOException {
111         if (state == State.ACTIVE) {
112             state = State.FLUSHING;
113             if (!bytebuf.hasRemaining()) {
114                 flush(channel);
115             }
116 
117             final CoderResult result = charsetEncoder.encode(EMPTY, bytebuf, true);
118             if (result.isError()) {
119                 result.throwException();
120             }
121             final CoderResult result2 = charsetEncoder.flush(bytebuf);
122             if (result2.isError()) {
123                 result.throwException();
124             } else if (result.isUnderflow()) {
125                 flush(channel);
126                 if (bytebuf.position() == 0) {
127                     state = State.END_STREAM;
128                     channel.endStream();
129                 }
130             }
131         }
132 
133     }
134 
135     /**
136      * Returns the number of bytes immediately available for output.
137      * This method can be used as a hint to control output events
138      * of the underlying I/O session.
139      *
140      * @return the number of bytes immediately available for output
141      */
142     protected abstract int availableData();
143 
144     /**
145      * Triggered to signal the ability of the underlying char channel
146      * to accept more data. The data producer can choose to write data
147      * immediately inside the call or asynchronously at some later point.
148      * <p>
149      * {@link StreamChannel} passed to this method is threading-safe.
150      *
151      * @param channel the data channel capable to accepting more data.
152      */
153     protected abstract void produceData(StreamChannel<CharBuffer> channel) throws IOException;
154 
155     @Override
156     public final String getContentType() {
157         return Objects.toString(contentType, null);
158     }
159 
160     @Override
161     public String getContentEncoding() {
162         return null;
163     }
164 
165     @Override
166     public long getContentLength() {
167         return -1;
168     }
169 
170     @Override
171     public boolean isChunked() {
172         return false;
173     }
174 
175     @Override
176     public Set<String> getTrailerNames() {
177         return null;
178     }
179 
180     @Override
181     public final int available() {
182         if (state == State.ACTIVE) {
183             return availableData();
184         } else {
185             lock.lock();
186             try {
187                 return bytebuf.position();
188             } finally {
189                 lock.unlock();
190             }
191         }
192     }
193 
194     @Override
195     public final void produce(final DataStreamChannel channel) throws IOException {
196         lock.lock();
197         try {
198             if (state == State.ACTIVE) {
199                 produceData(new StreamChannel<CharBuffer>() {
200 
201                     @Override
202                     public int write(final CharBuffer src) throws IOException {
203                         Args.notNull(src, "Buffer");
204                         lock.lock();
205                         try {
206                             return writeData(channel, src);
207                         } finally {
208                             lock.unlock();
209                         }
210                     }
211 
212                     @Override
213                     public void endStream() throws IOException {
214                         lock.lock();
215                         try {
216                             streamEnd(channel);
217                         } finally {
218                             lock.unlock();
219                         }
220                     }
221 
222                 });
223             }
224             if (state == State.FLUSHING) {
225                 final CoderResult result = charsetEncoder.flush(bytebuf);
226                 if (result.isError()) {
227                     result.throwException();
228                 } else if (result.isOverflow()) {
229                     flush(channel);
230                 } else if (result.isUnderflow()) {
231                     flush(channel);
232                     if (bytebuf.position() == 0) {
233                         state = State.END_STREAM;
234                         channel.endStream();
235                     }
236                 }
237 
238             }
239 
240         } finally {
241             lock.unlock();
242         }
243     }
244 
245     @Override
246     public void releaseResources() {
247         state = State.ACTIVE;
248         charsetEncoder.reset();
249     }
250 
251 }