1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.core.provisioning.java.pushpull.stream;
20
21 import com.fasterxml.jackson.databind.MappingIterator;
22 import com.fasterxml.jackson.databind.SequenceWriter;
23 import com.fasterxml.jackson.dataformat.csv.CsvMapper;
24 import com.fasterxml.jackson.dataformat.csv.CsvParser;
25 import com.fasterxml.jackson.dataformat.csv.CsvSchema;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.OutputStream;
29 import java.util.ArrayList;
30 import java.util.LinkedHashMap;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.concurrent.atomic.AtomicReference;
35 import java.util.stream.Collectors;
36 import org.apache.commons.lang3.StringUtils;
37 import org.apache.syncope.common.rest.api.beans.CSVPullSpec;
38 import org.apache.syncope.core.persistence.api.entity.ConnInstance;
39 import org.apache.syncope.core.provisioning.api.Connector;
40 import org.identityconnectors.framework.common.exceptions.ConnectorException;
41 import org.identityconnectors.framework.common.objects.Attribute;
42 import org.identityconnectors.framework.common.objects.AttributeBuilder;
43 import org.identityconnectors.framework.common.objects.AttributeDelta;
44 import org.identityconnectors.framework.common.objects.AttributeUtil;
45 import org.identityconnectors.framework.common.objects.ConnectorObject;
46 import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder;
47 import org.identityconnectors.framework.common.objects.ObjectClass;
48 import org.identityconnectors.framework.common.objects.ObjectClassInfo;
49 import org.identityconnectors.framework.common.objects.OperationOptions;
50 import org.identityconnectors.framework.common.objects.SearchResult;
51 import org.identityconnectors.framework.common.objects.SyncResultsHandler;
52 import org.identityconnectors.framework.common.objects.SyncToken;
53 import org.identityconnectors.framework.common.objects.Uid;
54 import org.identityconnectors.framework.common.objects.filter.Filter;
55 import org.identityconnectors.framework.spi.SearchResultsHandler;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import org.springframework.util.CollectionUtils;
59
60 public class CSVStreamConnector implements Connector, AutoCloseable {
61
62 private static final Logger LOG = LoggerFactory.getLogger(CSVStreamConnector.class);
63
64 private final String keyColumn;
65
66 private final String arrayElementsSeparator;
67
68 private final CsvSchema.Builder schemaBuilder;
69
70 private final InputStream in;
71
72 private final OutputStream out;
73
74 private final List<String> columns;
75
76 private MappingIterator<Map<String, String>> reader;
77
78 private SequenceWriter writer;
79
80 public CSVStreamConnector(
81 final String keyColumn,
82 final String arrayElementsSeparator,
83 final CsvSchema.Builder schemaBuilder,
84 final InputStream in,
85 final OutputStream out,
86 final String... columns) {
87
88 this.keyColumn = keyColumn;
89 this.arrayElementsSeparator = arrayElementsSeparator;
90 this.schemaBuilder = schemaBuilder;
91 this.in = in;
92 this.out = out;
93 this.columns = List.of(columns);
94 }
95
96 @Override
97 public void close() throws IOException {
98 if (reader != null) {
99 reader.close();
100 }
101 if (writer != null) {
102 writer.close();
103 }
104 }
105
106 public MappingIterator<Map<String, String>> reader() throws IOException {
107 synchronized (this) {
108 if (reader == null) {
109 reader = new CsvMapper().
110 enable(CsvParser.Feature.SKIP_EMPTY_LINES).
111 readerFor(Map.class).with(schemaBuilder.build()).readValues(in);
112 }
113 }
114 return reader;
115 }
116
117 public List<String> getColumns(final CSVPullSpec spec) throws IOException {
118 List<String> fromSpec = new ArrayList<>();
119 ((CsvSchema) reader().getParserSchema()).forEach(column -> {
120 if (!spec.getIgnoreColumns().contains(column.getName())) {
121 fromSpec.add(column.getName());
122 }
123 });
124 return fromSpec;
125 }
126
127 public SequenceWriter writer() throws IOException {
128 synchronized (this) {
129 if (writer == null) {
130 writer = new CsvMapper().writerFor(Map.class).with(schemaBuilder.build()).writeValues(out);
131 }
132 }
133 return writer;
134 }
135
136 @Override
137 public Uid authenticate(final String username, final String password, final OperationOptions options) {
138 return null;
139 }
140
141 @Override
142 public ConnInstance getConnInstance() {
143 return null;
144 }
145
146 @Override
147 public Uid create(
148 final ObjectClass objectClass,
149 final Set<Attribute> attrs,
150 final OperationOptions options,
151 final AtomicReference<Boolean> propagationAttempted) {
152
153 synchronized (schemaBuilder) {
154 if (schemaBuilder.size() == 0) {
155 attrs.stream().filter(attr -> !AttributeUtil.isSpecial(attr)).map(Attribute::getName).
156 sorted((c1, c2) -> {
157
158 int index1 = columns.indexOf(c1);
159 if (index1 == -1) {
160 index1 = Integer.MAX_VALUE;
161 }
162 int index2 = columns.indexOf(c2);
163 if (index2 == -1) {
164 index2 = Integer.MAX_VALUE;
165 }
166 return Integer.compare(index1, index2);
167 }).
168 forEachOrdered(schemaBuilder::addColumn);
169 }
170 }
171
172 Map<String, String> row = new LinkedHashMap<>();
173 attrs.stream().filter(attr -> !AttributeUtil.isSpecial(attr)).forEach(attr -> {
174 if (CollectionUtils.isEmpty(attr.getValue()) || attr.getValue().get(0) == null) {
175 row.put(attr.getName(), null);
176 } else if (attr.getValue().size() == 1) {
177 row.put(attr.getName(), attr.getValue().get(0).toString());
178 } else if (arrayElementsSeparator == null) {
179 row.put(attr.getName(), attr.getValue().toString());
180 } else {
181 row.put(
182 attr.getName(),
183 attr.getValue().stream().map(Object::toString).
184 collect(Collectors.joining(arrayElementsSeparator)));
185 }
186 });
187 try {
188 writer().write(row);
189 } catch (IOException e) {
190 throw new ConnectorException("Could not write object " + row, e);
191 }
192 propagationAttempted.set(Boolean.TRUE);
193 return null;
194 }
195
196 @Override
197 public Uid update(
198 final ObjectClass objectClass,
199 final Uid uid,
200 final Set<Attribute> attrs,
201 final OperationOptions options,
202 final AtomicReference<Boolean> propagationAttempted) {
203
204 return null;
205 }
206
207 @Override
208 public Set<AttributeDelta> updateDelta(
209 final ObjectClass objectClass,
210 final Uid uid,
211 final Set<AttributeDelta> modifications,
212 final OperationOptions options,
213 final AtomicReference<Boolean> propagationAttempted) {
214
215 return Set.of();
216 }
217
218 @Override
219 public void delete(
220 final ObjectClass objectClass,
221 final Uid uid,
222 final OperationOptions options,
223 final AtomicReference<Boolean> propagationAttempted) {
224
225
226 }
227
228 @Override
229 public void sync(
230 final ObjectClass objectClass,
231 final SyncToken token,
232 final SyncResultsHandler handler,
233 final OperationOptions options) {
234
235 throw new UnsupportedOperationException();
236 }
237
238 @Override
239 public SyncToken getLatestSyncToken(final ObjectClass objectClass) {
240 throw new UnsupportedOperationException();
241 }
242
243 @Override
244 public ConnectorObject getObject(
245 final ObjectClass objectClass,
246 final Attribute connObjectKey,
247 final boolean ignoreCaseMatch,
248 final OperationOptions options) {
249
250 return null;
251 }
252
253 @Override
254 public SearchResult search(
255 final ObjectClass objectClass,
256 final Filter filter,
257 final SearchResultsHandler handler,
258 final OperationOptions options) {
259
260 SearchResult result = new SearchResult();
261
262 try {
263 for (int record = 1; reader().hasNext(); record++) {
264 Map<String, String> row = reader().next();
265
266 String keyValue = row.get(keyColumn);
267 if (StringUtils.isBlank(keyValue)) {
268 keyValue = "Record " + record;
269 }
270
271 ConnectorObjectBuilder builder = new ConnectorObjectBuilder().
272 setObjectClass(objectClass).
273 setUid(keyValue).
274 setName(keyValue);
275
276 row.forEach((key, value) -> builder.addAttribute(arrayElementsSeparator == null
277 ? AttributeBuilder.build(key, value)
278 : AttributeBuilder.build(key,
279 (Object[]) StringUtils.splitByWholeSeparator(value, arrayElementsSeparator))));
280
281 ConnectorObject obj = builder.build();
282 if (filter == null || filter.accept(obj)) {
283 handler.handle(obj);
284 } else {
285 LOG.debug("Found but not passing the provided filter {}: {}", filter, obj);
286 }
287 }
288 } catch (IOException e) {
289 LOG.error("Could not read CSV from provided stream", e);
290 throw new ConnectorException(e);
291 }
292
293 return result;
294 }
295
296 @Override
297 public Set<ObjectClassInfo> getObjectClassInfo() {
298 return Set.of();
299 }
300
301 @Override
302 public void validate() {
303
304 }
305
306 @Override
307 public void test() {
308
309 }
310
311 @Override
312 public void dispose() {
313
314 }
315 }