1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.util;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.locks.Condition;
32 import java.util.concurrent.locks.ReentrantLock;
33
34 import org.apache.http.annotation.ThreadingBehavior;
35 import org.apache.http.annotation.Contract;
36 import org.apache.http.nio.ContentDecoder;
37 import org.apache.http.nio.IOControl;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
56 public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
57
58 private final ReentrantLock lock;
59 private final Condition condition;
60
61 private volatile IOControl ioControl;
62 private volatile boolean shutdown = false;
63 private volatile boolean endOfStream = false;
64
65
66
67
68 @Deprecated
69 public SharedInputBuffer(final int bufferSize, final IOControl ioControl, final ByteBufferAllocator allocator) {
70 super(bufferSize, allocator);
71 this.ioControl = ioControl;
72 this.lock = new ReentrantLock();
73 this.condition = this.lock.newCondition();
74 }
75
76
77
78
79 public SharedInputBuffer(final int bufferSize, final ByteBufferAllocator allocator) {
80 super(bufferSize, allocator);
81 this.lock = new ReentrantLock();
82 this.condition = this.lock.newCondition();
83 }
84
85
86
87
88 public SharedInputBuffer(final int bufferSize) {
89 this(bufferSize, HeapByteBufferAllocator.INSTANCE);
90 }
91
92 @Override
93 public void reset() {
94 if (this.shutdown) {
95 return;
96 }
97 this.lock.lock();
98 try {
99 clear();
100 this.endOfStream = false;
101 } finally {
102 this.lock.unlock();
103 }
104 }
105
106
107
108
109 @Override
110 @Deprecated
111 public int consumeContent(final ContentDecoder decoder) throws IOException {
112 return consumeContent(decoder, null);
113 }
114
115
116
117
118 public int consumeContent(final ContentDecoder decoder, final IOControl ioControl) throws IOException {
119 if (this.shutdown) {
120 return -1;
121 }
122 this.lock.lock();
123 try {
124 if (ioControl != null) {
125 this.ioControl = ioControl;
126 }
127 setInputMode();
128 int totalRead = 0;
129 int bytesRead;
130 while ((bytesRead = decoder.read(this.buffer)) > 0) {
131 totalRead += bytesRead;
132 }
133 if (bytesRead == -1 || decoder.isCompleted()) {
134 this.endOfStream = true;
135 }
136 if (!this.buffer.hasRemaining()) {
137 if (this.ioControl != null) {
138 this.ioControl.suspendInput();
139 }
140 }
141 this.condition.signalAll();
142
143 if (totalRead > 0) {
144 return totalRead;
145 }
146 return this.endOfStream ? -1 : 0;
147 } finally {
148 this.lock.unlock();
149 }
150 }
151
152 @Override
153 public boolean hasData() {
154 this.lock.lock();
155 try {
156 return super.hasData();
157 } finally {
158 this.lock.unlock();
159 }
160 }
161
162 @Override
163 public int available() {
164 this.lock.lock();
165 try {
166 return super.available();
167 } finally {
168 this.lock.unlock();
169 }
170 }
171
172 @Override
173 public int capacity() {
174 this.lock.lock();
175 try {
176 return super.capacity();
177 } finally {
178 this.lock.unlock();
179 }
180 }
181
182 @Override
183 public int length() {
184 this.lock.lock();
185 try {
186 return super.length();
187 } finally {
188 this.lock.unlock();
189 }
190 }
191
192 protected void waitForData() throws IOException {
193 this.lock.lock();
194 try {
195 try {
196 while (!super.hasData() && !this.endOfStream) {
197 if (this.shutdown) {
198 throw new InterruptedIOException("Input operation aborted");
199 }
200 if (this.ioControl != null) {
201 this.ioControl.requestInput();
202 }
203 this.condition.await();
204 }
205 } catch (final InterruptedException ex) {
206 throw new IOException("Interrupted while waiting for more data");
207 }
208 } finally {
209 this.lock.unlock();
210 }
211 }
212
213 public void close() {
214 if (this.shutdown) {
215 return;
216 }
217 this.endOfStream = true;
218 this.lock.lock();
219 try {
220 this.condition.signalAll();
221 } finally {
222 this.lock.unlock();
223 }
224 }
225
226 public void shutdown() {
227 if (this.shutdown) {
228 return;
229 }
230 this.shutdown = true;
231 this.lock.lock();
232 try {
233 this.condition.signalAll();
234 } finally {
235 this.lock.unlock();
236 }
237 }
238
239 protected boolean isShutdown() {
240 return this.shutdown;
241 }
242
243 protected boolean isEndOfStream() {
244 return this.shutdown || (!hasData() && this.endOfStream);
245 }
246
247 @Override
248 public int read() throws IOException {
249 if (this.shutdown) {
250 return -1;
251 }
252 this.lock.lock();
253 try {
254 if (!hasData()) {
255 waitForData();
256 }
257 if (isEndOfStream()) {
258 return -1;
259 }
260 return this.buffer.get() & 0xff;
261 } finally {
262 this.lock.unlock();
263 }
264 }
265
266 @Override
267 public int read(final byte[] b, final int off, final int len) throws IOException {
268 if (this.shutdown) {
269 return -1;
270 }
271 if (b == null) {
272 return 0;
273 }
274 this.lock.lock();
275 try {
276 if (!hasData()) {
277 waitForData();
278 }
279 if (isEndOfStream()) {
280 return -1;
281 }
282 setOutputMode();
283 int chunk = len;
284 if (chunk > this.buffer.remaining()) {
285 chunk = this.buffer.remaining();
286 }
287 this.buffer.get(b, off, chunk);
288 return chunk;
289 } finally {
290 this.lock.unlock();
291 }
292 }
293
294 public int read(final byte[] b) throws IOException {
295 if (this.shutdown) {
296 return -1;
297 }
298 if (b == null) {
299 return 0;
300 }
301 return read(b, 0, b.length);
302 }
303
304 }