1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.nio.support.classic;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32 import java.nio.ByteBuffer;
33 import java.nio.charset.Charset;
34 import java.nio.charset.StandardCharsets;
35 import java.util.List;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.Future;
40
41 import org.apache.hc.core5.http.Header;
42 import org.apache.hc.core5.http.WritableByteChannelMock;
43 import org.apache.hc.core5.http.nio.DataStreamChannel;
44 import org.apache.hc.core5.util.Timeout;
45 import org.junit.jupiter.api.Assertions;
46 import org.junit.jupiter.api.Test;
47 import org.mockito.Mockito;
48
49 public class TestSharedOutputBuffer {
50
51 private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
52
53 static class DataStreamChannelMock implements DataStreamChannel {
54
55 private final WritableByteChannelMock channel;
56
57 DataStreamChannelMock(final WritableByteChannelMock channel) {
58 this.channel = channel;
59 }
60
61 @Override
62 public synchronized int write(final ByteBuffer src) throws IOException {
63 return channel.write(src);
64 }
65
66 @Override
67 public synchronized void requestOutput() {
68 notifyAll();
69 }
70
71 @Override
72 public synchronized void endStream(final List<? extends Header> trailers) throws IOException {
73 channel.close();
74 notifyAll();
75 }
76
77 @Override
78 public void endStream() throws IOException {
79 endStream(null);
80 }
81
82 public synchronized void awaitOutputRequest() throws InterruptedException {
83 wait();
84 }
85
86 }
87
88 @Test
89 public void testBasis() throws Exception {
90
91 final Charset charset = StandardCharsets.US_ASCII;
92 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
93
94 final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
95 final DataStreamChannel dataStreamChannel = Mockito.spy(new DataStreamChannelMock(channel));
96 outputBuffer.flush(dataStreamChannel);
97
98 Mockito.verifyNoInteractions(dataStreamChannel);
99
100 Assertions.assertEquals(0, outputBuffer.length());
101 Assertions.assertEquals(30, outputBuffer.capacity());
102
103 final byte[] tmp = "1234567890".getBytes(charset);
104 outputBuffer.write(tmp, 0, tmp.length);
105 outputBuffer.write(tmp, 0, tmp.length);
106 outputBuffer.write('1');
107 outputBuffer.write('2');
108
109 Assertions.assertEquals(22, outputBuffer.length());
110 Assertions.assertEquals(8, outputBuffer.capacity());
111
112 Mockito.verifyNoInteractions(dataStreamChannel);
113 }
114
115 @Test
116 public void testFlush() throws Exception {
117
118 final Charset charset = StandardCharsets.US_ASCII;
119 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
120
121 final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
122 final DataStreamChannel dataStreamChannel = new DataStreamChannelMock(channel);
123 outputBuffer.flush(dataStreamChannel);
124
125 Assertions.assertEquals(0, outputBuffer.length());
126 Assertions.assertEquals(30, outputBuffer.capacity());
127
128 final byte[] tmp = "1234567890".getBytes(charset);
129 outputBuffer.write(tmp, 0, tmp.length);
130 outputBuffer.write(tmp, 0, tmp.length);
131 outputBuffer.write('1');
132 outputBuffer.write('2');
133
134 outputBuffer.flush(dataStreamChannel);
135
136 Assertions.assertEquals(0, outputBuffer.length());
137 Assertions.assertEquals(30, outputBuffer.capacity());
138 }
139
140 @Test
141 public void testMultithreadingWriteStream() throws Exception {
142
143 final Charset charset = StandardCharsets.US_ASCII;
144 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
145
146 final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
147 final DataStreamChannelMock dataStreamChannel = new DataStreamChannelMock(channel);
148
149 final ExecutorService executorService = Executors.newFixedThreadPool(2);
150 final Future<Boolean> task1 = executorService.submit(() -> {
151 final byte[] tmp = "1234567890".getBytes(charset);
152 outputBuffer.write(tmp, 0, tmp.length);
153 outputBuffer.write(tmp, 0, tmp.length);
154 outputBuffer.write('1');
155 outputBuffer.write('2');
156 outputBuffer.write(tmp, 0, tmp.length);
157 outputBuffer.write(tmp, 0, tmp.length);
158 outputBuffer.write(tmp, 0, tmp.length);
159 outputBuffer.writeCompleted();
160 outputBuffer.writeCompleted();
161 return Boolean.TRUE;
162 });
163 final Future<Boolean> task2 = executorService.submit(() -> {
164 for (;;) {
165 outputBuffer.flush(dataStreamChannel);
166 if (outputBuffer.isEndStream()) {
167 break;
168 }
169 if (!outputBuffer.hasData()) {
170 dataStreamChannel.awaitOutputRequest();
171 }
172 }
173 return Boolean.TRUE;
174 });
175
176 Assertions.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
177 Assertions.assertEquals(Boolean.TRUE, task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
178
179 Assertions.assertEquals("1234567890123456789012123456789012345678901234567890", new String(channel.toByteArray(), charset));
180 }
181
182 @Test
183 public void testMultithreadingWriteStreamAbort() throws Exception {
184
185 final Charset charset = StandardCharsets.US_ASCII;
186 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
187
188 final ExecutorService executorService = Executors.newFixedThreadPool(2);
189 final Future<Boolean> task1 = executorService.submit(() -> {
190 final byte[] tmp = "1234567890".getBytes(charset);
191 for (int i = 0; i < 20; i++) {
192 outputBuffer.write(tmp, 0, tmp.length);
193 }
194 outputBuffer.writeCompleted();
195 return Boolean.TRUE;
196 });
197 final Future<Boolean> task2 = executorService.submit(() -> {
198 Thread.sleep(200);
199 outputBuffer.abort();
200 return Boolean.TRUE;
201 });
202
203 Assertions.assertEquals(Boolean.TRUE, task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
204 try {
205 task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
206 } catch (final ExecutionException ex) {
207 Assertions.assertTrue(ex.getCause() instanceof InterruptedIOException);
208 }
209 }
210
211 @Test
212 public void testEndStreamOnlyCalledOnce() throws IOException {
213
214 final DataStreamChannel channel = Mockito.mock(DataStreamChannel.class);
215 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
216
217 outputBuffer.flush(channel);
218
219 outputBuffer.writeCompleted();
220 outputBuffer.flush(channel);
221
222 Mockito.verify(channel, Mockito.times(1)).endStream();
223 }
224
225 }
226