View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.core.persistence.jpa.dao;
20  
21  import co.elastic.clients.elasticsearch.ElasticsearchClient;
22  import co.elastic.clients.elasticsearch._types.FieldSort;
23  import co.elastic.clients.elasticsearch._types.SearchType;
24  import co.elastic.clients.elasticsearch._types.SortOptions;
25  import co.elastic.clients.elasticsearch._types.SortOrder;
26  import co.elastic.clients.elasticsearch._types.query_dsl.Query;
27  import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
28  import co.elastic.clients.elasticsearch._types.query_dsl.TextQueryType;
29  import co.elastic.clients.elasticsearch.core.CountRequest;
30  import co.elastic.clients.elasticsearch.core.SearchRequest;
31  import co.elastic.clients.elasticsearch.core.search.Hit;
32  import co.elastic.clients.json.JsonData;
33  import com.fasterxml.jackson.databind.node.ObjectNode;
34  import java.io.IOException;
35  import java.time.OffsetDateTime;
36  import java.util.ArrayList;
37  import java.util.List;
38  import java.util.Objects;
39  import java.util.stream.Collectors;
40  import org.apache.commons.lang3.StringUtils;
41  import org.apache.syncope.common.lib.audit.AuditEntry;
42  import org.apache.syncope.common.lib.types.AuditElements;
43  import org.apache.syncope.core.persistence.api.dao.search.OrderByClause;
44  import org.apache.syncope.core.provisioning.api.serialization.POJOHelper;
45  import org.apache.syncope.core.spring.security.AuthContextUtils;
46  import org.apache.syncope.ext.elasticsearch.client.ElasticsearchUtils;
47  import org.springframework.util.CollectionUtils;
48  
49  public class ElasticsearchAuditConfDAO extends JPAAuditConfDAO {
50  
51      protected final ElasticsearchClient client;
52  
53      protected final int indexMaxResultWindow;
54  
55      public ElasticsearchAuditConfDAO(final ElasticsearchClient client, final int indexMaxResultWindow) {
56          this.client = client;
57          this.indexMaxResultWindow = indexMaxResultWindow;
58      }
59  
60      protected Query getQuery(
61              final String entityKey,
62              final AuditElements.EventCategoryType type,
63              final String category,
64              final String subcategory,
65              final List<String> events,
66              final AuditElements.Result result,
67              final OffsetDateTime before,
68              final OffsetDateTime after) {
69  
70          List<Query> queries = new ArrayList<>();
71  
72          if (entityKey != null) {
73              queries.add(new Query.Builder().
74                      multiMatch(QueryBuilders.multiMatch().
75                              fields("message.before", "message.inputs", "message.output", "message.throwable").
76                              type(TextQueryType.Phrase).
77                              query(entityKey).build()).build());
78          }
79  
80          if (type != null) {
81              queries.add(new Query.Builder().
82                      term(QueryBuilders.term().field("message.logger.type").value(type.name()).build()).
83                      build());
84          }
85  
86          if (StringUtils.isNotBlank(category)) {
87              queries.add(new Query.Builder().
88                      term(QueryBuilders.term().field("message.logger.category").value(category).build()).
89                      build());
90          }
91  
92          if (StringUtils.isNotBlank(subcategory)) {
93              queries.add(new Query.Builder().
94                      term(QueryBuilders.term().field("message.logger.subcategory").value(subcategory).build()).
95                      build());
96          }
97  
98          List<Query> eventQueries = events.stream().map(event -> new Query.Builder().
99                  term(QueryBuilders.term().field("message.logger.event").value(event).build()).
100                 build()).
101                 collect(Collectors.toList());
102         if (!eventQueries.isEmpty()) {
103             queries.add(new Query.Builder().disMax(QueryBuilders.disMax().queries(eventQueries).build()).build());
104         }
105 
106         if (result != null) {
107             queries.add(new Query.Builder().
108                     term(QueryBuilders.term().field("message.logger.result").value(result.name()).build()).
109                     build());
110         }
111 
112         if (before != null) {
113             queries.add(new Query.Builder().
114                     range(QueryBuilders.range().
115                             field("instant").lte(JsonData.of(before.toInstant().toEpochMilli())).build()).
116                     build());
117         }
118 
119         if (after != null) {
120             queries.add(new Query.Builder().
121                     range(QueryBuilders.range().
122                             field("instant").gte(JsonData.of(after.toInstant().toEpochMilli())).build()).
123                     build());
124         }
125 
126         return new Query.Builder().bool(QueryBuilders.bool().must(queries).build()).build();
127     }
128 
129     @Override
130     public int countEntries(
131             final String entityKey,
132             final AuditElements.EventCategoryType type,
133             final String category,
134             final String subcategory,
135             final List<String> events,
136             final AuditElements.Result result,
137             final OffsetDateTime before,
138             final OffsetDateTime after) {
139 
140         CountRequest request = new CountRequest.Builder().
141                 index(ElasticsearchUtils.getAuditIndex(AuthContextUtils.getDomain())).
142                 query(getQuery(entityKey, type, category, subcategory, events, result, before, after)).
143                 build();
144         try {
145             return (int) client.count(request).count();
146         } catch (IOException e) {
147             LOG.error("Search error", e);
148             return 0;
149         }
150     }
151 
152     protected List<SortOptions> sortBuilders(final List<OrderByClause> orderBy) {
153         return orderBy.stream().map(clause -> {
154             String sortField = clause.getField();
155             if ("EVENT_DATE".equalsIgnoreCase(sortField)) {
156                 sortField = "message.date";
157             }
158 
159             return new SortOptions.Builder().field(
160                     new FieldSort.Builder().
161                             field(sortField).
162                             order(clause.getDirection() == OrderByClause.Direction.ASC
163                                     ? SortOrder.Asc : SortOrder.Desc).
164                             build()).
165                     build();
166         }).collect(Collectors.toList());
167     }
168 
169     @Override
170     public List<AuditEntry> searchEntries(
171             final String entityKey,
172             final int page,
173             final int itemsPerPage,
174             final AuditElements.EventCategoryType type,
175             final String category,
176             final String subcategory,
177             final List<String> events,
178             final AuditElements.Result result,
179             final OffsetDateTime before,
180             final OffsetDateTime after,
181             final List<OrderByClause> orderBy) {
182 
183         SearchRequest request = new SearchRequest.Builder().
184                 index(ElasticsearchUtils.getAuditIndex(AuthContextUtils.getDomain())).
185                 searchType(SearchType.QueryThenFetch).
186                 query(getQuery(entityKey, type, category, subcategory, events, result, before, after)).
187                 fields(f -> f.field("message")).
188                 from(itemsPerPage * (page <= 0 ? 0 : page - 1)).
189                 size(itemsPerPage < 0 ? indexMaxResultWindow : itemsPerPage).
190                 sort(sortBuilders(orderBy)).
191                 build();
192 
193         List<Hit<ObjectNode>> esResult = null;
194         try {
195             esResult = client.search(request, ObjectNode.class).hits().hits();
196         } catch (Exception e) {
197             LOG.error("While searching in Elasticsearch", e);
198         }
199 
200         return CollectionUtils.isEmpty(esResult)
201                 ? List.of()
202                 : esResult.stream().
203                         map(hit -> POJOHelper.convertValue(hit.source().get("message"), AuditEntry.class)).
204                         filter(Objects::nonNull).collect(Collectors.toList());
205     }
206 }