1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages;
20
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.ConcurrentMap;
24
25 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26 import org.apache.giraph.factories.MessageValueFactory;
27 import org.apache.giraph.utils.VertexIdIterator;
28 import org.apache.giraph.utils.WritableUtils;
29 import org.apache.hadoop.io.Writable;
30 import org.apache.hadoop.io.WritableComparable;
31
32
33
34
35
36
37
38
39
40
41 public abstract class AbstractListPerVertexStore<I extends WritableComparable,
42 M extends Writable, L extends List> extends SimpleMessageStore<I, M, L> {
43
44
45
46
47
48
49
50
51 public AbstractListPerVertexStore(
52 MessageValueFactory<M> messageValueFactory,
53 PartitionSplitInfo<I> partitionInfo,
54 ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
55 super(messageValueFactory, partitionInfo, config);
56 }
57
58
59
60
61
62 protected abstract L createList();
63
64
65
66
67
68
69
70
71
72 protected L getOrCreateList(VertexIdIterator<I> iterator) {
73 int partitionId = getPartitionId(iterator.getCurrentVertexId());
74 ConcurrentMap<I, L> partitionMap = getOrCreatePartitionMap(partitionId);
75 L list = partitionMap.get(iterator.getCurrentVertexId());
76 if (list == null) {
77 L newList = createList();
78 list = partitionMap.putIfAbsent(
79 iterator.releaseCurrentVertexId(), newList);
80 if (list == null) {
81 list = newList;
82 }
83 }
84 return list;
85 }
86
87
88
89
90
91
92
93
94
95
96
97 protected L getOrCreateList(I vertexId) {
98 int partitionId = getPartitionId(vertexId);
99 ConcurrentMap<I, L> partitionMap = getOrCreatePartitionMap(partitionId);
100 L list = partitionMap.get(vertexId);
101 if (list == null) {
102 L newList = createList();
103 I copyId = WritableUtils.createCopy(vertexId);
104 list = partitionMap.putIfAbsent(copyId, newList);
105 if (list == null) {
106 list = newList;
107 }
108 }
109 return list;
110 }
111
112 @Override
113 public Iterable<M> getVertexMessages(I vertexId) {
114 ConcurrentMap<I, L> partitionMap =
115 map.get(getPartitionId(vertexId));
116 if (partitionMap == null) {
117 return Collections.<M>emptyList();
118 }
119 L list = partitionMap.get(vertexId);
120 return list == null ? Collections.<M>emptyList() :
121 getMessagesAsIterable(list);
122 }
123 }
124