View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.nio.support.classic;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.nio.ByteBuffer;
33  import java.nio.charset.Charset;
34  import java.nio.charset.StandardCharsets;
35  import java.util.List;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.Executors;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.locks.Condition;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.hc.core5.http.Header;
44  import org.apache.hc.core5.http.WritableByteChannelMock;
45  import org.apache.hc.core5.http.nio.DataStreamChannel;
46  import org.apache.hc.core5.util.Timeout;
47  import org.junit.jupiter.api.Assertions;
48  import org.junit.jupiter.api.Test;
49  import org.mockito.Mockito;
50  
51  public class TestSharedOutputBuffer {
52  
53      private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
54  
55      static class DataStreamChannelMock implements DataStreamChannel {
56  
57          private final WritableByteChannelMock channel;
58  
59          private final ReentrantLock lock;
60          private final Condition condition;
61  
62          DataStreamChannelMock(final WritableByteChannelMock channel) {
63              this.channel = channel;
64              this.lock = new ReentrantLock();
65              this.condition = lock.newCondition();
66          }
67  
68          @Override
69          public int write(final ByteBuffer src) throws IOException {
70              lock.lock();
71              try {
72                  return channel.write(src);
73              } finally {
74                  lock.unlock();
75              }
76          }
77  
78          @Override
79          public void requestOutput() {
80              lock.lock();
81              try {
82                  condition.signalAll();
83              } finally {
84                  lock.unlock();
85              }
86          }
87  
88          @Override
89          public void endStream(final List<? extends Header> trailers) throws IOException {
90              lock.lock();
91              try {
92                  channel.close();
93                  condition.signalAll();
94              } finally {
95                  lock.unlock();
96              }
97          }
98  
99          @Override
100         public void endStream() throws IOException {
101             endStream(null);
102         }
103 
104         public void awaitOutputRequest() throws InterruptedException {
105             lock.lock();
106             try {
107                 condition.await();
108             } finally {
109                 lock.unlock();
110             }
111         }
112 
113     }
114 
115     @Test
116     public void testBasis() throws Exception {
117 
118         final Charset charset = StandardCharsets.US_ASCII;
119         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
120 
121         final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
122         final DataStreamChannel dataStreamChannel = Mockito.spy(new DataStreamChannelMock(channel));
123         outputBuffer.flush(dataStreamChannel);
124 
125         Mockito.verifyNoInteractions(dataStreamChannel);
126 
127         Assertions.assertEquals(0, outputBuffer.length());
128         Assertions.assertEquals(30, outputBuffer.capacity());
129 
130         final byte[] tmp = "1234567890".getBytes(charset);
131         outputBuffer.write(tmp, 0, tmp.length);
132         outputBuffer.write(tmp, 0, tmp.length);
133         outputBuffer.write('1');
134         outputBuffer.write('2');
135 
136         Assertions.assertEquals(22, outputBuffer.length());
137         Assertions.assertEquals(8, outputBuffer.capacity());
138 
139         Mockito.verifyNoInteractions(dataStreamChannel);
140     }
141 
142     @Test
143     public void testFlush() throws Exception {
144 
145         final Charset charset = StandardCharsets.US_ASCII;
146         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
147 
148         final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
149         final DataStreamChannel dataStreamChannel = new DataStreamChannelMock(channel);
150         outputBuffer.flush(dataStreamChannel);
151 
152         Assertions.assertEquals(0, outputBuffer.length());
153         Assertions.assertEquals(30, outputBuffer.capacity());
154 
155         final byte[] tmp = "1234567890".getBytes(charset);
156         outputBuffer.write(tmp, 0, tmp.length);
157         outputBuffer.write(tmp, 0, tmp.length);
158         outputBuffer.write('1');
159         outputBuffer.write('2');
160 
161         outputBuffer.flush(dataStreamChannel);
162 
163         Assertions.assertEquals(0, outputBuffer.length());
164         Assertions.assertEquals(30, outputBuffer.capacity());
165     }
166 
167     @Test
168     public void testMultithreadingWriteStream() throws Exception {
169 
170         final Charset charset = StandardCharsets.US_ASCII;
171         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
172 
173         final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
174         final DataStreamChannelMock dataStreamChannel = new DataStreamChannelMock(channel);
175 
176         final ExecutorService executorService = Executors.newFixedThreadPool(2);
177         final Future<Boolean> task1 = executorService.submit(() -> {
178             final byte[] tmp = "1234567890".getBytes(charset);
179             outputBuffer.write(tmp, 0, tmp.length);
180             outputBuffer.write(tmp, 0, tmp.length);
181             outputBuffer.write('1');
182             outputBuffer.write('2');
183             outputBuffer.write(tmp, 0, tmp.length);
184             outputBuffer.write(tmp, 0, tmp.length);
185             outputBuffer.write(tmp, 0, tmp.length);
186             outputBuffer.writeCompleted();
187             outputBuffer.writeCompleted();
188             return Boolean.TRUE;
189         });
190         final Future<Boolean> task2 = executorService.submit(() -> {
191             for (;;) {
192                 outputBuffer.flush(dataStreamChannel);
193                 if (outputBuffer.isEndStream()) {
194                     break;
195                 }
196                 if (!outputBuffer.hasData()) {
197                     dataStreamChannel.awaitOutputRequest();
198                 }
199             }
200             return Boolean.TRUE;
201         });
202 
203         Assertions.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
204         Assertions.assertEquals(Boolean.TRUE, task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
205 
206         Assertions.assertEquals("1234567890123456789012123456789012345678901234567890", new String(channel.toByteArray(), charset));
207     }
208 
209     @Test
210     public void testMultithreadingWriteStreamAbort() throws Exception {
211 
212         final Charset charset = StandardCharsets.US_ASCII;
213         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
214 
215         final ExecutorService executorService = Executors.newFixedThreadPool(2);
216         final Future<Boolean> task1 = executorService.submit(() -> {
217             final byte[] tmp = "1234567890".getBytes(charset);
218             for (int i = 0; i < 20; i++) {
219                 outputBuffer.write(tmp, 0, tmp.length);
220             }
221             outputBuffer.writeCompleted();
222             return Boolean.TRUE;
223         });
224         final Future<Boolean> task2 = executorService.submit(() -> {
225             Thread.sleep(200);
226             outputBuffer.abort();
227             return Boolean.TRUE;
228         });
229 
230         Assertions.assertEquals(Boolean.TRUE, task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
231         try {
232             task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
233         } catch (final ExecutionException ex) {
234             Assertions.assertTrue(ex.getCause() instanceof InterruptedIOException);
235         }
236     }
237 
238     @Test
239     public void testEndStreamOnlyCalledOnce() throws IOException {
240 
241         final DataStreamChannel channel = Mockito.mock(DataStreamChannel.class);
242         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
243 
244         outputBuffer.flush(channel);
245 
246         outputBuffer.writeCompleted();
247         outputBuffer.flush(channel);
248 
249         Mockito.verify(channel, Mockito.times(1)).endStream();
250     }
251 
252 }
253