1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.examples;
28
29 import java.net.URI;
30 import java.nio.ByteBuffer;
31 import java.nio.charset.StandardCharsets;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34
35 import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
36 import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient;
37 import org.apache.hc.core5.http.ContentType;
38 import org.apache.hc.core5.http.Header;
39 import org.apache.hc.core5.http.HttpResponse;
40 import org.apache.hc.core5.http.Message;
41 import org.apache.hc.core5.http.config.Http1Config;
42 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
43 import org.apache.hc.core5.http2.HttpVersionPolicy;
44 import org.apache.hc.core5.http2.config.H2Config;
45 import org.apache.hc.core5.io.CloseMode;
46 import org.apache.hc.core5.reactive.ReactiveEntityProducer;
47 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
48 import org.apache.hc.core5.reactor.IOReactorConfig;
49 import org.apache.hc.core5.util.Timeout;
50 import org.reactivestreams.Publisher;
51
52 import io.reactivex.Flowable;
53 import io.reactivex.Notification;
54 import io.reactivex.Observable;
55 import io.reactivex.functions.Consumer;
56 import io.reactivex.functions.Function;
57
58
59
60
61 public class ReactiveClientFullDuplexExchange {
62
63 public static void main(final String[] args) throws Exception {
64
65 final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
66 .setSoTimeout(Timeout.ofSeconds(5))
67 .build();
68
69 final MinimalHttpAsyncClient client = HttpAsyncClients.createMinimal(
70 HttpVersionPolicy.NEGOTIATE,
71 H2Config.DEFAULT,
72 Http1Config.DEFAULT,
73 ioReactorConfig);
74
75 client.start();
76
77 final URI requestUri = new URI("http://httpbin.org/post");
78 final byte[] bs = "stuff".getBytes(StandardCharsets.UTF_8);
79 final ReactiveEntityProducer reactiveEntityProducer = new ReactiveEntityProducer(
80 Flowable.just(ByteBuffer.wrap(bs)), bs.length, ContentType.TEXT_PLAIN, null);
81 final BasicRequestProducer requestProducer = new BasicRequestProducer(
82 "POST", requestUri, reactiveEntityProducer);
83
84 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
85 final Future<Void> requestFuture = client.execute(requestProducer, consumer, null);
86 final Message<HttpResponse, Publisher<ByteBuffer>> streamingResponse = consumer.getResponseFuture().get();
87
88 System.out.println(streamingResponse.getHead());
89 for (final Header header : streamingResponse.getHead().getHeaders()) {
90 System.out.println(header);
91 }
92 System.out.println();
93
94 Observable.fromPublisher(streamingResponse.getBody())
95 .map(new Function<ByteBuffer, String>() {
96 @Override
97 public String apply(final ByteBuffer byteBuffer) throws Exception {
98 final byte[] string = new byte[byteBuffer.remaining()];
99 byteBuffer.get(string);
100 return new String(string);
101 }
102 })
103 .materialize()
104 .forEach(new Consumer<Notification<String>>() {
105 @Override
106 public void accept(final Notification<String> byteBufferNotification) throws Exception {
107 System.out.println(byteBufferNotification);
108 }
109 });
110
111 requestFuture.get(1, TimeUnit.MINUTES);
112
113 System.out.println("Shutting down");
114 client.close(CloseMode.GRACEFUL);
115 }
116 }