/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19  package org.apache.syncope.core.provisioning.java.job;
20  
21  import java.io.IOException;
22  import org.apache.syncope.common.lib.types.AnyTypeKind;
23  import org.apache.syncope.core.persistence.api.dao.AnyDAO;
24  import org.apache.syncope.core.persistence.api.dao.AnyObjectDAO;
25  import org.apache.syncope.core.persistence.api.dao.GroupDAO;
26  import org.apache.syncope.core.persistence.api.dao.RealmDAO;
27  import org.apache.syncope.core.persistence.api.dao.UserDAO;
28  import org.apache.syncope.core.persistence.api.entity.task.SchedTask;
29  import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
30  import org.apache.syncope.core.spring.security.AuthContextUtils;
31  import org.apache.syncope.ext.opensearch.client.OpenSearchIndexManager;
32  import org.apache.syncope.ext.opensearch.client.OpenSearchUtils;
33  import org.opensearch.client.opensearch.OpenSearchClient;
34  import org.opensearch.client.opensearch._types.mapping.TypeMapping;
35  import org.opensearch.client.opensearch.core.BulkRequest;
36  import org.opensearch.client.opensearch.core.BulkResponse;
37  import org.opensearch.client.opensearch.indices.IndexSettings;
38  import org.quartz.JobExecutionContext;
39  import org.quartz.JobExecutionException;
40  import org.springframework.beans.factory.annotation.Autowired;
41  
42  /**
43   * Remove and rebuild all OpenSearch indexes with information from existing users, groups and any objects.
44   */
45  public class OpenSearchReindex extends AbstractSchedTaskJobDelegate<SchedTask> {
46  
47      @Autowired
48      protected OpenSearchClient client;
49  
50      @Autowired
51      protected OpenSearchIndexManager indexManager;
52  
53      @Autowired
54      protected OpenSearchUtils utils;
55  
56      @Autowired
57      protected UserDAO userDAO;
58  
59      @Autowired
60      protected GroupDAO groupDAO;
61  
62      @Autowired
63      protected AnyObjectDAO anyObjectDAO;
64  
65      @Autowired
66      protected RealmDAO realmDAO;
67  
68      protected IndexSettings userSettings() throws IOException {
69          return indexManager.defaultSettings();
70      }
71  
72      protected IndexSettings groupSettings() throws IOException {
73          return indexManager.defaultSettings();
74      }
75  
76      protected IndexSettings anyObjectSettings() throws IOException {
77          return indexManager.defaultSettings();
78      }
79  
80      protected IndexSettings realmSettings() throws IOException {
81          return indexManager.defaultSettings();
82      }
83  
84      protected IndexSettings auditSettings() throws IOException {
85          return indexManager.defaultSettings();
86      }
87  
88      protected TypeMapping userMapping() throws IOException {
89          return indexManager.defaultAnyMapping();
90      }
91  
92      protected TypeMapping groupMapping() throws IOException {
93          return indexManager.defaultAnyMapping();
94      }
95  
96      protected TypeMapping anyObjectMapping() throws IOException {
97          return indexManager.defaultAnyMapping();
98      }
99  
100     protected TypeMapping realmMapping() throws IOException {
101         return indexManager.defaultRealmMapping();
102     }
103 
104     protected TypeMapping auditMapping() throws IOException {
105         return indexManager.defaultAuditMapping();
106     }
107 
108     @Override
109     protected String doExecute(final boolean dryRun, final String executor, final JobExecutionContext context)
110             throws JobExecutionException {
111 
112         if (!dryRun) {
113             setStatus("Start rebuilding indexes");
114 
115             try {
116                 indexManager.createAnyIndex(
117                         AuthContextUtils.getDomain(), AnyTypeKind.USER, userSettings(), userMapping());
118 
119                 indexManager.createAnyIndex(
120                         AuthContextUtils.getDomain(), AnyTypeKind.GROUP, groupSettings(), groupMapping());
121 
122                 indexManager.createAnyIndex(
123                         AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT, anyObjectSettings(), anyObjectMapping());
124 
125                 int users = userDAO.count();
126                 String uindex = OpenSearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER);
127                 setStatus("Indexing " + users + " users under " + uindex + "...");
128                 for (int page = 1; page <= (users / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
129                     BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
130 
131                     for (String user : userDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
132                         bulkRequest.operations(op -> op.index(idx -> idx.
133                                 index(uindex).
134                                 id(user).
135                                 document(utils.document(userDAO.find(user)))));
136                     }
137 
138                     try {
139                         BulkResponse response = client.bulk(bulkRequest.build());
140                         LOG.debug("Index successfully created for {} [{}/{}]: {}",
141                                 uindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
142                     } catch (Exception e) {
143                         LOG.error("Could not create index for {} [{}/{}]: {}",
144                                 uindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
145                     }
146                 }
147 
148                 int groups = groupDAO.count();
149                 String gindex = OpenSearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP);
150                 setStatus("Indexing " + groups + " groups under " + gindex + "...");
151                 for (int page = 1; page <= (groups / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
152                     BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
153 
154                     for (String group : groupDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
155                         bulkRequest.operations(op -> op.index(idx -> idx.
156                                 index(gindex).
157                                 id(group).
158                                 document(utils.document(groupDAO.find(group)))));
159                     }
160 
161                     try {
162                         BulkResponse response = client.bulk(bulkRequest.build());
163                         LOG.debug("Index successfully created for {} [{}/{}]: {}",
164                                 gindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
165                     } catch (Exception e) {
166                         LOG.error("Could not create index for {} [{}/{}]: {}",
167                                 gindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
168                     }
169                 }
170 
171                 int anyObjects = anyObjectDAO.count();
172                 String aindex = OpenSearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT);
173                 setStatus("Indexing " + anyObjects + " any objects under " + aindex + "...");
174                 for (int page = 1; page <= (anyObjects / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
175                     BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
176 
177                     for (String anyObject : anyObjectDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
178                         bulkRequest.operations(op -> op.index(idx -> idx.
179                                 index(aindex).
180                                 id(anyObject).
181                                 document(utils.document(anyObjectDAO.find(anyObject)))));
182                     }
183 
184                     try {
185                         BulkResponse response = client.bulk(bulkRequest.build());
186                         LOG.debug("Index successfully created for {} [{}/{}]: {}",
187                                 aindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
188                     } catch (Exception e) {
189                         LOG.error("Could not create index for {} [{}/{}]: {}",
190                                 aindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
191                     }
192                 }
193 
194                 indexManager.createRealmIndex(AuthContextUtils.getDomain(), realmSettings(), realmMapping());
195 
196                 int realms = realmDAO.count();
197                 String rindex = OpenSearchUtils.getRealmIndex(AuthContextUtils.getDomain());
198                 setStatus("Indexing " + realms + " realms under " + rindex + "...");
199                 for (int page = 1; page <= (realms / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
200                     BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
201 
202                     for (String realm : realmDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
203                         bulkRequest.operations(op -> op.index(idx -> idx.
204                                 index(rindex).
205                                 id(realm).
206                                 document(utils.document(realmDAO.find(realm)))));
207                     }
208 
209                     try {
210                         BulkResponse response = client.bulk(bulkRequest.build());
211                         LOG.debug("Index successfully created for {} [{}/{}]: {}",
212                                 rindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
213                     } catch (Exception e) {
214                         LOG.error("Could not create index for {} [{}/{}]: {}",
215                                 rindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
216                     }
217                 }
218 
219                 indexManager.createAuditIndex(AuthContextUtils.getDomain(), auditSettings(), auditMapping());
220 
221                 setStatus("Rebuild indexes for domain " + AuthContextUtils.getDomain() + " successfully completed");
222             } catch (Exception e) {
223                 throw new JobExecutionException("While rebuilding index for domain " + AuthContextUtils.getDomain(), e);
224             }
225         }
226 
227         return "SUCCESS";
228     }
229 
230     @Override
231     protected boolean hasToBeRegistered(final TaskExec<?> execution) {
232         return true;
233     }
234 }