1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages.primitives.long_id;
20
21 import com.google.common.collect.Lists;
22
23 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
24 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
25 import it.unimi.dsi.fastutil.longs.LongIterator;
26
27 import java.util.List;
28
29 import org.apache.giraph.comm.messages.MessageStore;
30 import org.apache.giraph.comm.messages.PartitionSplitInfo;
31 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32 import org.apache.giraph.factories.MessageValueFactory;
33 import org.apache.hadoop.io.LongWritable;
34 import org.apache.hadoop.io.Writable;
35
36
37
38
39
40
41
42
43
44
45 public abstract class LongAbstractStore<M extends Writable, T>
46 implements MessageStore<LongWritable, M> {
47
48 protected final MessageValueFactory<M> messageValueFactory;
49
50 protected final
51 Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
52
53 protected final PartitionSplitInfo<LongWritable> partitionInfo;
54
55 protected final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
56 config;
57
58
59
60
61
62
63
64
65 public LongAbstractStore(
66 MessageValueFactory<M> messageValueFactory,
67 PartitionSplitInfo<LongWritable> partitionInfo,
68 ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
69 config) {
70 this.messageValueFactory = messageValueFactory;
71 this.partitionInfo = partitionInfo;
72 this.config = config;
73
74 map = new Int2ObjectOpenHashMap<>();
75 for (int partitionId : partitionInfo.getPartitionIds()) {
76 Long2ObjectOpenHashMap<T> partitionMap = new Long2ObjectOpenHashMap<T>(
77 (int) partitionInfo.getPartitionVertexCount(partitionId));
78 map.put(partitionId, partitionMap);
79 }
80 }
81
82
83
84
85
86
87
88 protected Long2ObjectOpenHashMap<T> getPartitionMap(
89 LongWritable vertexId) {
90 return map.get(partitionInfo.getPartitionId(vertexId));
91 }
92
93 @Override
94 public void clearPartition(int partitionId) {
95 map.get(partitionId).clear();
96 }
97
98 @Override
99 public boolean hasMessagesForVertex(LongWritable vertexId) {
100 return getPartitionMap(vertexId).containsKey(vertexId.get());
101 }
102
103 @Override
104 public boolean hasMessagesForPartition(int partitionId) {
105 Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId);
106 return partitionMessages != null && !partitionMessages.isEmpty();
107 }
108
109 @Override
110 public void clearVertexMessages(LongWritable vertexId) {
111 getPartitionMap(vertexId).remove(vertexId.get());
112 }
113
114
115 @Override
116 public void clearAll() {
117 map.clear();
118 }
119
120 @Override
121 public Iterable<LongWritable> getPartitionDestinationVertices(
122 int partitionId) {
123 Long2ObjectOpenHashMap<T> partitionMap =
124 map.get(partitionId);
125 List<LongWritable> vertices =
126 Lists.newArrayListWithCapacity(partitionMap.size());
127 LongIterator iterator = partitionMap.keySet().iterator();
128 while (iterator.hasNext()) {
129 vertices.add(new LongWritable(iterator.nextLong()));
130 }
131 return vertices;
132 }
133 }