1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive.examples;
28
29 import static java.nio.charset.StandardCharsets.UTF_8;
30
31 import java.net.URI;
32 import java.nio.ByteBuffer;
33 import java.util.Random;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.hc.core5.http.ContentType;
38 import org.apache.hc.core5.http.Header;
39 import org.apache.hc.core5.http.HttpConnection;
40 import org.apache.hc.core5.http.HttpRequest;
41 import org.apache.hc.core5.http.HttpResponse;
42 import org.apache.hc.core5.http.Message;
43 import org.apache.hc.core5.http.impl.Http1StreamListener;
44 import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
45 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
46 import org.apache.hc.core5.http.message.RequestLine;
47 import org.apache.hc.core5.http.message.StatusLine;
48 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
49 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
50 import org.apache.hc.core5.io.CloseMode;
51 import org.apache.hc.core5.reactive.ReactiveEntityProducer;
52 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
53 import org.apache.hc.core5.reactor.IOReactorConfig;
54 import org.apache.hc.core5.util.Timeout;
55 import org.reactivestreams.Publisher;
56
57 import io.reactivex.rxjava3.core.Flowable;
58 import io.reactivex.rxjava3.core.Observable;
59
60
61
62
63
64
65 public class ReactiveFullDuplexClientExample {
66
67 public static void main(final String[] args) throws Exception {
68 String endpoint = "http://localhost:8080/echo";
69 if (args.length >= 1) {
70 endpoint = args[0];
71 }
72
73
74 final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
75 .setIOReactorConfig(IOReactorConfig.custom().setSoTimeout(5, TimeUnit.SECONDS).build())
76 .setStreamListener(new Http1StreamListener() {
77 @Override
78 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
79 System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
80
81 }
82
83 @Override
84 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
85 System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
86 }
87
88 @Override
89 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
90 if (keepAlive) {
91 System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
92 } else {
93 System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
94 }
95 }
96
97 })
98 .create();
99
100 Runtime.getRuntime().addShutdownHook(new Thread(() -> {
101 System.out.println("HTTP requester shutting down");
102 requester.close(CloseMode.GRACEFUL);
103 }));
104 requester.start();
105
106 final Random random = new Random();
107 final Flowable<ByteBuffer> publisher = Flowable.range(1, 100)
108 .map(ignored -> {
109 final String str = random.nextDouble() + "\n";
110 return ByteBuffer.wrap(str.getBytes(UTF_8));
111 });
112 final AsyncRequestProducer requestProducer = AsyncRequestBuilder.post(new URI(endpoint))
113 .setEntity(new ReactiveEntityProducer(publisher, -1, ContentType.TEXT_PLAIN, null))
114 .build();
115
116 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
117 final Future<Void> responseComplete = requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null);
118 final Message<HttpResponse, Publisher<ByteBuffer>> streamingResponse = consumer.getResponseFuture().get();
119
120 System.out.println(streamingResponse.getHead());
121 for (final Header header : streamingResponse.getHead().getHeaders()) {
122 System.out.println(header);
123 }
124 System.out.println();
125
126 Observable.fromPublisher(streamingResponse.getBody())
127 .map(byteBuffer -> {
128 final byte[] string = new byte[byteBuffer.remaining()];
129 byteBuffer.get(string);
130 return new String(string);
131 })
132 .materialize()
133 .forEach(System.out::println);
134
135 responseComplete.get(1, TimeUnit.MINUTES);
136 System.out.println("Shutting down I/O reactor");
137 requester.initiateShutdown();
138 }
139
140 }