1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.sra.filters;
20
21 import java.io.ByteArrayOutputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.util.zip.GZIPInputStream;
25 import java.util.zip.GZIPOutputStream;
26 import org.reactivestreams.Publisher;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import org.springframework.cloud.gateway.filter.GatewayFilter;
30 import org.springframework.cloud.gateway.filter.GatewayFilterChain;
31 import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
32 import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
33 import org.springframework.core.Ordered;
34 import org.springframework.core.io.buffer.DataBuffer;
35 import org.springframework.core.io.buffer.DataBufferUtils;
36 import org.springframework.core.io.buffer.PooledDataBuffer;
37 import org.springframework.http.server.reactive.ServerHttpResponse;
38 import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
39 import org.springframework.web.server.ServerWebExchange;
40 import reactor.core.publisher.Flux;
41 import reactor.core.publisher.Mono;
42
43
44
45
46 public abstract class ModifyResponseGatewayFilterFactory extends CustomGatewayFilterFactory {
47
48 protected static final Logger LOG = LoggerFactory.getLogger(ModifyResponseGatewayFilterFactory.class);
49
50 @Override
51 public GatewayFilter apply(final Config config) {
52 return new InternalModifyResponseGatewayFilter(config);
53 }
54
55 protected abstract byte[] modifyResponse(
56 InputStream responseBody,
57 Config config,
58 ServerHttpResponseDecorator decorator,
59 ServerWebExchange exchange)
60 throws IOException;
61
62 protected boolean skipCond(final ServerHttpResponseDecorator decorator) {
63 LOG.debug("Decorator: {}", decorator);
64 return false;
65 }
66
67 protected class InternalModifyResponseGatewayFilter implements GatewayFilter, Ordered {
68
69 private final Config config;
70
71 public InternalModifyResponseGatewayFilter(final Config config) {
72 this.config = config;
73 }
74
75 @Override
76 public Mono<Void> filter(final ServerWebExchange exchange, final GatewayFilterChain chain) {
77 return chain.filter(exchange.mutate().response(decorate(exchange)).build());
78 }
79
80 private ServerHttpResponse decorate(final ServerWebExchange exchange) {
81 return new ServerHttpResponseDecorator(exchange.getResponse()) {
82
83 @Override
84 public Mono<Void> writeWith(final Publisher<? extends DataBuffer> body) {
85 return skipCond(this)
86 ? super.writeWith(body)
87 : super.writeWith(Flux.from(body).
88 collectList().
89 filter(list -> !list.isEmpty()).
90 map(list -> list.get(0).factory().join(list)).
91 doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release).
92 map(dataBuffer -> {
93 if (dataBuffer.readableByteCount() > 0) {
94 LOG.trace("Retaining body in exchange attribute");
95 exchange.getAttributes().put(
96 ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR, dataBuffer);
97 }
98
99 boolean inputCompressed = false;
100 if (dataBuffer.readableByteCount() >= 2) {
101 byte[] first2 = new byte[2];
102 dataBuffer.read(first2, 0, 2);
103 dataBuffer.readPosition(0);
104
105 inputCompressed = ((first2[0] == (byte) (GZIPInputStream.GZIP_MAGIC))
106 && (first2[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8)));
107 }
108
109 boolean outputCompressed = false;
110 byte[] output;
111 try (InputStream is = inputCompressed
112 ? new GZIPInputStream(dataBuffer.asInputStream())
113 : dataBuffer.asInputStream()) {
114
115 outputCompressed = is instanceof GZIPInputStream;
116
117 output = modifyResponse(is, config, this, exchange);
118 } catch (IOException e) {
119 LOG.error("While modifying response", e);
120
121 output = new byte[dataBuffer.readableByteCount()];
122 dataBuffer.read(output);
123 }
124
125 if (outputCompressed) {
126 try (ByteArrayOutputStream baos = new ByteArrayOutputStream(output.length);
127 GZIPOutputStream gzipos = new GZIPOutputStream(baos)) {
128
129 gzipos.write(output);
130 gzipos.finish();
131 output = baos.toByteArray();
132 } catch (IOException e) {
133 LOG.error("While GZIP-encoding output", e);
134 }
135 }
136
137 return exchange.getResponse().bufferFactory().wrap(output);
138 }));
139 }
140
141 @Override
142 public Mono<Void> writeAndFlushWith(final Publisher<? extends Publisher<? extends DataBuffer>> body) {
143 return writeWith(Flux.from(body).flatMapSequential(p -> p));
144 }
145 };
146 }
147
148 @Override
149 public int getOrder() {
150 return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
151 }
152 }
153 }