View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.core.provisioning.java.job;
20  
21  import java.io.IOException;
22  import org.apache.syncope.common.lib.types.AnyTypeKind;
23  import org.apache.syncope.core.persistence.api.dao.AnyDAO;
24  import org.apache.syncope.core.persistence.api.dao.AnyObjectDAO;
25  import org.apache.syncope.core.persistence.api.dao.GroupDAO;
26  import org.apache.syncope.core.persistence.api.dao.RealmDAO;
27  import org.apache.syncope.core.persistence.api.dao.UserDAO;
28  import org.apache.syncope.core.persistence.api.entity.task.SchedTask;
29  import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
30  import org.apache.syncope.core.spring.security.AuthContextUtils;
31  import org.apache.syncope.ext.opensearch.client.OpenSearchIndexManager;
32  import org.apache.syncope.ext.opensearch.client.OpenSearchUtils;
33  import org.opensearch.client.opensearch.OpenSearchClient;
34  import org.opensearch.client.opensearch._types.mapping.TypeMapping;
35  import org.opensearch.client.opensearch.core.BulkRequest;
36  import org.opensearch.client.opensearch.core.BulkResponse;
37  import org.opensearch.client.opensearch.indices.IndexSettings;
38  import org.quartz.JobExecutionContext;
39  import org.quartz.JobExecutionException;
40  import org.springframework.beans.factory.annotation.Autowired;
41  
42  /**
43   * Remove and rebuild all OpenSearch indexes with information from existing users, groups and any objects.
44   */
45  public class OpenSearchReindex extends AbstractSchedTaskJobDelegate<SchedTask> {
46  
47      @Autowired
48      protected OpenSearchClient client;
49  
50      @Autowired
51      protected OpenSearchIndexManager indexManager;
52  
53      @Autowired
54      protected OpenSearchUtils utils;
55  
56      @Autowired
57      protected UserDAO userDAO;
58  
59      @Autowired
60      protected GroupDAO groupDAO;
61  
62      @Autowired
63      protected AnyObjectDAO anyObjectDAO;
64  
65      @Autowired
66      protected RealmDAO realmDAO;
67  
68      protected IndexSettings userSettings() throws IOException {
69          return indexManager.defaultSettings();
70      }
71  
72      protected IndexSettings groupSettings() throws IOException {
73          return indexManager.defaultSettings();
74      }
75  
76      protected IndexSettings anyObjectSettings() throws IOException {
77          return indexManager.defaultSettings();
78      }
79  
80      protected IndexSettings realmSettings() throws IOException {
81          return indexManager.defaultSettings();
82      }
83  
84      protected IndexSettings auditSettings() throws IOException {
85          return indexManager.defaultSettings();
86      }
87  
88      protected TypeMapping userMapping() throws IOException {
89          return indexManager.defaultAnyMapping();
90      }
91  
92      protected TypeMapping groupMapping() throws IOException {
93          return indexManager.defaultAnyMapping();
94      }
95  
96      protected TypeMapping anyObjectMapping() throws IOException {
97          return indexManager.defaultAnyMapping();
98      }
99  
100     protected TypeMapping realmMapping() throws IOException {
101         return indexManager.defaultRealmMapping();
102     }
103 
104     protected TypeMapping auditMapping() throws IOException {
105         return indexManager.defaultAuditMapping();
106     }
107 
108     @Override
109     protected String doExecute(final boolean dryRun, final String executor, final JobExecutionContext context)
110             throws JobExecutionException {
111 
112         if (!dryRun) {
113             setStatus("Start rebuilding indexes");
114 
115             try {
116                 indexManager.createRealmIndex(AuthContextUtils.getDomain(), realmSettings(), realmMapping());
117 
118                 int realms = realmDAO.count();
119                 String rindex = OpenSearchUtils.getRealmIndex(AuthContextUtils.getDomain());
120                 setStatus("Indexing " + realms + " realms under " + rindex + "...");
121                 for (int page = 1; page <= (realms / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
122                     BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
123 
124                     for (String realm : realmDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
125                         bulkRequest.operations(op -> op.index(idx -> idx.
126                                 index(rindex).
127                                 id(realm).
128                                 document(utils.document(realmDAO.find(realm)))));
129                     }
130 
131                     try {
132                         BulkResponse response = client.bulk(bulkRequest.build());
133                         LOG.debug("Index successfully created for {} [{}/{}]: {}",
134                                 rindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
135                     } catch (Exception e) {
136                         LOG.error("Could not create index for {} [{}/{}]: {}",
137                                 rindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
138                     }
139                 }
140 
141                 indexManager.createAnyIndex(
142                         AuthContextUtils.getDomain(), AnyTypeKind.USER, userSettings(), userMapping());
143 
144                 int users = userDAO.count();
145                 String uindex = OpenSearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER);
146                 setStatus("Indexing " + users + " users under " + uindex + "...");
147                 for (int page = 1; page <= (users / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
148                     BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
149 
150                     for (String user : userDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
151                         bulkRequest.operations(op -> op.index(idx -> idx.
152                                 index(uindex).
153                                 id(user).
154                                 document(utils.document(userDAO.find(user)))));
155                     }
156 
157                     try {
158                         BulkResponse response = client.bulk(bulkRequest.build());
159                         LOG.debug("Index successfully created for {} [{}/{}]: {}",
160                                 uindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
161                     } catch (Exception e) {
162                         LOG.error("Could not create index for {} [{}/{}]: {}",
163                                 uindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
164                     }
165                 }
166 
167                 indexManager.createAnyIndex(
168                         AuthContextUtils.getDomain(), AnyTypeKind.GROUP, groupSettings(), groupMapping());
169 
170                 int groups = groupDAO.count();
171                 String gindex = OpenSearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP);
172                 setStatus("Indexing " + groups + " groups under " + gindex + "...");
173                 for (int page = 1; page <= (groups / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
174                     BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
175 
176                     for (String group : groupDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
177                         bulkRequest.operations(op -> op.index(idx -> idx.
178                                 index(gindex).
179                                 id(group).
180                                 document(utils.document(groupDAO.find(group)))));
181                     }
182 
183                     try {
184                         BulkResponse response = client.bulk(bulkRequest.build());
185                         LOG.debug("Index successfully created for {} [{}/{}]: {}",
186                                 gindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
187                     } catch (Exception e) {
188                         LOG.error("Could not create index for {} [{}/{}]: {}",
189                                 gindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
190                     }
191                 }
192 
193                 indexManager.createAnyIndex(
194                         AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT, anyObjectSettings(), anyObjectMapping());
195 
196                 int anyObjects = anyObjectDAO.count();
197                 String aindex = OpenSearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT);
198                 setStatus("Indexing " + anyObjects + " any objects under " + aindex + "...");
199                 for (int page = 1; page <= (anyObjects / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
200                     BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
201 
202                     for (String anyObject : anyObjectDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
203                         bulkRequest.operations(op -> op.index(idx -> idx.
204                                 index(aindex).
205                                 id(anyObject).
206                                 document(utils.document(anyObjectDAO.find(anyObject)))));
207                     }
208 
209                     try {
210                         BulkResponse response = client.bulk(bulkRequest.build());
211                         LOG.debug("Index successfully created for {} [{}/{}]: {}",
212                                 aindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
213                     } catch (Exception e) {
214                         LOG.error("Could not create index for {} [{}/{}]: {}",
215                                 aindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
216                     }
217                 }
218 
219                 indexManager.createAuditIndex(AuthContextUtils.getDomain(), auditSettings(), auditMapping());
220 
221                 setStatus("Rebuild indexes for domain " + AuthContextUtils.getDomain() + " successfully completed");
222 
223                 return "Indexes created:\n"
224                         + " " + rindex + " [" + realms + "]\n"
225                         + " " + uindex + " [" + users + "]\n"
226                         + " " + gindex + " [" + groups + "]\n"
227                         + " " + aindex + " [" + anyObjects + "]\n"
228                         + " " + OpenSearchUtils.getAuditIndex(AuthContextUtils.getDomain());
229             } catch (Exception e) {
230                 throw new JobExecutionException("While rebuilding index for domain " + AuthContextUtils.getDomain(), e);
231             }
232         }
233 
234         return "SUCCESS";
235     }
236 
237     @Override
238     protected boolean hasToBeRegistered(final TaskExec<?> execution) {
239         return true;
240     }
241 }