View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.core.provisioning.java.pushpull.stream;
20  
21  import com.fasterxml.jackson.databind.MappingIterator;
22  import com.fasterxml.jackson.databind.SequenceWriter;
23  import com.fasterxml.jackson.dataformat.csv.CsvMapper;
24  import com.fasterxml.jackson.dataformat.csv.CsvParser;
25  import com.fasterxml.jackson.dataformat.csv.CsvSchema;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.OutputStream;
29  import java.util.ArrayList;
30  import java.util.LinkedHashMap;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Set;
34  import java.util.concurrent.atomic.AtomicReference;
35  import java.util.stream.Collectors;
36  import org.apache.commons.lang3.StringUtils;
37  import org.apache.syncope.common.rest.api.beans.CSVPullSpec;
38  import org.apache.syncope.core.persistence.api.entity.ConnInstance;
39  import org.apache.syncope.core.provisioning.api.Connector;
40  import org.identityconnectors.framework.common.exceptions.ConnectorException;
41  import org.identityconnectors.framework.common.objects.Attribute;
42  import org.identityconnectors.framework.common.objects.AttributeBuilder;
43  import org.identityconnectors.framework.common.objects.AttributeDelta;
44  import org.identityconnectors.framework.common.objects.AttributeUtil;
45  import org.identityconnectors.framework.common.objects.ConnectorObject;
46  import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder;
47  import org.identityconnectors.framework.common.objects.ObjectClass;
48  import org.identityconnectors.framework.common.objects.ObjectClassInfo;
49  import org.identityconnectors.framework.common.objects.OperationOptions;
50  import org.identityconnectors.framework.common.objects.SearchResult;
51  import org.identityconnectors.framework.common.objects.SyncResultsHandler;
52  import org.identityconnectors.framework.common.objects.SyncToken;
53  import org.identityconnectors.framework.common.objects.Uid;
54  import org.identityconnectors.framework.common.objects.filter.Filter;
55  import org.identityconnectors.framework.spi.SearchResultsHandler;
56  import org.slf4j.Logger;
57  import org.slf4j.LoggerFactory;
58  import org.springframework.util.CollectionUtils;
59  
60  public class CSVStreamConnector implements Connector, AutoCloseable {
61  
62      private static final Logger LOG = LoggerFactory.getLogger(CSVStreamConnector.class);
63  
64      private final String keyColumn;
65  
66      private final String arrayElementsSeparator;
67  
68      private final CsvSchema.Builder schemaBuilder;
69  
70      private final InputStream in;
71  
72      private final OutputStream out;
73  
74      private final List<String> columns;
75  
76      private MappingIterator<Map<String, String>> reader;
77  
78      private SequenceWriter writer;
79  
80      public CSVStreamConnector(
81              final String keyColumn,
82              final String arrayElementsSeparator,
83              final CsvSchema.Builder schemaBuilder,
84              final InputStream in,
85              final OutputStream out,
86              final String... columns) {
87  
88          this.keyColumn = keyColumn;
89          this.arrayElementsSeparator = arrayElementsSeparator;
90          this.schemaBuilder = schemaBuilder;
91          this.in = in;
92          this.out = out;
93          this.columns = List.of(columns);
94      }
95  
96      @Override
97      public void close() throws IOException {
98          if (reader != null) {
99              reader.close();
100         }
101         if (writer != null) {
102             writer.close();
103         }
104     }
105 
106     public MappingIterator<Map<String, String>> reader() throws IOException {
107         synchronized (this) {
108             if (reader == null) {
109                 reader = new CsvMapper().
110                         enable(CsvParser.Feature.SKIP_EMPTY_LINES).
111                         readerFor(Map.class).with(schemaBuilder.build()).readValues(in);
112             }
113         }
114         return reader;
115     }
116 
117     public List<String> getColumns(final CSVPullSpec spec) throws IOException {
118         List<String> fromSpec = new ArrayList<>();
119         ((CsvSchema) reader().getParserSchema()).forEach(column -> {
120             if (!spec.getIgnoreColumns().contains(column.getName())) {
121                 fromSpec.add(column.getName());
122             }
123         });
124         return fromSpec;
125     }
126 
127     public SequenceWriter writer() throws IOException {
128         synchronized (this) {
129             if (writer == null) {
130                 writer = new CsvMapper().writerFor(Map.class).with(schemaBuilder.build()).writeValues(out);
131             }
132         }
133         return writer;
134     }
135 
136     @Override
137     public Uid authenticate(final String username, final String password, final OperationOptions options) {
138         return null;
139     }
140 
141     @Override
142     public ConnInstance getConnInstance() {
143         return null;
144     }
145 
146     @Override
147     public Uid create(
148             final ObjectClass objectClass,
149             final Set<Attribute> attrs,
150             final OperationOptions options,
151             final AtomicReference<Boolean> propagationAttempted) {
152 
153         synchronized (schemaBuilder) {
154             if (schemaBuilder.size() == 0) {
155                 attrs.stream().filter(attr -> !AttributeUtil.isSpecial(attr)).map(Attribute::getName).
156                         sorted((c1, c2) -> {
157                             // sort according to the passed columns, leave any additional column at the end
158                             int index1 = columns.indexOf(c1);
159                             if (index1 == -1) {
160                                 index1 = Integer.MAX_VALUE;
161                             }
162                             int index2 = columns.indexOf(c2);
163                             if (index2 == -1) {
164                                 index2 = Integer.MAX_VALUE;
165                             }
166                             return Integer.compare(index1, index2);
167                         }).
168                         forEachOrdered(schemaBuilder::addColumn);
169             }
170         }
171 
172         Map<String, String> row = new LinkedHashMap<>();
173         attrs.stream().filter(attr -> !AttributeUtil.isSpecial(attr)).forEach(attr -> {
174             if (CollectionUtils.isEmpty(attr.getValue()) || attr.getValue().get(0) == null) {
175                 row.put(attr.getName(), null);
176             } else if (attr.getValue().size() == 1) {
177                 row.put(attr.getName(), attr.getValue().get(0).toString());
178             } else if (arrayElementsSeparator == null) {
179                 row.put(attr.getName(), attr.getValue().toString());
180             } else {
181                 row.put(
182                         attr.getName(),
183                         attr.getValue().stream().map(Object::toString).
184                                 collect(Collectors.joining(arrayElementsSeparator)));
185             }
186         });
187         try {
188             writer().write(row);
189         } catch (IOException e) {
190             throw new ConnectorException("Could not write object " + row, e);
191         }
192         propagationAttempted.set(Boolean.TRUE);
193         return null;
194     }
195 
196     @Override
197     public Uid update(
198             final ObjectClass objectClass,
199             final Uid uid,
200             final Set<Attribute> attrs,
201             final OperationOptions options,
202             final AtomicReference<Boolean> propagationAttempted) {
203 
204         return null;
205     }
206 
207     @Override
208     public Set<AttributeDelta> updateDelta(
209             final ObjectClass objectClass,
210             final Uid uid,
211             final Set<AttributeDelta> modifications,
212             final OperationOptions options,
213             final AtomicReference<Boolean> propagationAttempted) {
214 
215         return Set.of();
216     }
217 
218     @Override
219     public void delete(
220             final ObjectClass objectClass,
221             final Uid uid,
222             final OperationOptions options,
223             final AtomicReference<Boolean> propagationAttempted) {
224 
225         // nothing to do
226     }
227 
228     @Override
229     public void sync(
230             final ObjectClass objectClass,
231             final SyncToken token,
232             final SyncResultsHandler handler,
233             final OperationOptions options) {
234 
235         throw new UnsupportedOperationException();
236     }
237 
238     @Override
239     public SyncToken getLatestSyncToken(final ObjectClass objectClass) {
240         throw new UnsupportedOperationException();
241     }
242 
243     @Override
244     public ConnectorObject getObject(
245             final ObjectClass objectClass,
246             final Attribute connObjectKey,
247             final boolean ignoreCaseMatch,
248             final OperationOptions options) {
249 
250         return null;
251     }
252 
253     @Override
254     public SearchResult search(
255             final ObjectClass objectClass,
256             final Filter filter,
257             final SearchResultsHandler handler,
258             final OperationOptions options) {
259 
260         SearchResult result = new SearchResult();
261 
262         try {
263             for (int record = 1; reader().hasNext(); record++) {
264                 Map<String, String> row = reader().next();
265 
266                 String keyValue = row.get(keyColumn);
267                 if (StringUtils.isBlank(keyValue)) {
268                     keyValue = "Record " + record;
269                 }
270 
271                 ConnectorObjectBuilder builder = new ConnectorObjectBuilder().
272                         setObjectClass(objectClass).
273                         setUid(keyValue).
274                         setName(keyValue);
275 
276                 row.forEach((key, value) -> builder.addAttribute(arrayElementsSeparator == null
277                         ? AttributeBuilder.build(key, value)
278                         : AttributeBuilder.build(key,
279                                 (Object[]) StringUtils.splitByWholeSeparator(value, arrayElementsSeparator))));
280 
281                 ConnectorObject obj = builder.build();
282                 if (filter == null || filter.accept(obj)) {
283                     handler.handle(obj);
284                 } else {
285                     LOG.debug("Found but not passing the provided filter {}: {}", filter, obj);
286                 }
287             }
288         } catch (IOException e) {
289             LOG.error("Could not read CSV from provided stream", e);
290             throw new ConnectorException(e);
291         }
292 
293         return result;
294     }
295 
296     @Override
297     public Set<ObjectClassInfo> getObjectClassInfo() {
298         return Set.of();
299     }
300 
301     @Override
302     public void validate() {
303         // nothing to do
304     }
305 
306     @Override
307     public void test() {
308         // nothing to do
309     }
310 
311     @Override
312     public void dispose() {
313         // nothing to do
314     }
315 }