View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.core.provisioning.java.job;
20  
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
import co.elastic.clients.elasticsearch._types.ErrorCause;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.indices.IndexSettings;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import org.apache.syncope.common.lib.types.AnyTypeKind;
import org.apache.syncope.core.persistence.api.dao.AnyDAO;
import org.apache.syncope.core.persistence.api.dao.AnyObjectDAO;
import org.apache.syncope.core.persistence.api.dao.GroupDAO;
import org.apache.syncope.core.persistence.api.dao.RealmDAO;
import org.apache.syncope.core.persistence.api.dao.UserDAO;
import org.apache.syncope.core.persistence.api.entity.task.SchedTask;
import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
import org.apache.syncope.core.spring.security.AuthContextUtils;
import org.apache.syncope.ext.elasticsearch.client.ElasticsearchIndexManager;
import org.apache.syncope.ext.elasticsearch.client.ElasticsearchUtils;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
48  
49  /**
50   * Remove and rebuild all Elasticsearch indexes with information from existing users, groups and any objects.
51   */
52  public class ElasticsearchReindex extends AbstractSchedTaskJobDelegate<SchedTask> {
53  
54      protected static class ErrorLoggingBulkListener implements BulkListener<Void> {
55  
56          protected static final ErrorLoggingBulkListener INSTANCE = new ErrorLoggingBulkListener();
57  
58          @Override
59          public void beforeBulk(
60                  final long executionId,
61                  final BulkRequest request,
62                  final List<Void> contexts) {
63  
64              // do nothing
65          }
66  
67          @Override
68          public void afterBulk(
69                  final long executionId,
70                  final BulkRequest request,
71                  final List<Void> contexts,
72                  final BulkResponse response) {
73  
74              if (response.errors()) {
75                  String details = response.items().stream().map(BulkResponseItem::error).
76                          filter(Objects::nonNull).map(ErrorCause::toString).collect(Collectors.joining(", "));
77                  LOG.error("Errors found for request {}; details: {}", executionId, details);
78              }
79          }
80  
81          @Override
82          public void afterBulk(
83                  final long executionId,
84                  final BulkRequest request,
85                  final List<Void> contexts,
86                  final Throwable failure) {
87  
88              LOG.error("Bulk request {} failed", executionId, failure);
89          }
90      }
91  
92      @Autowired
93      protected ElasticsearchClient client;
94  
95      @Autowired
96      protected ElasticsearchIndexManager indexManager;
97  
98      @Autowired
99      protected ElasticsearchUtils utils;
100 
101     @Autowired
102     protected UserDAO userDAO;
103 
104     @Autowired
105     protected GroupDAO groupDAO;
106 
107     @Autowired
108     protected AnyObjectDAO anyObjectDAO;
109 
110     @Autowired
111     protected RealmDAO realmDAO;
112 
113     protected IndexSettings userSettings() throws IOException {
114         return indexManager.defaultSettings();
115     }
116 
117     protected IndexSettings groupSettings() throws IOException {
118         return indexManager.defaultSettings();
119     }
120 
121     protected IndexSettings anyObjectSettings() throws IOException {
122         return indexManager.defaultSettings();
123     }
124 
125     protected IndexSettings realmSettings() throws IOException {
126         return indexManager.defaultSettings();
127     }
128 
129     protected IndexSettings auditSettings() throws IOException {
130         return indexManager.defaultSettings();
131     }
132 
133     protected TypeMapping userMapping() throws IOException {
134         return indexManager.defaultAnyMapping();
135     }
136 
137     protected TypeMapping groupMapping() throws IOException {
138         return indexManager.defaultAnyMapping();
139     }
140 
141     protected TypeMapping anyObjectMapping() throws IOException {
142         return indexManager.defaultAnyMapping();
143     }
144 
145     protected TypeMapping realmMapping() throws IOException {
146         return indexManager.defaultRealmMapping();
147     }
148 
149     protected TypeMapping auditMapping() throws IOException {
150         return indexManager.defaultAuditMapping();
151     }
152 
153     @Override
154     protected String doExecute(final boolean dryRun, final String executor, final JobExecutionContext context)
155             throws JobExecutionException {
156 
157         if (!dryRun) {
158             setStatus("Start rebuilding indexes");
159 
160             try {
161                 indexManager.createAnyIndex(
162                         AuthContextUtils.getDomain(), AnyTypeKind.USER, userSettings(), userMapping());
163 
164                 indexManager.createAnyIndex(
165                         AuthContextUtils.getDomain(), AnyTypeKind.GROUP, groupSettings(), groupMapping());
166 
167                 indexManager.createAnyIndex(
168                         AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT, anyObjectSettings(), anyObjectMapping());
169 
170                 int users = userDAO.count();
171                 String uindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER);
172                 setStatus("Indexing " + users + " users under " + uindex + "...");
173 
174                 try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
175                         maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
176 
177                     for (int page = 1; page <= (users / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
178                         for (String user : userDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
179                             ingester.add(op -> op.index(idx -> idx.
180                                     index(uindex).
181                                     id(user).
182                                     document(utils.document(userDAO.find(user)))));
183                         }
184                     }
185                 } catch (Exception e) {
186                     LOG.error("Errors while ingesting index {}", uindex, e);
187                 }
188 
189                 int groups = groupDAO.count();
190                 String gindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP);
191                 setStatus("Indexing " + groups + " groups under " + gindex + "...");
192 
193                 try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
194                         maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
195 
196                     for (int page = 1; page <= (groups / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
197                         for (String group : groupDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
198                             ingester.add(op -> op.index(idx -> idx.
199                                     index(gindex).
200                                     id(group).
201                                     document(utils.document(groupDAO.find(group)))));
202                         }
203                     }
204                 } catch (Exception e) {
205                     LOG.error("Errors while ingesting index {}", uindex, e);
206                 }
207 
208                 int anyObjects = anyObjectDAO.count();
209                 String aindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT);
210                 setStatus("Indexing " + anyObjects + " any objects under " + aindex + "...");
211 
212                 try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
213                         maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
214 
215                     for (int page = 1; page <= (anyObjects / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
216                         for (String anyObject : anyObjectDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
217                             ingester.add(op -> op.index(idx -> idx.
218                                     index(aindex).
219                                     id(anyObject).
220                                     document(utils.document(anyObjectDAO.find(anyObject)))));
221                         }
222                     }
223                 } catch (Exception e) {
224                     LOG.error("Errors while ingesting index {}", uindex, e);
225                 }
226 
227                 indexManager.createRealmIndex(AuthContextUtils.getDomain(), realmSettings(), realmMapping());
228 
229                 int realms = realmDAO.count();
230                 String rindex = ElasticsearchUtils.getRealmIndex(AuthContextUtils.getDomain());
231                 setStatus("Indexing " + realms + " realms under " + rindex + "...");
232 
233                 try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
234                         maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
235 
236                     for (int page = 1; page <= (realms / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
237                         for (String realm : realmDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
238                             ingester.add(op -> op.index(idx -> idx.
239                                     index(rindex).
240                                     id(realm).
241                                     document(utils.document(realmDAO.find(realm)))));
242                         }
243                     }
244                 } catch (Exception e) {
245                     LOG.error("Errors while ingesting index {}", uindex, e);
246                 }
247 
248                 indexManager.createAuditIndex(AuthContextUtils.getDomain(), auditSettings(), auditMapping());
249 
250                 setStatus("Rebuild indexes for domain " + AuthContextUtils.getDomain() + " successfully completed");
251             } catch (Exception e) {
252                 throw new JobExecutionException("While rebuilding index for domain " + AuthContextUtils.getDomain(), e);
253             }
254         }
255 
256         return "SUCCESS";
257     }
258 
259     @Override
260     protected boolean hasToBeRegistered(final TaskExec<?> execution) {
261         return true;
262     }
263 }