1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.core.provisioning.java.job;
20
21 import java.io.IOException;
22 import org.apache.syncope.common.lib.types.AnyTypeKind;
23 import org.apache.syncope.core.persistence.api.dao.AnyDAO;
24 import org.apache.syncope.core.persistence.api.dao.AnyObjectDAO;
25 import org.apache.syncope.core.persistence.api.dao.GroupDAO;
26 import org.apache.syncope.core.persistence.api.dao.RealmDAO;
27 import org.apache.syncope.core.persistence.api.dao.UserDAO;
28 import org.apache.syncope.core.persistence.api.entity.task.SchedTask;
29 import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
30 import org.apache.syncope.core.spring.security.AuthContextUtils;
31 import org.apache.syncope.ext.opensearch.client.OpenSearchIndexManager;
32 import org.apache.syncope.ext.opensearch.client.OpenSearchUtils;
33 import org.opensearch.client.opensearch.OpenSearchClient;
34 import org.opensearch.client.opensearch._types.mapping.TypeMapping;
35 import org.opensearch.client.opensearch.core.BulkRequest;
36 import org.opensearch.client.opensearch.core.BulkResponse;
37 import org.opensearch.client.opensearch.indices.IndexSettings;
38 import org.quartz.JobExecutionContext;
39 import org.quartz.JobExecutionException;
40 import org.springframework.beans.factory.annotation.Autowired;
41
42
43
44
45 public class OpenSearchReindex extends AbstractSchedTaskJobDelegate<SchedTask> {
46
47 @Autowired
48 protected OpenSearchClient client;
49
50 @Autowired
51 protected OpenSearchIndexManager indexManager;
52
53 @Autowired
54 protected OpenSearchUtils utils;
55
56 @Autowired
57 protected UserDAO userDAO;
58
59 @Autowired
60 protected GroupDAO groupDAO;
61
62 @Autowired
63 protected AnyObjectDAO anyObjectDAO;
64
65 @Autowired
66 protected RealmDAO realmDAO;
67
68 protected IndexSettings userSettings() throws IOException {
69 return indexManager.defaultSettings();
70 }
71
72 protected IndexSettings groupSettings() throws IOException {
73 return indexManager.defaultSettings();
74 }
75
76 protected IndexSettings anyObjectSettings() throws IOException {
77 return indexManager.defaultSettings();
78 }
79
80 protected IndexSettings realmSettings() throws IOException {
81 return indexManager.defaultSettings();
82 }
83
84 protected IndexSettings auditSettings() throws IOException {
85 return indexManager.defaultSettings();
86 }
87
88 protected TypeMapping userMapping() throws IOException {
89 return indexManager.defaultAnyMapping();
90 }
91
92 protected TypeMapping groupMapping() throws IOException {
93 return indexManager.defaultAnyMapping();
94 }
95
96 protected TypeMapping anyObjectMapping() throws IOException {
97 return indexManager.defaultAnyMapping();
98 }
99
100 protected TypeMapping realmMapping() throws IOException {
101 return indexManager.defaultRealmMapping();
102 }
103
104 protected TypeMapping auditMapping() throws IOException {
105 return indexManager.defaultAuditMapping();
106 }
107
108 @Override
109 protected String doExecute(final boolean dryRun, final String executor, final JobExecutionContext context)
110 throws JobExecutionException {
111
112 if (!dryRun) {
113 setStatus("Start rebuilding indexes");
114
115 try {
116 indexManager.createAnyIndex(
117 AuthContextUtils.getDomain(), AnyTypeKind.USER, userSettings(), userMapping());
118
119 indexManager.createAnyIndex(
120 AuthContextUtils.getDomain(), AnyTypeKind.GROUP, groupSettings(), groupMapping());
121
122 indexManager.createAnyIndex(
123 AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT, anyObjectSettings(), anyObjectMapping());
124
125 int users = userDAO.count();
126 String uindex = OpenSearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER);
127 setStatus("Indexing " + users + " users under " + uindex + "...");
128 for (int page = 1; page <= (users / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
129 BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
130
131 for (String user : userDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
132 bulkRequest.operations(op -> op.index(idx -> idx.
133 index(uindex).
134 id(user).
135 document(utils.document(userDAO.find(user)))));
136 }
137
138 try {
139 BulkResponse response = client.bulk(bulkRequest.build());
140 LOG.debug("Index successfully created for {} [{}/{}]: {}",
141 uindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
142 } catch (Exception e) {
143 LOG.error("Could not create index for {} [{}/{}]: {}",
144 uindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
145 }
146 }
147
148 int groups = groupDAO.count();
149 String gindex = OpenSearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP);
150 setStatus("Indexing " + groups + " groups under " + gindex + "...");
151 for (int page = 1; page <= (groups / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
152 BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
153
154 for (String group : groupDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
155 bulkRequest.operations(op -> op.index(idx -> idx.
156 index(gindex).
157 id(group).
158 document(utils.document(groupDAO.find(group)))));
159 }
160
161 try {
162 BulkResponse response = client.bulk(bulkRequest.build());
163 LOG.debug("Index successfully created for {} [{}/{}]: {}",
164 gindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
165 } catch (Exception e) {
166 LOG.error("Could not create index for {} [{}/{}]: {}",
167 gindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
168 }
169 }
170
171 int anyObjects = anyObjectDAO.count();
172 String aindex = OpenSearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT);
173 setStatus("Indexing " + anyObjects + " any objects under " + aindex + "...");
174 for (int page = 1; page <= (anyObjects / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
175 BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
176
177 for (String anyObject : anyObjectDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
178 bulkRequest.operations(op -> op.index(idx -> idx.
179 index(aindex).
180 id(anyObject).
181 document(utils.document(anyObjectDAO.find(anyObject)))));
182 }
183
184 try {
185 BulkResponse response = client.bulk(bulkRequest.build());
186 LOG.debug("Index successfully created for {} [{}/{}]: {}",
187 aindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
188 } catch (Exception e) {
189 LOG.error("Could not create index for {} [{}/{}]: {}",
190 aindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
191 }
192 }
193
194 indexManager.createRealmIndex(AuthContextUtils.getDomain(), realmSettings(), realmMapping());
195
196 int realms = realmDAO.count();
197 String rindex = OpenSearchUtils.getRealmIndex(AuthContextUtils.getDomain());
198 setStatus("Indexing " + realms + " realms under " + rindex + "...");
199 for (int page = 1; page <= (realms / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
200 BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
201
202 for (String realm : realmDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
203 bulkRequest.operations(op -> op.index(idx -> idx.
204 index(rindex).
205 id(realm).
206 document(utils.document(realmDAO.find(realm)))));
207 }
208
209 try {
210 BulkResponse response = client.bulk(bulkRequest.build());
211 LOG.debug("Index successfully created for {} [{}/{}]: {}",
212 rindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
213 } catch (Exception e) {
214 LOG.error("Could not create index for {} [{}/{}]: {}",
215 rindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
216 }
217 }
218
219 indexManager.createAuditIndex(AuthContextUtils.getDomain(), auditSettings(), auditMapping());
220
221 setStatus("Rebuild indexes for domain " + AuthContextUtils.getDomain() + " successfully completed");
222 } catch (Exception e) {
223 throw new JobExecutionException("While rebuilding index for domain " + AuthContextUtils.getDomain(), e);
224 }
225 }
226
227 return "SUCCESS";
228 }
229
230 @Override
231 protected boolean hasToBeRegistered(final TaskExec<?> execution) {
232 return true;
233 }
234 }