1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.entity;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.Collections;
32 import java.util.Set;
33 import java.util.concurrent.locks.ReentrantLock;
34
35 import org.apache.hc.core5.annotation.Contract;
36 import org.apache.hc.core5.annotation.ThreadingBehavior;
37 import org.apache.hc.core5.http.ContentType;
38 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
39 import org.apache.hc.core5.http.nio.DataStreamChannel;
40 import org.apache.hc.core5.http.nio.StreamChannel;
41 import org.apache.hc.core5.util.Args;
42
43
44
45
46
47
48 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
49 public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProducer {
50
51 enum State { ACTIVE, FLUSHING, END_STREAM }
52
53 private final int fragmentSizeHint;
54 private final ByteBuffer byteBuffer;
55 private final ContentType contentType;
56
57 private final ReentrantLock lock;
58
59 private volatile State state;
60
61 public AbstractBinAsyncEntityProducer(final int fragmentSizeHint, final ContentType contentType) {
62 this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : 0;
63 this.byteBuffer = ByteBuffer.allocate(this.fragmentSizeHint);
64 this.contentType = contentType;
65 this.state = State.ACTIVE;
66 this.lock = new ReentrantLock();
67 }
68
69 private void flush(final StreamChannel<ByteBuffer> channel) throws IOException {
70 if (byteBuffer.position() > 0) {
71 byteBuffer.flip();
72 channel.write(byteBuffer);
73 byteBuffer.compact();
74 }
75 }
76
77 final int writeData(final StreamChannel<ByteBuffer> channel, final ByteBuffer src) throws IOException {
78 final int chunk = src.remaining();
79 if (chunk == 0) {
80 return 0;
81 }
82 if (chunk > fragmentSizeHint) {
83
84
85
86
87 flush(channel);
88 if (byteBuffer.position() == 0) {
89 return channel.write(src);
90 }
91 } else {
92
93
94
95
96 if (byteBuffer.remaining() < chunk) {
97 flush(channel);
98 }
99 if (byteBuffer.remaining() >= chunk) {
100 byteBuffer.put(src);
101 if (!byteBuffer.hasRemaining()) {
102 flush(channel);
103 }
104 return chunk;
105 }
106 }
107 return 0;
108 }
109
110 final void streamEnd(final StreamChannel<ByteBuffer> channel) throws IOException {
111 if (state == State.ACTIVE) {
112 state = State.FLUSHING;
113 flush(channel);
114 if (byteBuffer.position() == 0) {
115 state = State.END_STREAM;
116 channel.endStream();
117 }
118 }
119 }
120
121
122
123
124
125
126
127
128 protected abstract int availableData();
129
130
131
132
133
134
135
136
137
138
139 protected abstract void produceData(StreamChannel<ByteBuffer> channel) throws IOException;
140
141 @Override
142 public final String getContentType() {
143 return contentType != null ? contentType.toString() : null;
144 }
145
146 @Override
147 public String getContentEncoding() {
148 return null;
149 }
150
151 @Override
152 public boolean isChunked() {
153 return false;
154 }
155
156 @Override
157 public Set<String> getTrailerNames() {
158 return Collections.emptySet();
159 }
160
161 @Override
162 public long getContentLength() {
163 return -1;
164 }
165
166 @Override
167 public final int available() {
168 if (state == State.ACTIVE) {
169 return availableData();
170 } else {
171 lock.lock();
172 try {
173 return byteBuffer.position();
174 } finally {
175 lock.unlock();
176 }
177 }
178 }
179
180 @Override
181 public final void produce(final DataStreamChannel channel) throws IOException {
182 lock.lock();
183 try {
184 if (state == State.ACTIVE) {
185 produceData(new StreamChannel<ByteBuffer>() {
186
187 @Override
188 public int write(final ByteBuffer src) throws IOException {
189 Args.notNull(src, "Buffer");
190 lock.lock();
191 try {
192 return writeData(channel, src);
193 } finally {
194 lock.unlock();
195 }
196 }
197
198 @Override
199 public void endStream() throws IOException {
200 lock.lock();
201 try {
202 streamEnd(channel);
203 } finally {
204 lock.unlock();
205 }
206 }
207
208 });
209 }
210 if (state == State.FLUSHING) {
211 flush(channel);
212 if (byteBuffer.position() == 0) {
213 state = State.END_STREAM;
214 channel.endStream();
215 }
216 }
217 } finally {
218 lock.unlock();
219 }
220 }
221
222 @Override
223 public void releaseResources() {
224 state = State.ACTIVE;
225 }
226
227 }