1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.nio.support.classic;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32 import java.nio.ByteBuffer;
33 import java.nio.charset.Charset;
34 import java.nio.charset.StandardCharsets;
35 import java.util.List;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.locks.Condition;
41 import java.util.concurrent.locks.ReentrantLock;
42
43 import org.apache.hc.core5.http.Header;
44 import org.apache.hc.core5.http.WritableByteChannelMock;
45 import org.apache.hc.core5.http.nio.DataStreamChannel;
46 import org.apache.hc.core5.util.Timeout;
47 import org.junit.jupiter.api.Assertions;
48 import org.junit.jupiter.api.Test;
49 import org.mockito.Mockito;
50
51 public class TestSharedOutputBuffer {
52
53 private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
54
55 static class DataStreamChannelMock implements DataStreamChannel {
56
57 private final WritableByteChannelMock channel;
58
59 private final ReentrantLock lock;
60 private final Condition condition;
61
62 DataStreamChannelMock(final WritableByteChannelMock channel) {
63 this.channel = channel;
64 this.lock = new ReentrantLock();
65 this.condition = lock.newCondition();
66 }
67
68 @Override
69 public int write(final ByteBuffer src) throws IOException {
70 lock.lock();
71 try {
72 return channel.write(src);
73 } finally {
74 lock.unlock();
75 }
76 }
77
78 @Override
79 public void requestOutput() {
80 lock.lock();
81 try {
82 condition.signalAll();
83 } finally {
84 lock.unlock();
85 }
86 }
87
88 @Override
89 public void endStream(final List<? extends Header> trailers) throws IOException {
90 lock.lock();
91 try {
92 channel.close();
93 condition.signalAll();
94 } finally {
95 lock.unlock();
96 }
97 }
98
99 @Override
100 public void endStream() throws IOException {
101 endStream(null);
102 }
103
104 public void awaitOutputRequest() throws InterruptedException {
105 lock.lock();
106 try {
107 condition.await();
108 } finally {
109 lock.unlock();
110 }
111 }
112
113 }
114
115 @Test
116 public void testBasis() throws Exception {
117
118 final Charset charset = StandardCharsets.US_ASCII;
119 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
120
121 final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
122 final DataStreamChannel dataStreamChannel = Mockito.spy(new DataStreamChannelMock(channel));
123 outputBuffer.flush(dataStreamChannel);
124
125 Mockito.verifyNoInteractions(dataStreamChannel);
126
127 Assertions.assertEquals(0, outputBuffer.length());
128 Assertions.assertEquals(30, outputBuffer.capacity());
129
130 final byte[] tmp = "1234567890".getBytes(charset);
131 outputBuffer.write(tmp, 0, tmp.length);
132 outputBuffer.write(tmp, 0, tmp.length);
133 outputBuffer.write('1');
134 outputBuffer.write('2');
135
136 Assertions.assertEquals(22, outputBuffer.length());
137 Assertions.assertEquals(8, outputBuffer.capacity());
138
139 Mockito.verifyNoInteractions(dataStreamChannel);
140 }
141
142 @Test
143 public void testFlush() throws Exception {
144
145 final Charset charset = StandardCharsets.US_ASCII;
146 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
147
148 final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
149 final DataStreamChannel dataStreamChannel = new DataStreamChannelMock(channel);
150 outputBuffer.flush(dataStreamChannel);
151
152 Assertions.assertEquals(0, outputBuffer.length());
153 Assertions.assertEquals(30, outputBuffer.capacity());
154
155 final byte[] tmp = "1234567890".getBytes(charset);
156 outputBuffer.write(tmp, 0, tmp.length);
157 outputBuffer.write(tmp, 0, tmp.length);
158 outputBuffer.write('1');
159 outputBuffer.write('2');
160
161 outputBuffer.flush(dataStreamChannel);
162
163 Assertions.assertEquals(0, outputBuffer.length());
164 Assertions.assertEquals(30, outputBuffer.capacity());
165 }
166
167 @Test
168 public void testMultithreadingWriteStream() throws Exception {
169
170 final Charset charset = StandardCharsets.US_ASCII;
171 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
172
173 final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
174 final DataStreamChannelMock dataStreamChannel = new DataStreamChannelMock(channel);
175
176 final ExecutorService executorService = Executors.newFixedThreadPool(2);
177 final Future<Boolean> task1 = executorService.submit(() -> {
178 final byte[] tmp = "1234567890".getBytes(charset);
179 outputBuffer.write(tmp, 0, tmp.length);
180 outputBuffer.write(tmp, 0, tmp.length);
181 outputBuffer.write('1');
182 outputBuffer.write('2');
183 outputBuffer.write(tmp, 0, tmp.length);
184 outputBuffer.write(tmp, 0, tmp.length);
185 outputBuffer.write(tmp, 0, tmp.length);
186 outputBuffer.writeCompleted();
187 outputBuffer.writeCompleted();
188 return Boolean.TRUE;
189 });
190 final Future<Boolean> task2 = executorService.submit(() -> {
191 for (;;) {
192 outputBuffer.flush(dataStreamChannel);
193 if (outputBuffer.isEndStream()) {
194 break;
195 }
196 if (!outputBuffer.hasData()) {
197 dataStreamChannel.awaitOutputRequest();
198 }
199 }
200 return Boolean.TRUE;
201 });
202
203 Assertions.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
204 Assertions.assertEquals(Boolean.TRUE, task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
205
206 Assertions.assertEquals("1234567890123456789012123456789012345678901234567890", new String(channel.toByteArray(), charset));
207 }
208
209 @Test
210 public void testMultithreadingWriteStreamAbort() throws Exception {
211
212 final Charset charset = StandardCharsets.US_ASCII;
213 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
214
215 final ExecutorService executorService = Executors.newFixedThreadPool(2);
216 final Future<Boolean> task1 = executorService.submit(() -> {
217 final byte[] tmp = "1234567890".getBytes(charset);
218 for (int i = 0; i < 20; i++) {
219 outputBuffer.write(tmp, 0, tmp.length);
220 }
221 outputBuffer.writeCompleted();
222 return Boolean.TRUE;
223 });
224 final Future<Boolean> task2 = executorService.submit(() -> {
225 Thread.sleep(200);
226 outputBuffer.abort();
227 return Boolean.TRUE;
228 });
229
230 Assertions.assertEquals(Boolean.TRUE, task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
231 try {
232 task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
233 } catch (final ExecutionException ex) {
234 Assertions.assertTrue(ex.getCause() instanceof InterruptedIOException);
235 }
236 }
237
238 @Test
239 public void testEndStreamOnlyCalledOnce() throws IOException {
240
241 final DataStreamChannel channel = Mockito.mock(DataStreamChannel.class);
242 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
243
244 outputBuffer.flush(channel);
245
246 outputBuffer.writeCompleted();
247 outputBuffer.flush(channel);
248
249 Mockito.verify(channel, Mockito.times(1)).endStream();
250 }
251
252 }
253