View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.core.provisioning.java.pushpull.stream;
20  
21  import java.util.List;
22  import java.util.Objects;
23  import java.util.stream.Collectors;
24  import org.apache.syncope.common.lib.to.Item;
25  import org.apache.syncope.common.lib.to.Mapping;
26  import org.apache.syncope.common.lib.to.Provision;
27  import org.apache.syncope.common.lib.to.ProvisioningReport;
28  import org.apache.syncope.common.lib.to.PushTaskTO;
29  import org.apache.syncope.common.lib.types.ConflictResolutionAction;
30  import org.apache.syncope.common.lib.types.IdMImplementationType;
31  import org.apache.syncope.common.lib.types.MappingPurpose;
32  import org.apache.syncope.common.lib.types.TaskType;
33  import org.apache.syncope.core.persistence.api.dao.ImplementationDAO;
34  import org.apache.syncope.core.persistence.api.entity.Any;
35  import org.apache.syncope.core.persistence.api.entity.AnyType;
36  import org.apache.syncope.core.persistence.api.entity.ExternalResource;
37  import org.apache.syncope.core.persistence.api.entity.Implementation;
38  import org.apache.syncope.core.persistence.api.entity.task.PushTask;
39  import org.apache.syncope.core.provisioning.api.Connector;
40  import org.apache.syncope.core.provisioning.api.pushpull.AnyObjectPushResultHandler;
41  import org.apache.syncope.core.provisioning.api.pushpull.GroupPushResultHandler;
42  import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
43  import org.apache.syncope.core.provisioning.api.pushpull.PushActions;
44  import org.apache.syncope.core.provisioning.api.pushpull.SyncopePushResultHandler;
45  import org.apache.syncope.core.provisioning.api.pushpull.UserPushResultHandler;
46  import org.apache.syncope.core.provisioning.api.pushpull.stream.SyncopeStreamPushExecutor;
47  import org.apache.syncope.core.provisioning.java.pushpull.PushJobDelegate;
48  import org.apache.syncope.core.provisioning.java.pushpull.PushResultHandlerDispatcher;
49  import org.apache.syncope.core.spring.ApplicationContextProvider;
50  import org.apache.syncope.core.spring.security.SecureRandomUtils;
51  import org.quartz.JobExecutionException;
52  import org.springframework.beans.factory.annotation.Autowired;
53  import org.springframework.beans.factory.support.AbstractBeanDefinition;
54  
55  public class StreamPushJobDelegate extends PushJobDelegate implements SyncopeStreamPushExecutor {
56  
57      @Autowired
58      private ImplementationDAO implementationDAO;
59  
60      @Override
61      protected AnyObjectPushResultHandler buildAnyObjectHandler() {
62          return (AnyObjectPushResultHandler) ApplicationContextProvider.getBeanFactory().createBean(
63                  StreamAnyObjectPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
64      }
65  
66      @Override
67      protected UserPushResultHandler buildUserHandler() {
68          return (UserPushResultHandler) ApplicationContextProvider.getBeanFactory().
69                  createBean(StreamUserPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
70      }
71  
72      @Override
73      protected GroupPushResultHandler buildGroupHandler() {
74          return (GroupPushResultHandler) ApplicationContextProvider.getBeanFactory().
75                  createBean(StreamGroupPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
76      }
77  
78      private ExternalResource externalResource(
79              final AnyType anyType,
80              final List<String> columns,
81              final List<String> propagationActions) throws JobExecutionException {
82  
83          Provision provision = new Provision();
84          provision.setAnyType(anyType.getKey());
85          provision.setObjectClass(anyType.getKey());
86  
87          Mapping mapping = new Mapping();
88          provision.setMapping(mapping);
89  
90          Item connObjectKeyItem = new Item();
91          connObjectKeyItem.setExtAttrName("key");
92          connObjectKeyItem.setIntAttrName("key");
93          connObjectKeyItem.setPurpose(MappingPurpose.NONE);
94          mapping.setConnObjectKeyItem(connObjectKeyItem);
95  
96          columns.stream().map(column -> {
97              Item item = new Item();
98              item.setExtAttrName(column);
99              item.setIntAttrName(column);
100             item.setPurpose(MappingPurpose.PROPAGATION);
101             mapping.add(item);
102             return item;
103         }).forEach(mapping::add);
104 
105         ExternalResource resource = entityFactory.newEntity(ExternalResource.class);
106         resource.setKey("StreamPush_" + SecureRandomUtils.generateRandomUUID().toString());
107         resource.getProvisions().add(provision);
108 
109         propagationActions.forEach(key -> {
110             Implementation impl = implementationDAO.find(key);
111             if (impl == null || !IdMImplementationType.PROPAGATION_ACTIONS.equals(impl.getType())) {
112                 LOG.debug("Invalid " + Implementation.class.getSimpleName() + " {}, ignoring...", key);
113             } else {
114                 resource.add(impl);
115             }
116         });
117 
118         return resource;
119     }
120 
121     @Override
122     public List<ProvisioningReport> push(
123             final AnyType anyType,
124             final List<? extends Any<?>> anys,
125             final List<String> columns,
126             final Connector connector,
127             final List<String> propagationActions,
128             final PushTaskTO pushTaskTO,
129             final String executor) throws JobExecutionException {
130 
131         LOG.debug("Executing stream push as {}", executor);
132 
133         taskType = TaskType.PUSH;
134         try {
135             ExternalResource resource = externalResource(anyType, columns, propagationActions);
136 
137             task = entityFactory.newEntity(PushTask.class);
138             task.setResource(resource);
139             task.setMatchingRule(pushTaskTO.getMatchingRule());
140             task.setUnmatchingRule(pushTaskTO.getUnmatchingRule());
141             task.setPerformCreate(true);
142             task.setPerformUpdate(true);
143             task.setPerformDelete(true);
144             task.setSyncStatus(false);
145 
146             profile = new ProvisioningProfile<>(connector, task);
147             profile.setExecutor(executor);
148             profile.getActions().addAll(getPushActions(pushTaskTO.getActions().stream().
149                     map(implementationDAO::find).filter(Objects::nonNull).collect(Collectors.toList())));
150             profile.setConflictResolutionAction(ConflictResolutionAction.FIRSTMATCH);
151 
152             PushResultHandlerDispatcher dispatcher = new PushResultHandlerDispatcher(profile, this);
153 
154             for (PushActions action : profile.getActions()) {
155                 action.beforeAll(profile);
156             }
157 
158             dispatcher.addHandlerSupplier(anyType.getKey(), () -> {
159                 SyncopePushResultHandler handler;
160                 switch (anyType.getKind()) {
161                     case USER:
162                         handler = buildUserHandler();
163                         break;
164 
165                     case GROUP:
166                         handler = buildGroupHandler();
167                         break;
168 
169                     case ANY_OBJECT:
170                     default:
171                         handler = buildAnyObjectHandler();
172                 }
173                 handler.setProfile(profile);
174                 return handler;
175             });
176 
177             doHandle(anys, dispatcher, resource);
178 
179             for (PushActions action : profile.getActions()) {
180                 action.afterAll(profile);
181             }
182 
183             return profile.getResults();
184         } catch (Exception e) {
185             throw e instanceof JobExecutionException
186                     ? (JobExecutionException) e
187                     : new JobExecutionException("While stream pushing", e);
188         } finally {
189             setStatus(null);
190         }
191     }
192 }