View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.core.provisioning.java.pushpull;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Optional;
25  import java.util.Set;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.ExecutionException;
28  import java.util.concurrent.Future;
29  import java.util.concurrent.ThreadPoolExecutor;
30  import java.util.function.Supplier;
31  import org.apache.syncope.core.persistence.api.entity.task.ProvisioningTask;
32  import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningActions;
33  import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
34  import org.apache.syncope.core.provisioning.api.pushpull.SyncopeResultHandler;
35  import org.apache.syncope.core.spring.security.AuthContextUtils;
36  import org.apache.syncope.core.spring.security.SyncopeAuthenticationDetails;
37  import org.apache.syncope.core.spring.security.SyncopeGrantedAuthority;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
41  import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
42  import org.springframework.security.core.context.SecurityContextHolder;
43  import org.springframework.security.core.userdetails.User;
44  
45  public abstract class SyncopeResultHandlerDispatcher<
46          T extends ProvisioningTask<?>, A extends ProvisioningActions, RA extends SyncopeResultHandler<T, A>> {
47  
48      protected static final Logger LOG = LoggerFactory.getLogger(SyncopeResultHandlerDispatcher.class);
49  
50      private static final String PLACEHOLDER_PWD = "PLACEHOLDER_PWD";
51  
52      protected final Optional<ThreadPoolTaskExecutor> tpte;
53  
54      protected final Map<String, Supplier<RA>> suppliers = new ConcurrentHashMap<>();
55  
56      protected final Map<String, RA> handlers = new ConcurrentHashMap<>();
57  
58      protected final List<Future<?>> futures = new ArrayList<>();
59  
60      protected SyncopeResultHandlerDispatcher(final ProvisioningProfile<T, A> profile) {
61          if (profile.getTask().getConcurrentSettings() == null) {
62              tpte = Optional.empty();
63          } else {
64              ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
65              t.setCorePoolSize(profile.getTask().getConcurrentSettings().getCorePoolSize());
66              t.setMaxPoolSize(profile.getTask().getConcurrentSettings().getMaxPoolSize());
67              t.setQueueCapacity(profile.getTask().getConcurrentSettings().getQueueCapacity());
68              t.setWaitForTasksToCompleteOnShutdown(true);
69              t.setThreadNamePrefix("provisioningTask-" + profile.getTask().getKey() + "-");
70              t.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
71  
72              String domain = AuthContextUtils.getDomain();
73              String delegatedBy = AuthContextUtils.getDelegatedBy().orElse(null);
74              Set<SyncopeGrantedAuthority> authorities = AuthContextUtils.getAuthorities();
75              t.setTaskDecorator(d -> () -> {
76                  // set placeholder authentication object by creating fresh and copying data from caller's
77                  UsernamePasswordAuthenticationToken placeHolderAuth = new UsernamePasswordAuthenticationToken(
78                          new User(profile.getExecutor(), PLACEHOLDER_PWD, authorities), PLACEHOLDER_PWD, authorities);
79                  placeHolderAuth.setDetails(new SyncopeAuthenticationDetails(domain, delegatedBy));
80                  SecurityContextHolder.getContext().setAuthentication(placeHolderAuth);
81  
82                  d.run();
83              });
84  
85              t.initialize();
86  
87              tpte = Optional.of(t);
88          }
89      }
90  
91      public void addHandlerSupplier(final String key, final Supplier<RA> supplier) {
92          suppliers.put(key, supplier);
93      }
94  
95      protected RA nonConcurrentHandler(final String key) {
96          return Optional.ofNullable(handlers.get(key)).orElseGet(() -> {
97              RA h = suppliers.get(key).get();
98              handlers.put(key, h);
99              return h;
100         });
101     }
102 
103     protected void submit(final Runnable runnable) {
104         tpte.ifPresent(executor -> futures.add(executor.submit(runnable)));
105     }
106 
107     protected void shutdown() {
108         for (Future<?> f : this.futures) {
109             try {
110                 f.get();
111             } catch (ExecutionException | InterruptedException e) {
112                 LOG.error("Unexpected error when waiting for completion", e);
113             }
114         }
115 
116         tpte.ifPresent(ThreadPoolTaskExecutor::shutdown);
117     }
118 }