1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.entity;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.nio.CharBuffer;
32 import java.nio.charset.Charset;
33 import java.nio.charset.CharsetEncoder;
34 import java.nio.charset.CoderResult;
35 import java.nio.charset.StandardCharsets;
36 import java.util.Set;
37
38 import org.apache.hc.core5.annotation.Contract;
39 import org.apache.hc.core5.annotation.ThreadingBehavior;
40 import org.apache.hc.core5.http.ContentType;
41 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
42 import org.apache.hc.core5.http.nio.DataStreamChannel;
43 import org.apache.hc.core5.http.nio.StreamChannel;
44 import org.apache.hc.core5.util.Args;
45
46
47
48
49
50
51 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
52 public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProducer {
53
54 private static final CharBuffer EMPTY = CharBuffer.wrap(new char[0]);
55
56 enum State { ACTIVE, FLUSHING, END_STREAM }
57
58 private final ByteBuffer bytebuf;
59 private final int fragmentSizeHint;
60 private final ContentType contentType;
61 private final CharsetEncoder charsetEncoder;
62
63 private volatile State state;
64
65 public AbstractCharAsyncEntityProducer(
66 final int bufferSize,
67 final int fragmentSizeHint,
68 final ContentType contentType) {
69 Args.positive(bufferSize, "Buffer size");
70 this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : 0;
71 this.bytebuf = ByteBuffer.allocate(bufferSize);
72 this.contentType = contentType;
73 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
74 this.charsetEncoder = charset.newEncoder();
75 this.state = State.ACTIVE;
76 }
77
78 private void flush(final StreamChannel<ByteBuffer> channel) throws IOException {
79 if (bytebuf.position() > 0) {
80 bytebuf.flip();
81 channel.write(bytebuf);
82 bytebuf.compact();
83 }
84 }
85
86 final int writeData(final StreamChannel<ByteBuffer> channel, final CharBuffer src) throws IOException {
87
88 final int chunk = src.remaining();
89 if (chunk == 0) {
90 return 0;
91 }
92
93 final int p = src.position();
94 final CoderResult result = charsetEncoder.encode(src, bytebuf, false);
95 if (result.isError()) {
96 result.throwException();
97 }
98
99 if (!bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
100 flush(channel);
101 }
102
103 return src.position() - p;
104 }
105
106 final void streamEnd(final StreamChannel<ByteBuffer> channel) throws IOException {
107 if (state == State.ACTIVE) {
108 state = State.FLUSHING;
109 if (!bytebuf.hasRemaining()) {
110 flush(channel);
111 }
112
113 final CoderResult result = charsetEncoder.encode(EMPTY, bytebuf, true);
114 if (result.isError()) {
115 result.throwException();
116 }
117 final CoderResult result2 = charsetEncoder.flush(bytebuf);
118 if (result2.isError()) {
119 result.throwException();
120 } else if (result.isUnderflow()) {
121 flush(channel);
122 if (bytebuf.position() == 0) {
123 state = State.END_STREAM;
124 channel.endStream();
125 }
126 }
127 }
128
129 }
130
131
132
133
134
135
136
137
138 protected abstract int availableData();
139
140
141
142
143
144
145
146
147
148
149 protected abstract void produceData(StreamChannel<CharBuffer> channel) throws IOException;
150
151 @Override
152 public final String getContentType() {
153 return contentType != null ? contentType.toString() : null;
154 }
155
156 @Override
157 public String getContentEncoding() {
158 return null;
159 }
160
161 @Override
162 public long getContentLength() {
163 return -1;
164 }
165
166 @Override
167 public boolean isChunked() {
168 return false;
169 }
170
171 @Override
172 public Set<String> getTrailerNames() {
173 return null;
174 }
175
176 @Override
177 public final int available() {
178 if (state == State.ACTIVE) {
179 return availableData();
180 } else {
181 synchronized (bytebuf) {
182 return bytebuf.position();
183 }
184 }
185 }
186
187 @Override
188 public final void produce(final DataStreamChannel channel) throws IOException {
189 synchronized (bytebuf) {
190 if (state == State.ACTIVE) {
191 produceData(new StreamChannel<CharBuffer>() {
192
193 @Override
194 public int write(final CharBuffer src) throws IOException {
195 Args.notNull(src, "Buffer");
196 synchronized (bytebuf) {
197 return writeData(channel, src);
198 }
199 }
200
201 @Override
202 public void endStream() throws IOException {
203 synchronized (bytebuf) {
204 streamEnd(channel);
205 }
206 }
207
208 });
209 }
210 if (state == State.FLUSHING) {
211 final CoderResult result = charsetEncoder.flush(bytebuf);
212 if (result.isError()) {
213 result.throwException();
214 } else if (result.isOverflow()) {
215 flush(channel);
216 } else if (result.isUnderflow()) {
217 flush(channel);
218 if (bytebuf.position() == 0) {
219 state = State.END_STREAM;
220 channel.endStream();
221 }
222 }
223
224 }
225
226 }
227 }
228
229 @Override
230 public void releaseResources() {
231 state = State.ACTIVE;
232 charsetEncoder.reset();
233 }
234
235 }