1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.core.provisioning.java.job;
20
21 import co.elastic.clients.elasticsearch.ElasticsearchClient;
22 import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
23 import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
24 import co.elastic.clients.elasticsearch._types.ErrorCause;
25 import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
26 import co.elastic.clients.elasticsearch.core.BulkRequest;
27 import co.elastic.clients.elasticsearch.core.BulkResponse;
28 import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
29 import co.elastic.clients.elasticsearch.indices.IndexSettings;
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.Objects;
33 import java.util.stream.Collectors;
34 import org.apache.syncope.common.lib.types.AnyTypeKind;
35 import org.apache.syncope.core.persistence.api.dao.AnyDAO;
36 import org.apache.syncope.core.persistence.api.dao.AnyObjectDAO;
37 import org.apache.syncope.core.persistence.api.dao.GroupDAO;
38 import org.apache.syncope.core.persistence.api.dao.RealmDAO;
39 import org.apache.syncope.core.persistence.api.dao.UserDAO;
40 import org.apache.syncope.core.persistence.api.entity.task.SchedTask;
41 import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
42 import org.apache.syncope.core.spring.security.AuthContextUtils;
43 import org.apache.syncope.ext.elasticsearch.client.ElasticsearchIndexManager;
44 import org.apache.syncope.ext.elasticsearch.client.ElasticsearchUtils;
45 import org.quartz.JobExecutionContext;
46 import org.quartz.JobExecutionException;
47 import org.springframework.beans.factory.annotation.Autowired;
48
49
50
51
52 public class ElasticsearchReindex extends AbstractSchedTaskJobDelegate<SchedTask> {
53
54 protected static class ErrorLoggingBulkListener implements BulkListener<Void> {
55
56 protected static final ErrorLoggingBulkListener INSTANCE = new ErrorLoggingBulkListener();
57
58 @Override
59 public void beforeBulk(
60 final long executionId,
61 final BulkRequest request,
62 final List<Void> contexts) {
63
64
65 }
66
67 @Override
68 public void afterBulk(
69 final long executionId,
70 final BulkRequest request,
71 final List<Void> contexts,
72 final BulkResponse response) {
73
74 if (response.errors()) {
75 String details = response.items().stream().map(BulkResponseItem::error).
76 filter(Objects::nonNull).map(ErrorCause::toString).collect(Collectors.joining(", "));
77 LOG.error("Errors found for request {}; details: {}", executionId, details);
78 }
79 }
80
81 @Override
82 public void afterBulk(
83 final long executionId,
84 final BulkRequest request,
85 final List<Void> contexts,
86 final Throwable failure) {
87
88 LOG.error("Bulk request {} failed", executionId, failure);
89 }
90 }
91
92 @Autowired
93 protected ElasticsearchClient client;
94
95 @Autowired
96 protected ElasticsearchIndexManager indexManager;
97
98 @Autowired
99 protected ElasticsearchUtils utils;
100
101 @Autowired
102 protected UserDAO userDAO;
103
104 @Autowired
105 protected GroupDAO groupDAO;
106
107 @Autowired
108 protected AnyObjectDAO anyObjectDAO;
109
110 @Autowired
111 protected RealmDAO realmDAO;
112
113 protected IndexSettings userSettings() throws IOException {
114 return indexManager.defaultSettings();
115 }
116
117 protected IndexSettings groupSettings() throws IOException {
118 return indexManager.defaultSettings();
119 }
120
121 protected IndexSettings anyObjectSettings() throws IOException {
122 return indexManager.defaultSettings();
123 }
124
125 protected IndexSettings realmSettings() throws IOException {
126 return indexManager.defaultSettings();
127 }
128
129 protected IndexSettings auditSettings() throws IOException {
130 return indexManager.defaultSettings();
131 }
132
133 protected TypeMapping userMapping() throws IOException {
134 return indexManager.defaultAnyMapping();
135 }
136
137 protected TypeMapping groupMapping() throws IOException {
138 return indexManager.defaultAnyMapping();
139 }
140
141 protected TypeMapping anyObjectMapping() throws IOException {
142 return indexManager.defaultAnyMapping();
143 }
144
145 protected TypeMapping realmMapping() throws IOException {
146 return indexManager.defaultRealmMapping();
147 }
148
149 protected TypeMapping auditMapping() throws IOException {
150 return indexManager.defaultAuditMapping();
151 }
152
153 @Override
154 protected String doExecute(final boolean dryRun, final String executor, final JobExecutionContext context)
155 throws JobExecutionException {
156
157 if (!dryRun) {
158 setStatus("Start rebuilding indexes");
159
160 try {
161 indexManager.createAnyIndex(
162 AuthContextUtils.getDomain(), AnyTypeKind.USER, userSettings(), userMapping());
163
164 indexManager.createAnyIndex(
165 AuthContextUtils.getDomain(), AnyTypeKind.GROUP, groupSettings(), groupMapping());
166
167 indexManager.createAnyIndex(
168 AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT, anyObjectSettings(), anyObjectMapping());
169
170 int users = userDAO.count();
171 String uindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER);
172 setStatus("Indexing " + users + " users under " + uindex + "...");
173
174 try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
175 maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
176
177 for (int page = 1; page <= (users / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
178 for (String user : userDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
179 ingester.add(op -> op.index(idx -> idx.
180 index(uindex).
181 id(user).
182 document(utils.document(userDAO.find(user)))));
183 }
184 }
185 } catch (Exception e) {
186 LOG.error("Errors while ingesting index {}", uindex, e);
187 }
188
189 int groups = groupDAO.count();
190 String gindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP);
191 setStatus("Indexing " + groups + " groups under " + gindex + "...");
192
193 try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
194 maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
195
196 for (int page = 1; page <= (groups / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
197 for (String group : groupDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
198 ingester.add(op -> op.index(idx -> idx.
199 index(gindex).
200 id(group).
201 document(utils.document(groupDAO.find(group)))));
202 }
203 }
204 } catch (Exception e) {
205 LOG.error("Errors while ingesting index {}", uindex, e);
206 }
207
208 int anyObjects = anyObjectDAO.count();
209 String aindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT);
210 setStatus("Indexing " + anyObjects + " any objects under " + aindex + "...");
211
212 try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
213 maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
214
215 for (int page = 1; page <= (anyObjects / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
216 for (String anyObject : anyObjectDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
217 ingester.add(op -> op.index(idx -> idx.
218 index(aindex).
219 id(anyObject).
220 document(utils.document(anyObjectDAO.find(anyObject)))));
221 }
222 }
223 } catch (Exception e) {
224 LOG.error("Errors while ingesting index {}", uindex, e);
225 }
226
227 indexManager.createRealmIndex(AuthContextUtils.getDomain(), realmSettings(), realmMapping());
228
229 int realms = realmDAO.count();
230 String rindex = ElasticsearchUtils.getRealmIndex(AuthContextUtils.getDomain());
231 setStatus("Indexing " + realms + " realms under " + rindex + "...");
232
233 try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).
234 maxOperations(AnyDAO.DEFAULT_PAGE_SIZE).listener(ErrorLoggingBulkListener.INSTANCE))) {
235
236 for (int page = 1; page <= (realms / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
237 for (String realm : realmDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
238 ingester.add(op -> op.index(idx -> idx.
239 index(rindex).
240 id(realm).
241 document(utils.document(realmDAO.find(realm)))));
242 }
243 }
244 } catch (Exception e) {
245 LOG.error("Errors while ingesting index {}", uindex, e);
246 }
247
248 indexManager.createAuditIndex(AuthContextUtils.getDomain(), auditSettings(), auditMapping());
249
250 setStatus("Rebuild indexes for domain " + AuthContextUtils.getDomain() + " successfully completed");
251 } catch (Exception e) {
252 throw new JobExecutionException("While rebuilding index for domain " + AuthContextUtils.getDomain(), e);
253 }
254 }
255
256 return "SUCCESS";
257 }
258
259 @Override
260 protected boolean hasToBeRegistered(final TaskExec<?> execution) {
261 return true;
262 }
263 }