View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.sra.filters;
20  
21  import com.fasterxml.jackson.databind.json.JsonMapper;
22  import com.fasterxml.jackson.databind.node.ObjectNode;
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.util.function.Function;
28  import java.util.zip.GZIPInputStream;
29  import java.util.zip.GZIPOutputStream;
30  import org.apache.commons.lang3.ArrayUtils;
31  import org.apache.zookeeper.common.IOUtils;
32  import org.reactivestreams.Publisher;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  import org.springframework.cloud.gateway.filter.GatewayFilter;
36  import org.springframework.cloud.gateway.filter.GatewayFilterChain;
37  import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
38  import org.springframework.core.Ordered;
39  import org.springframework.core.io.buffer.DataBuffer;
40  import org.springframework.core.io.buffer.DataBufferFactory;
41  import org.springframework.http.server.reactive.ServerHttpResponse;
42  import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
43  import org.springframework.web.server.ServerWebExchange;
44  import reactor.core.publisher.Flux;
45  import reactor.core.publisher.Mono;
46  
47  /**
48   * Inspired by {@link org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory}.
49   */
50  public class BodyPropertyAddingGatewayFilterFactory extends CustomGatewayFilterFactory {
51  
52      private static final Logger LOG = LoggerFactory.getLogger(BodyPropertyAddingGatewayFilterFactory.class);
53  
54      protected static final JsonMapper MAPPER = JsonMapper.builder().findAndAddModules().build();
55  
56      private static boolean isCompressed(final byte[] bytes) {
57          if ((bytes == null) || (bytes.length < 2)) {
58              return false;
59          } else {
60              return ((bytes[0] == (byte) (GZIPInputStream.GZIP_MAGIC))
61                      && (bytes[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8)));
62          }
63      }
64  
65      @Override
66      public GatewayFilter apply(final Config config) {
67          return new ModifyResponseGatewayFilter(config);
68      }
69  
70      public static class ModifyResponseGatewayFilter implements GatewayFilter, Ordered {
71  
72          private final Config config;
73  
74          public ModifyResponseGatewayFilter(final Config config) {
75              this.config = config;
76          }
77  
78          @Override
79          public Mono<Void> filter(final ServerWebExchange exchange, final GatewayFilterChain chain) {
80              return chain.filter(exchange.mutate().response(decorate(exchange)).build());
81          }
82  
83          private ServerHttpResponse decorate(final ServerWebExchange exchange) {
84              ServerHttpResponse originalResponse = exchange.getResponse();
85  
86              DataBufferFactory bufferFactory = originalResponse.bufferFactory();
87              return new ServerHttpResponseDecorator(originalResponse) {
88  
89                  @Override
90                  public Mono<Void> writeWith(final Publisher<? extends DataBuffer> body) {
91                      return super.writeWith(Flux.from(body).buffer().map(dataBuffers -> {
92                          ByteArrayOutputStream payload = new ByteArrayOutputStream();
93                          dataBuffers.forEach(buffer -> {
94                              byte[] array = new byte[buffer.readableByteCount()];
95                              buffer.read(array);
96                              try {
97                                  payload.write(array);
98                              } catch (IOException e) {
99                                  LOG.error("While reading original body content", e);
100                             }
101                         });
102 
103                         byte[] input = payload.toByteArray();
104 
105                         InputStream is = null;
106                         boolean compressed = false;
107                         byte[] output;
108                         try {
109                             if (isCompressed(input)) {
110                                 compressed = true;
111                                 is = new GZIPInputStream(new ByteArrayInputStream(input));
112                             } else {
113                                 is = new ByteArrayInputStream(input);
114                             }
115 
116                             ObjectNode content = (ObjectNode) MAPPER.readTree(is);
117                             String[] kv = config.getData().split("=");
118                             content.put(kv[0], kv[1]);
119 
120                             output = MAPPER.writeValueAsBytes(content);
121                         } catch (IOException e) {
122                             LOG.error("While (de)serializing as JSON", e);
123                             output = ArrayUtils.clone(input);
124                         } finally {
125                             IOUtils.closeStream(is);
126                         }
127 
128                         if (compressed) {
129                             try (ByteArrayOutputStream baos = new ByteArrayOutputStream(output.length)) {
130                                 try (GZIPOutputStream gzipos = new GZIPOutputStream(baos)) {
131                                     gzipos.write(output);
132                                 }
133 
134                                 output = baos.toByteArray();
135                             } catch (IOException e) {
136                                 LOG.error("While GZIP-encoding output", e);
137                             }
138                         }
139 
140                         return bufferFactory.wrap(output);
141                     }));
142                 }
143 
144                 @Override
145                 public Mono<Void> writeAndFlushWith(final Publisher<? extends Publisher<? extends DataBuffer>> body) {
146                     return writeWith(Flux.from(body).flatMapSequential(Function.identity()));
147                 }
148             };
149         }
150 
151         @Override
152         public int getOrder() {
153             return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
154         }
155     }
156 }