1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.entity;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.nio.CharBuffer;
32 import java.nio.charset.Charset;
33 import java.nio.charset.CharsetEncoder;
34 import java.nio.charset.CoderResult;
35 import java.nio.charset.StandardCharsets;
36 import java.util.Objects;
37 import java.util.Set;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 import org.apache.hc.core5.annotation.Contract;
41 import org.apache.hc.core5.annotation.ThreadingBehavior;
42 import org.apache.hc.core5.http.ContentType;
43 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
44 import org.apache.hc.core5.http.nio.DataStreamChannel;
45 import org.apache.hc.core5.http.nio.StreamChannel;
46 import org.apache.hc.core5.util.Args;
47
48
49
50
51
52
53 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
54 public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProducer {
55
56 private static final CharBuffer EMPTY = CharBuffer.wrap(new char[0]);
57
58 enum State { ACTIVE, FLUSHING, END_STREAM }
59
60 private final ByteBuffer bytebuf;
61 private final int fragmentSizeHint;
62 private final ContentType contentType;
63 private final CharsetEncoder charsetEncoder;
64 private final ReentrantLock lock;
65
66 private volatile State state;
67
68 public AbstractCharAsyncEntityProducer(
69 final int bufferSize,
70 final int fragmentSizeHint,
71 final ContentType contentType) {
72 Args.positive(bufferSize, "Buffer size");
73 this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : 0;
74 this.bytebuf = ByteBuffer.allocate(bufferSize);
75 this.contentType = contentType;
76 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.UTF_8);
77 this.charsetEncoder = charset.newEncoder();
78 this.state = State.ACTIVE;
79 this.lock = new ReentrantLock();
80 }
81
82 private void flush(final StreamChannel<ByteBuffer> channel) throws IOException {
83 if (bytebuf.position() > 0) {
84 bytebuf.flip();
85 channel.write(bytebuf);
86 bytebuf.compact();
87 }
88 }
89
90 final int writeData(final StreamChannel<ByteBuffer> channel, final CharBuffer src) throws IOException {
91
92 final int chunk = src.remaining();
93 if (chunk == 0) {
94 return 0;
95 }
96
97 final int p = src.position();
98 final CoderResult result = charsetEncoder.encode(src, bytebuf, false);
99 if (result.isError()) {
100 result.throwException();
101 }
102
103 if (!bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
104 flush(channel);
105 }
106
107 return src.position() - p;
108 }
109
110 final void streamEnd(final StreamChannel<ByteBuffer> channel) throws IOException {
111 if (state == State.ACTIVE) {
112 state = State.FLUSHING;
113 if (!bytebuf.hasRemaining()) {
114 flush(channel);
115 }
116
117 final CoderResult result = charsetEncoder.encode(EMPTY, bytebuf, true);
118 if (result.isError()) {
119 result.throwException();
120 }
121 final CoderResult result2 = charsetEncoder.flush(bytebuf);
122 if (result2.isError()) {
123 result.throwException();
124 } else if (result.isUnderflow()) {
125 flush(channel);
126 if (bytebuf.position() == 0) {
127 state = State.END_STREAM;
128 channel.endStream();
129 }
130 }
131 }
132
133 }
134
135
136
137
138
139
140
141
142 protected abstract int availableData();
143
144
145
146
147
148
149
150
151
152
153 protected abstract void produceData(StreamChannel<CharBuffer> channel) throws IOException;
154
155 @Override
156 public final String getContentType() {
157 return Objects.toString(contentType, null);
158 }
159
160 @Override
161 public String getContentEncoding() {
162 return null;
163 }
164
165 @Override
166 public long getContentLength() {
167 return -1;
168 }
169
170 @Override
171 public boolean isChunked() {
172 return false;
173 }
174
175 @Override
176 public Set<String> getTrailerNames() {
177 return null;
178 }
179
180 @Override
181 public final int available() {
182 if (state == State.ACTIVE) {
183 return availableData();
184 } else {
185 lock.lock();
186 try {
187 return bytebuf.position();
188 } finally {
189 lock.unlock();
190 }
191 }
192 }
193
194 @Override
195 public final void produce(final DataStreamChannel channel) throws IOException {
196 lock.lock();
197 try {
198 if (state == State.ACTIVE) {
199 produceData(new StreamChannel<CharBuffer>() {
200
201 @Override
202 public int write(final CharBuffer src) throws IOException {
203 Args.notNull(src, "Buffer");
204 lock.lock();
205 try {
206 return writeData(channel, src);
207 } finally {
208 lock.unlock();
209 }
210 }
211
212 @Override
213 public void endStream() throws IOException {
214 lock.lock();
215 try {
216 streamEnd(channel);
217 } finally {
218 lock.unlock();
219 }
220 }
221
222 });
223 }
224 if (state == State.FLUSHING) {
225 final CoderResult result = charsetEncoder.flush(bytebuf);
226 if (result.isError()) {
227 result.throwException();
228 } else if (result.isOverflow()) {
229 flush(channel);
230 } else if (result.isUnderflow()) {
231 flush(channel);
232 if (bytebuf.position() == 0) {
233 state = State.END_STREAM;
234 channel.endStream();
235 }
236 }
237
238 }
239
240 } finally {
241 lock.unlock();
242 }
243 }
244
245 @Override
246 public void releaseResources() {
247 state = State.ACTIVE;
248 charsetEncoder.reset();
249 }
250
251 }