1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.util;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.locks.Condition;
32 import java.util.concurrent.locks.ReentrantLock;
33
34 import org.apache.http.annotation.ThreadingBehavior;
35 import org.apache.http.annotation.Contract;
36 import org.apache.http.nio.ContentEncoder;
37 import org.apache.http.nio.IOControl;
38 import org.apache.http.util.Args;
39 import org.apache.http.util.Asserts;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
58 public class SharedOutputBuffer extends ExpandableBuffer implements ContentOutputBuffer {
59
60 private final ReentrantLock lock;
61 private final Condition condition;
62
63 private volatile IOControl ioControl;
64 private volatile boolean shutdown = false;
65 private volatile boolean endOfStream = false;
66
67
68
69
70 @Deprecated
71 public SharedOutputBuffer(final int bufferSize, final IOControl ioControl, final ByteBufferAllocator allocator) {
72 super(bufferSize, allocator);
73 Args.notNull(ioControl, "I/O content control");
74 this.ioControl = ioControl;
75 this.lock = new ReentrantLock();
76 this.condition = this.lock.newCondition();
77 }
78
79
80
81
82 public SharedOutputBuffer(final int bufferSize, final ByteBufferAllocator allocator) {
83 super(bufferSize, allocator);
84 this.lock = new ReentrantLock();
85 this.condition = this.lock.newCondition();
86 }
87
88
89
90
91 public SharedOutputBuffer(final int bufferSize) {
92 this(bufferSize, HeapByteBufferAllocator.INSTANCE);
93 }
94
95 @Override
96 public void reset() {
97 if (this.shutdown) {
98 return;
99 }
100 this.lock.lock();
101 try {
102 clear();
103 this.endOfStream = false;
104 } finally {
105 this.lock.unlock();
106 }
107 }
108
109 @Override
110 public boolean hasData() {
111 this.lock.lock();
112 try {
113 return super.hasData();
114 } finally {
115 this.lock.unlock();
116 }
117 }
118
119 @Override
120 public int available() {
121 this.lock.lock();
122 try {
123 return super.available();
124 } finally {
125 this.lock.unlock();
126 }
127 }
128
129 @Override
130 public int capacity() {
131 this.lock.lock();
132 try {
133 return super.capacity();
134 } finally {
135 this.lock.unlock();
136 }
137 }
138
139 @Override
140 public int length() {
141 this.lock.lock();
142 try {
143 return super.length();
144 } finally {
145 this.lock.unlock();
146 }
147 }
148
149
150
151
152 @Override
153 @Deprecated
154 public int produceContent(final ContentEncoder encoder) throws IOException {
155 return produceContent(encoder, null);
156 }
157
158
159
160
161 public int produceContent(final ContentEncoder encoder, final IOControl ioControl) throws IOException {
162 if (this.shutdown) {
163 return -1;
164 }
165 this.lock.lock();
166 try {
167 if (ioControl != null) {
168 this.ioControl = ioControl;
169 }
170 setOutputMode();
171 int bytesWritten = 0;
172 if (super.hasData()) {
173 bytesWritten = encoder.write(this.buffer);
174 if (encoder.isCompleted()) {
175 this.endOfStream = true;
176 }
177 }
178 if (!super.hasData()) {
179
180
181 if (this.endOfStream && !encoder.isCompleted()) {
182 encoder.complete();
183 }
184 if (!this.endOfStream) {
185
186 if (this.ioControl != null) {
187 this.ioControl.suspendOutput();
188 }
189 }
190 }
191 this.condition.signalAll();
192 return bytesWritten;
193 } finally {
194 this.lock.unlock();
195 }
196 }
197
198 public void close() {
199 shutdown();
200 }
201
202 public void shutdown() {
203 if (this.shutdown) {
204 return;
205 }
206 this.shutdown = true;
207 this.lock.lock();
208 try {
209 this.condition.signalAll();
210 } finally {
211 this.lock.unlock();
212 }
213 }
214
215 @Override
216 public void write(final byte[] b, final int off, final int len) throws IOException {
217 if (b == null) {
218 return;
219 }
220 int pos = off;
221 this.lock.lock();
222 try {
223 Asserts.check(!this.shutdown && !this.endOfStream, "Buffer already closed for writing");
224 setInputMode();
225 int remaining = len;
226 while (remaining > 0) {
227 if (!this.buffer.hasRemaining()) {
228 flushContent();
229 setInputMode();
230 }
231 final int chunk = Math.min(remaining, this.buffer.remaining());
232 this.buffer.put(b, pos, chunk);
233 remaining -= chunk;
234 pos += chunk;
235 }
236 } finally {
237 this.lock.unlock();
238 }
239 }
240
241 public void write(final byte[] b) throws IOException {
242 if (b == null) {
243 return;
244 }
245 write(b, 0, b.length);
246 }
247
248 @Override
249 public void write(final int b) throws IOException {
250 this.lock.lock();
251 try {
252 Asserts.check(!this.shutdown && !this.endOfStream, "Buffer already closed for writing");
253 setInputMode();
254 if (!this.buffer.hasRemaining()) {
255 flushContent();
256 setInputMode();
257 }
258 this.buffer.put((byte)b);
259 } finally {
260 this.lock.unlock();
261 }
262 }
263
264 @Override
265 public void flush() throws IOException {
266
267 }
268
269 private void flushContent() throws IOException {
270 this.lock.lock();
271 try {
272 try {
273 while (super.hasData()) {
274 if (this.shutdown) {
275 throw new InterruptedIOException("Output operation aborted");
276 }
277 if (this.ioControl != null) {
278 this.ioControl.requestOutput();
279 }
280 this.condition.await();
281 }
282 } catch (final InterruptedException ex) {
283 throw new IOException("Interrupted while flushing the content buffer");
284 }
285 } finally {
286 this.lock.unlock();
287 }
288 }
289
290 @Override
291 public void writeCompleted() throws IOException {
292 this.lock.lock();
293 try {
294 if (this.endOfStream) {
295 return;
296 }
297 this.endOfStream = true;
298 if (this.ioControl != null) {
299 this.ioControl.requestOutput();
300 }
301 } finally {
302 this.lock.unlock();
303 }
304 }
305
306 }