View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.sra.filters;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.util.zip.GZIPInputStream;
25  import java.util.zip.GZIPOutputStream;
26  import org.reactivestreams.Publisher;
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  import org.springframework.cloud.gateway.filter.GatewayFilter;
30  import org.springframework.cloud.gateway.filter.GatewayFilterChain;
31  import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
32  import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
33  import org.springframework.core.Ordered;
34  import org.springframework.core.io.buffer.DataBuffer;
35  import org.springframework.core.io.buffer.DataBufferUtils;
36  import org.springframework.core.io.buffer.PooledDataBuffer;
37  import org.springframework.http.server.reactive.ServerHttpResponse;
38  import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
39  import org.springframework.web.server.ServerWebExchange;
40  import reactor.core.publisher.Flux;
41  import reactor.core.publisher.Mono;
42  
43  /**
44   * Inspired by {@link org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory}.
45   */
46  public abstract class ModifyResponseGatewayFilterFactory extends CustomGatewayFilterFactory {
47  
48      protected static final Logger LOG = LoggerFactory.getLogger(ModifyResponseGatewayFilterFactory.class);
49  
50      @Override
51      public GatewayFilter apply(final Config config) {
52          return new InternalModifyResponseGatewayFilter(config);
53      }
54  
55      protected abstract byte[] modifyResponse(
56              InputStream responseBody,
57              Config config,
58              ServerHttpResponseDecorator decorator,
59              ServerWebExchange exchange)
60              throws IOException;
61  
62      protected boolean skipCond(final ServerHttpResponseDecorator decorator) {
63          LOG.debug("Decorator: {}", decorator);
64          return false;
65      }
66  
67      protected class InternalModifyResponseGatewayFilter implements GatewayFilter, Ordered {
68  
69          private final Config config;
70  
71          public InternalModifyResponseGatewayFilter(final Config config) {
72              this.config = config;
73          }
74  
75          @Override
76          public Mono<Void> filter(final ServerWebExchange exchange, final GatewayFilterChain chain) {
77              return chain.filter(exchange.mutate().response(decorate(exchange)).build());
78          }
79  
80          private ServerHttpResponse decorate(final ServerWebExchange exchange) {
81              return new ServerHttpResponseDecorator(exchange.getResponse()) {
82  
83                  @Override
84                  public Mono<Void> writeWith(final Publisher<? extends DataBuffer> body) {
85                      return skipCond(this)
86                              ? super.writeWith(body)
87                              : super.writeWith(Flux.from(body).
88                                      collectList().
89                                      filter(list -> !list.isEmpty()).
90                                      map(list -> list.get(0).factory().join(list)).
91                                      doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release).
92                                      map(dataBuffer -> {
93                                          if (dataBuffer.readableByteCount() > 0) {
94                                              LOG.trace("Retaining body in exchange attribute");
95                                              exchange.getAttributes().put(
96                                                      ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR, dataBuffer);
97                                          }
98  
99                                          boolean inputCompressed = false;
100                                         if (dataBuffer.readableByteCount() >= 2) {
101                                             byte[] first2 = new byte[2];
102                                             dataBuffer.read(first2, 0, 2);
103                                             dataBuffer.readPosition(0);
104 
105                                             inputCompressed = ((first2[0] == (byte) (GZIPInputStream.GZIP_MAGIC))
106                                                     && (first2[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8)));
107                                         }
108 
109                                         boolean outputCompressed = false;
110                                         byte[] output;
111                                         try (InputStream is = inputCompressed
112                                                 ? new GZIPInputStream(dataBuffer.asInputStream())
113                                                 : dataBuffer.asInputStream()) {
114 
115                                             outputCompressed = is instanceof GZIPInputStream;
116 
117                                             output = modifyResponse(is, config, this, exchange);
118                                         } catch (IOException e) {
119                                             LOG.error("While modifying response", e);
120 
121                                             output = new byte[dataBuffer.readableByteCount()];
122                                             dataBuffer.read(output);
123                                         }
124 
125                                         if (outputCompressed) {
126                                             try (ByteArrayOutputStream baos = new ByteArrayOutputStream(output.length);
127                                                     GZIPOutputStream gzipos = new GZIPOutputStream(baos)) {
128 
129                                                 gzipos.write(output);
130                                                 gzipos.finish();
131                                                 output = baos.toByteArray();
132                                             } catch (IOException e) {
133                                                 LOG.error("While GZIP-encoding output", e);
134                                             }
135                                         }
136 
137                                         return exchange.getResponse().bufferFactory().wrap(output);
138                                     }));
139                 }
140 
141                 @Override
142                 public Mono<Void> writeAndFlushWith(final Publisher<? extends Publisher<? extends DataBuffer>> body) {
143                     return writeWith(Flux.from(body).flatMapSequential(p -> p));
144                 }
145             };
146         }
147 
148         @Override
149         public int getOrder() {
150             return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
151         }
152     }
153 }