1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.nio.ByteBuffer;
30 import java.nio.charset.StandardCharsets;
31
32 import org.apache.hc.core5.http.ContentType;
33 import org.apache.hc.core5.http.HttpStreamResetException;
34 import org.apache.hc.core5.http.nio.DataStreamChannel;
35 import org.junit.jupiter.api.Assertions;
36 import org.junit.jupiter.api.Test;
37
38 import io.reactivex.rxjava3.core.Flowable;
39
40 public class TestReactiveEntityProducer {
41
42 private static final long CONTENT_LENGTH = 1;
43 private static final ContentType CONTENT_TYPE = ContentType.APPLICATION_JSON;
44 private static final String GZIP_CONTENT_ENCODING = "gzip";
45
46 @Test
47 public void testStreamThatEndsNormally() throws Exception {
48 final Flowable<ByteBuffer> publisher = Flowable.just(
49 ByteBuffer.wrap(new byte[]{'1', '2', '3'}),
50 ByteBuffer.wrap(new byte[]{'4', '5', '6'}));
51 final ReactiveEntityProducer entityProducer = new ReactiveEntityProducer(publisher, CONTENT_LENGTH, CONTENT_TYPE, GZIP_CONTENT_ENCODING);
52
53 final WritableByteChannelMock byteChannel = new WritableByteChannelMock(1024);
54 final DataStreamChannel streamChannel = new BasicDataStreamChannel(byteChannel);
55
56 entityProducer.produce(streamChannel);
57
58 Assertions.assertTrue(byteChannel.isOpen(), "Should be open");
59 Assertions.assertEquals("123456", byteChannel.dump(StandardCharsets.US_ASCII));
60
61 entityProducer.produce(streamChannel);
62
63 Assertions.assertFalse(byteChannel.isOpen(), "Should be closed");
64 Assertions.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
65 Assertions.assertFalse(entityProducer.isChunked());
66 Assertions.assertEquals(GZIP_CONTENT_ENCODING, entityProducer.getContentEncoding());
67 Assertions.assertNull(entityProducer.getTrailerNames());
68 Assertions.assertEquals(CONTENT_LENGTH, entityProducer.getContentLength());
69 Assertions.assertEquals(CONTENT_TYPE.toString(), entityProducer.getContentType());
70 Assertions.assertFalse(entityProducer.isRepeatable());
71 Assertions.assertEquals(1, entityProducer.available());
72
73 entityProducer.releaseResources();
74 }
75
76 @Test
77
78 public void testStreamThatEndsWithError() throws Exception {
79 final Flowable<ByteBuffer> publisher = Flowable.concatArray(
80 Flowable.just(
81 ByteBuffer.wrap(new byte[]{'1'}),
82 ByteBuffer.wrap(new byte[]{'2'}),
83 ByteBuffer.wrap(new byte[]{'3'}),
84 ByteBuffer.wrap(new byte[]{'4'}),
85 ByteBuffer.wrap(new byte[]{'5'}),
86 ByteBuffer.wrap(new byte[]{'6'})),
87 Flowable.error(new RuntimeException())
88 );
89 final ReactiveEntityProducer entityProducer = new ReactiveEntityProducer(publisher, CONTENT_LENGTH, CONTENT_TYPE, GZIP_CONTENT_ENCODING);
90
91 final WritableByteChannelMock byteChannel = new WritableByteChannelMock(1024);
92 final DataStreamChannel streamChannel = new BasicDataStreamChannel(byteChannel);
93
94 entityProducer.produce(streamChannel);
95 Assertions.assertEquals("12345", byteChannel.dump(StandardCharsets.US_ASCII));
96
97 final HttpStreamResetException exception = Assertions.assertThrows(HttpStreamResetException.class, () ->
98 entityProducer.produce(streamChannel));
99 Assertions.assertTrue(exception.getCause() instanceof RuntimeException, "Expected published exception to be rethrown");
100 Assertions.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
101 entityProducer.failed(exception);
102 Assertions.assertEquals(1, entityProducer.available());
103
104 Assertions.assertTrue(byteChannel.isOpen());
105 Assertions.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
106 Assertions.assertFalse(entityProducer.isChunked());
107 Assertions.assertEquals(GZIP_CONTENT_ENCODING, entityProducer.getContentEncoding());
108 Assertions.assertNull(entityProducer.getTrailerNames());
109 Assertions.assertEquals(CONTENT_LENGTH, entityProducer.getContentLength());
110 Assertions.assertEquals(CONTENT_TYPE.toString(), entityProducer.getContentType());
111 Assertions.assertFalse(entityProducer.isRepeatable());
112 Assertions.assertEquals(1, entityProducer.available());
113
114 entityProducer.releaseResources();
115 }
116
117 }