1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.core.provisioning.java.pushpull;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Optional;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.function.Supplier;
31 import org.apache.syncope.core.persistence.api.entity.task.ProvisioningTask;
32 import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningActions;
33 import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
34 import org.apache.syncope.core.provisioning.api.pushpull.SyncopeResultHandler;
35 import org.apache.syncope.core.spring.security.AuthContextUtils;
36 import org.apache.syncope.core.spring.security.SyncopeAuthenticationDetails;
37 import org.apache.syncope.core.spring.security.SyncopeGrantedAuthority;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
41 import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
42 import org.springframework.security.core.context.SecurityContextHolder;
43 import org.springframework.security.core.userdetails.User;
44
45 public abstract class SyncopeResultHandlerDispatcher<
46 T extends ProvisioningTask<?>, A extends ProvisioningActions, RA extends SyncopeResultHandler<T, A>> {
47
48 protected static final Logger LOG = LoggerFactory.getLogger(SyncopeResultHandlerDispatcher.class);
49
50 private static final String PLACEHOLDER_PWD = "PLACEHOLDER_PWD";
51
52 protected final Optional<ThreadPoolTaskExecutor> tpte;
53
54 protected final Map<String, Supplier<RA>> suppliers = new ConcurrentHashMap<>();
55
56 protected final Map<String, RA> handlers = new ConcurrentHashMap<>();
57
58 protected final List<Future<?>> futures = new ArrayList<>();
59
60 protected SyncopeResultHandlerDispatcher(final ProvisioningProfile<T, A> profile) {
61 if (profile.getTask().getConcurrentSettings() == null) {
62 tpte = Optional.empty();
63 } else {
64 ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
65 t.setCorePoolSize(profile.getTask().getConcurrentSettings().getCorePoolSize());
66 t.setMaxPoolSize(profile.getTask().getConcurrentSettings().getMaxPoolSize());
67 t.setQueueCapacity(profile.getTask().getConcurrentSettings().getQueueCapacity());
68 t.setWaitForTasksToCompleteOnShutdown(true);
69 t.setThreadNamePrefix("provisioningTask-" + profile.getTask().getKey() + "-");
70 t.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
71
72 String domain = AuthContextUtils.getDomain();
73 String delegatedBy = AuthContextUtils.getDelegatedBy().orElse(null);
74 Set<SyncopeGrantedAuthority> authorities = AuthContextUtils.getAuthorities();
75 t.setTaskDecorator(d -> () -> {
76
77 UsernamePasswordAuthenticationToken placeHolderAuth = new UsernamePasswordAuthenticationToken(
78 new User(profile.getExecutor(), PLACEHOLDER_PWD, authorities), PLACEHOLDER_PWD, authorities);
79 placeHolderAuth.setDetails(new SyncopeAuthenticationDetails(domain, delegatedBy));
80 SecurityContextHolder.getContext().setAuthentication(placeHolderAuth);
81
82 d.run();
83 });
84
85 t.initialize();
86
87 tpte = Optional.of(t);
88 }
89 }
90
91 public void addHandlerSupplier(final String key, final Supplier<RA> supplier) {
92 suppliers.put(key, supplier);
93 }
94
95 protected RA nonConcurrentHandler(final String key) {
96 return Optional.ofNullable(handlers.get(key)).orElseGet(() -> {
97 RA h = suppliers.get(key).get();
98 handlers.put(key, h);
99 return h;
100 });
101 }
102
103 protected void submit(final Runnable runnable) {
104 tpte.ifPresent(executor -> futures.add(executor.submit(runnable)));
105 }
106
107 protected void shutdown() {
108 for (Future<?> f : this.futures) {
109 try {
110 f.get();
111 } catch (ExecutionException | InterruptedException e) {
112 LOG.error("Unexpected error when waiting for completion", e);
113 }
114 }
115
116 tpte.ifPresent(ThreadPoolTaskExecutor::shutdown);
117 }
118 }