1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.core.provisioning.java.pushpull.stream;
20
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.stream.Collectors;
24 import org.apache.syncope.common.lib.to.Item;
25 import org.apache.syncope.common.lib.to.Mapping;
26 import org.apache.syncope.common.lib.to.Provision;
27 import org.apache.syncope.common.lib.to.ProvisioningReport;
28 import org.apache.syncope.common.lib.to.PushTaskTO;
29 import org.apache.syncope.common.lib.types.ConflictResolutionAction;
30 import org.apache.syncope.common.lib.types.IdMImplementationType;
31 import org.apache.syncope.common.lib.types.MappingPurpose;
32 import org.apache.syncope.common.lib.types.TaskType;
33 import org.apache.syncope.core.persistence.api.dao.ImplementationDAO;
34 import org.apache.syncope.core.persistence.api.entity.Any;
35 import org.apache.syncope.core.persistence.api.entity.AnyType;
36 import org.apache.syncope.core.persistence.api.entity.ExternalResource;
37 import org.apache.syncope.core.persistence.api.entity.Implementation;
38 import org.apache.syncope.core.persistence.api.entity.task.PushTask;
39 import org.apache.syncope.core.provisioning.api.Connector;
40 import org.apache.syncope.core.provisioning.api.pushpull.AnyObjectPushResultHandler;
41 import org.apache.syncope.core.provisioning.api.pushpull.GroupPushResultHandler;
42 import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
43 import org.apache.syncope.core.provisioning.api.pushpull.PushActions;
44 import org.apache.syncope.core.provisioning.api.pushpull.SyncopePushResultHandler;
45 import org.apache.syncope.core.provisioning.api.pushpull.UserPushResultHandler;
46 import org.apache.syncope.core.provisioning.api.pushpull.stream.SyncopeStreamPushExecutor;
47 import org.apache.syncope.core.provisioning.java.pushpull.PushJobDelegate;
48 import org.apache.syncope.core.provisioning.java.pushpull.PushResultHandlerDispatcher;
49 import org.apache.syncope.core.spring.ApplicationContextProvider;
50 import org.apache.syncope.core.spring.security.SecureRandomUtils;
51 import org.quartz.JobExecutionException;
52 import org.springframework.beans.factory.annotation.Autowired;
53 import org.springframework.beans.factory.support.AbstractBeanDefinition;
54
55 public class StreamPushJobDelegate extends PushJobDelegate implements SyncopeStreamPushExecutor {
56
57 @Autowired
58 private ImplementationDAO implementationDAO;
59
60 @Override
61 protected AnyObjectPushResultHandler buildAnyObjectHandler() {
62 return (AnyObjectPushResultHandler) ApplicationContextProvider.getBeanFactory().createBean(
63 StreamAnyObjectPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
64 }
65
66 @Override
67 protected UserPushResultHandler buildUserHandler() {
68 return (UserPushResultHandler) ApplicationContextProvider.getBeanFactory().
69 createBean(StreamUserPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
70 }
71
72 @Override
73 protected GroupPushResultHandler buildGroupHandler() {
74 return (GroupPushResultHandler) ApplicationContextProvider.getBeanFactory().
75 createBean(StreamGroupPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
76 }
77
78 private ExternalResource externalResource(
79 final AnyType anyType,
80 final List<String> columns,
81 final List<String> propagationActions) throws JobExecutionException {
82
83 Provision provision = new Provision();
84 provision.setAnyType(anyType.getKey());
85 provision.setObjectClass(anyType.getKey());
86
87 Mapping mapping = new Mapping();
88 provision.setMapping(mapping);
89
90 Item connObjectKeyItem = new Item();
91 connObjectKeyItem.setExtAttrName("key");
92 connObjectKeyItem.setIntAttrName("key");
93 connObjectKeyItem.setPurpose(MappingPurpose.NONE);
94 mapping.setConnObjectKeyItem(connObjectKeyItem);
95
96 columns.stream().map(column -> {
97 Item item = new Item();
98 item.setExtAttrName(column);
99 item.setIntAttrName(column);
100 item.setPurpose(MappingPurpose.PROPAGATION);
101 mapping.add(item);
102 return item;
103 }).forEach(mapping::add);
104
105 ExternalResource resource = entityFactory.newEntity(ExternalResource.class);
106 resource.setKey("StreamPush_" + SecureRandomUtils.generateRandomUUID().toString());
107 resource.getProvisions().add(provision);
108
109 propagationActions.forEach(key -> {
110 Implementation impl = implementationDAO.find(key);
111 if (impl == null || !IdMImplementationType.PROPAGATION_ACTIONS.equals(impl.getType())) {
112 LOG.debug("Invalid " + Implementation.class.getSimpleName() + " {}, ignoring...", key);
113 } else {
114 resource.add(impl);
115 }
116 });
117
118 return resource;
119 }
120
121 @Override
122 public List<ProvisioningReport> push(
123 final AnyType anyType,
124 final List<? extends Any<?>> anys,
125 final List<String> columns,
126 final Connector connector,
127 final List<String> propagationActions,
128 final PushTaskTO pushTaskTO,
129 final String executor) throws JobExecutionException {
130
131 LOG.debug("Executing stream push as {}", executor);
132
133 taskType = TaskType.PUSH;
134 try {
135 ExternalResource resource = externalResource(anyType, columns, propagationActions);
136
137 task = entityFactory.newEntity(PushTask.class);
138 task.setResource(resource);
139 task.setMatchingRule(pushTaskTO.getMatchingRule());
140 task.setUnmatchingRule(pushTaskTO.getUnmatchingRule());
141 task.setPerformCreate(true);
142 task.setPerformUpdate(true);
143 task.setPerformDelete(true);
144 task.setSyncStatus(false);
145
146 profile = new ProvisioningProfile<>(connector, task);
147 profile.setExecutor(executor);
148 profile.getActions().addAll(getPushActions(pushTaskTO.getActions().stream().
149 map(implementationDAO::find).filter(Objects::nonNull).collect(Collectors.toList())));
150 profile.setConflictResolutionAction(ConflictResolutionAction.FIRSTMATCH);
151
152 PushResultHandlerDispatcher dispatcher = new PushResultHandlerDispatcher(profile, this);
153
154 for (PushActions action : profile.getActions()) {
155 action.beforeAll(profile);
156 }
157
158 dispatcher.addHandlerSupplier(anyType.getKey(), () -> {
159 SyncopePushResultHandler handler;
160 switch (anyType.getKind()) {
161 case USER:
162 handler = buildUserHandler();
163 break;
164
165 case GROUP:
166 handler = buildGroupHandler();
167 break;
168
169 case ANY_OBJECT:
170 default:
171 handler = buildAnyObjectHandler();
172 }
173 handler.setProfile(profile);
174 return handler;
175 });
176
177 doHandle(anys, dispatcher, resource);
178
179 for (PushActions action : profile.getActions()) {
180 action.afterAll(profile);
181 }
182
183 return profile.getResults();
184 } catch (Exception e) {
185 throw e instanceof JobExecutionException
186 ? (JobExecutionException) e
187 : new JobExecutionException("While stream pushing", e);
188 } finally {
189 setStatus(null);
190 }
191 }
192 }