1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.entity;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.Collections;
32 import java.util.Objects;
33 import java.util.Set;
34 import java.util.concurrent.locks.ReentrantLock;
35
36 import org.apache.hc.core5.annotation.Contract;
37 import org.apache.hc.core5.annotation.ThreadingBehavior;
38 import org.apache.hc.core5.http.ContentType;
39 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
40 import org.apache.hc.core5.http.nio.DataStreamChannel;
41 import org.apache.hc.core5.http.nio.StreamChannel;
42 import org.apache.hc.core5.util.Args;
43
44
45
46
47
48
49 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
50 public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProducer {
51
52 enum State { ACTIVE, FLUSHING, END_STREAM }
53
54 private final int fragmentSizeHint;
55 private final ByteBuffer byteBuffer;
56 private final ContentType contentType;
57
58 private final ReentrantLock lock;
59
60 private volatile State state;
61
62 public AbstractBinAsyncEntityProducer(final int fragmentSizeHint, final ContentType contentType) {
63 this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : 0;
64 this.byteBuffer = ByteBuffer.allocate(this.fragmentSizeHint);
65 this.contentType = contentType;
66 this.state = State.ACTIVE;
67 this.lock = new ReentrantLock();
68 }
69
70 private void flush(final StreamChannel<ByteBuffer> channel) throws IOException {
71 if (byteBuffer.position() > 0) {
72 byteBuffer.flip();
73 channel.write(byteBuffer);
74 byteBuffer.compact();
75 }
76 }
77
78 final int writeData(final StreamChannel<ByteBuffer> channel, final ByteBuffer src) throws IOException {
79 final int chunk = src.remaining();
80 if (chunk == 0) {
81 return 0;
82 }
83 if (chunk > fragmentSizeHint) {
84
85
86
87
88 flush(channel);
89 if (byteBuffer.position() == 0) {
90 return channel.write(src);
91 }
92 } else {
93
94
95
96
97 if (byteBuffer.remaining() < chunk) {
98 flush(channel);
99 }
100 if (byteBuffer.remaining() >= chunk) {
101 byteBuffer.put(src);
102 if (!byteBuffer.hasRemaining()) {
103 flush(channel);
104 }
105 return chunk;
106 }
107 }
108 return 0;
109 }
110
111 final void streamEnd(final StreamChannel<ByteBuffer> channel) throws IOException {
112 if (state == State.ACTIVE) {
113 state = State.FLUSHING;
114 flush(channel);
115 if (byteBuffer.position() == 0) {
116 state = State.END_STREAM;
117 channel.endStream();
118 }
119 }
120 }
121
122
123
124
125
126
127
128
129 protected abstract int availableData();
130
131
132
133
134
135
136
137
138
139
140 protected abstract void produceData(StreamChannel<ByteBuffer> channel) throws IOException;
141
142 @Override
143 public final String getContentType() {
144 return Objects.toString(contentType, null);
145 }
146
147 @Override
148 public String getContentEncoding() {
149 return null;
150 }
151
152 @Override
153 public boolean isChunked() {
154 return false;
155 }
156
157 @Override
158 public Set<String> getTrailerNames() {
159 return Collections.emptySet();
160 }
161
162 @Override
163 public long getContentLength() {
164 return -1;
165 }
166
167 @Override
168 public final int available() {
169 if (state == State.ACTIVE) {
170 return availableData();
171 } else {
172 lock.lock();
173 try {
174 return byteBuffer.position();
175 } finally {
176 lock.unlock();
177 }
178 }
179 }
180
181 @Override
182 public final void produce(final DataStreamChannel channel) throws IOException {
183 lock.lock();
184 try {
185 if (state == State.ACTIVE) {
186 produceData(new StreamChannel<ByteBuffer>() {
187
188 @Override
189 public int write(final ByteBuffer src) throws IOException {
190 Args.notNull(src, "Buffer");
191 lock.lock();
192 try {
193 return writeData(channel, src);
194 } finally {
195 lock.unlock();
196 }
197 }
198
199 @Override
200 public void endStream() throws IOException {
201 lock.lock();
202 try {
203 streamEnd(channel);
204 } finally {
205 lock.unlock();
206 }
207 }
208
209 });
210 }
211 if (state == State.FLUSHING) {
212 flush(channel);
213 if (byteBuffer.position() == 0) {
214 state = State.END_STREAM;
215 channel.endStream();
216 }
217 }
218 } finally {
219 lock.unlock();
220 }
221 }
222
223 @Override
224 public void releaseResources() {
225 state = State.ACTIVE;
226 }
227
228 }