1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.entity;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.nio.CharBuffer;
32 import java.nio.charset.Charset;
33 import java.nio.charset.CharsetEncoder;
34 import java.nio.charset.CoderResult;
35 import java.nio.charset.StandardCharsets;
36 import java.util.Set;
37 import java.util.concurrent.locks.ReentrantLock;
38
39 import org.apache.hc.core5.annotation.Contract;
40 import org.apache.hc.core5.annotation.ThreadingBehavior;
41 import org.apache.hc.core5.http.ContentType;
42 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
43 import org.apache.hc.core5.http.nio.DataStreamChannel;
44 import org.apache.hc.core5.http.nio.StreamChannel;
45 import org.apache.hc.core5.util.Args;
46
47
48
49
50
51
52 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
53 public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProducer {
54
55 private static final CharBuffer EMPTY = CharBuffer.wrap(new char[0]);
56
57 enum State { ACTIVE, FLUSHING, END_STREAM }
58
59 private final ByteBuffer bytebuf;
60 private final int fragmentSizeHint;
61 private final ContentType contentType;
62 private final CharsetEncoder charsetEncoder;
63 private final ReentrantLock lock;
64
65 private volatile State state;
66
67 public AbstractCharAsyncEntityProducer(
68 final int bufferSize,
69 final int fragmentSizeHint,
70 final ContentType contentType) {
71 Args.positive(bufferSize, "Buffer size");
72 this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : 0;
73 this.bytebuf = ByteBuffer.allocate(bufferSize);
74 this.contentType = contentType;
75 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.UTF_8);
76 this.charsetEncoder = charset.newEncoder();
77 this.state = State.ACTIVE;
78 this.lock = new ReentrantLock();
79 }
80
81 private void flush(final StreamChannel<ByteBuffer> channel) throws IOException {
82 if (bytebuf.position() > 0) {
83 bytebuf.flip();
84 channel.write(bytebuf);
85 bytebuf.compact();
86 }
87 }
88
89 final int writeData(final StreamChannel<ByteBuffer> channel, final CharBuffer src) throws IOException {
90
91 final int chunk = src.remaining();
92 if (chunk == 0) {
93 return 0;
94 }
95
96 final int p = src.position();
97 final CoderResult result = charsetEncoder.encode(src, bytebuf, false);
98 if (result.isError()) {
99 result.throwException();
100 }
101
102 if (!bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
103 flush(channel);
104 }
105
106 return src.position() - p;
107 }
108
109 final void streamEnd(final StreamChannel<ByteBuffer> channel) throws IOException {
110 if (state == State.ACTIVE) {
111 state = State.FLUSHING;
112 if (!bytebuf.hasRemaining()) {
113 flush(channel);
114 }
115
116 final CoderResult result = charsetEncoder.encode(EMPTY, bytebuf, true);
117 if (result.isError()) {
118 result.throwException();
119 }
120 final CoderResult result2 = charsetEncoder.flush(bytebuf);
121 if (result2.isError()) {
122 result.throwException();
123 } else if (result.isUnderflow()) {
124 flush(channel);
125 if (bytebuf.position() == 0) {
126 state = State.END_STREAM;
127 channel.endStream();
128 }
129 }
130 }
131
132 }
133
134
135
136
137
138
139
140
141 protected abstract int availableData();
142
143
144
145
146
147
148
149
150
151
152 protected abstract void produceData(StreamChannel<CharBuffer> channel) throws IOException;
153
154 @Override
155 public final String getContentType() {
156 return contentType != null ? contentType.toString() : null;
157 }
158
159 @Override
160 public String getContentEncoding() {
161 return null;
162 }
163
164 @Override
165 public long getContentLength() {
166 return -1;
167 }
168
169 @Override
170 public boolean isChunked() {
171 return false;
172 }
173
174 @Override
175 public Set<String> getTrailerNames() {
176 return null;
177 }
178
179 @Override
180 public final int available() {
181 if (state == State.ACTIVE) {
182 return availableData();
183 } else {
184 lock.lock();
185 try {
186 return bytebuf.position();
187 } finally {
188 lock.unlock();
189 }
190 }
191 }
192
193 @Override
194 public final void produce(final DataStreamChannel channel) throws IOException {
195 lock.lock();
196 try {
197 if (state == State.ACTIVE) {
198 produceData(new StreamChannel<CharBuffer>() {
199
200 @Override
201 public int write(final CharBuffer src) throws IOException {
202 Args.notNull(src, "Buffer");
203 lock.lock();
204 try {
205 return writeData(channel, src);
206 } finally {
207 lock.unlock();
208 }
209 }
210
211 @Override
212 public void endStream() throws IOException {
213 lock.lock();
214 try {
215 streamEnd(channel);
216 } finally {
217 lock.unlock();
218 }
219 }
220
221 });
222 }
223 if (state == State.FLUSHING) {
224 final CoderResult result = charsetEncoder.flush(bytebuf);
225 if (result.isError()) {
226 result.throwException();
227 } else if (result.isOverflow()) {
228 flush(channel);
229 } else if (result.isUnderflow()) {
230 flush(channel);
231 if (bytebuf.position() == 0) {
232 state = State.END_STREAM;
233 channel.endStream();
234 }
235 }
236
237 }
238
239 } finally {
240 lock.unlock();
241 }
242 }
243
244 @Override
245 public void releaseResources() {
246 state = State.ACTIVE;
247 charsetEncoder.reset();
248 }
249
250 }