Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
LongAbstractListStore |
|
| 0.0;0 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.comm.messages.primitives.long_id; | |
20 | ||
21 | import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; | |
22 | import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; | |
23 | ||
24 | import java.util.List; | |
25 | ||
26 | import org.apache.giraph.comm.messages.PartitionSplitInfo; | |
27 | import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; | |
28 | import org.apache.giraph.factories.MessageValueFactory; | |
29 | import org.apache.giraph.graph.Vertex; | |
30 | import org.apache.giraph.partition.Partition; | |
31 | import org.apache.hadoop.io.LongWritable; | |
32 | import org.apache.hadoop.io.Writable; | |
33 | ||
34 | /** | |
35 | * Special message store to be used when ids are LongWritable and no combiner | |
36 | * is used. | |
37 | * Uses fastutil primitive maps in order to decrease number of objects and | |
38 | * get better performance. | |
39 | * | |
40 | * @param <M> message type | |
41 | * @param <L> list type | |
42 | */ | |
43 | 0 | public abstract class LongAbstractListStore<M extends Writable, |
44 | L extends List> extends LongAbstractStore<M, L> { | |
45 | /** | |
46 | * Map used to store messages for nascent vertices i.e., ones | |
47 | * that did not exist at the start of current superstep but will get | |
48 | * created because of sending message to a non-existent vertex id | |
49 | */ | |
50 | private final | |
51 | Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap; | |
52 | ||
53 | /** | |
54 | * Constructor | |
55 | * | |
56 | * @param messageValueFactory Factory for creating message values | |
57 | * @param partitionInfo Partition split info | |
58 | * @param config Hadoop configuration | |
59 | */ | |
60 | public LongAbstractListStore( | |
61 | MessageValueFactory<M> messageValueFactory, | |
62 | PartitionSplitInfo<LongWritable> partitionInfo, | |
63 | ImmutableClassesGiraphConfiguration<LongWritable, | |
64 | Writable, Writable> config) { | |
65 | 0 | super(messageValueFactory, partitionInfo, config); |
66 | 0 | populateMap(); |
67 | ||
68 | // create map for vertex ids (i.e., nascent vertices) not known yet | |
69 | 0 | nascentMap = new Int2ObjectOpenHashMap<>(); |
70 | 0 | for (int partitionId : partitionInfo.getPartitionIds()) { |
71 | 0 | nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>()); |
72 | 0 | } |
73 | 0 | } |
74 | ||
75 | /** | |
76 | * Populate the map with all vertexIds for each partition | |
77 | */ | |
78 | private void populateMap() { // TODO - can parallelize? | |
79 | // populate with vertex ids already known | |
80 | 0 | partitionInfo.startIteration(); |
81 | while (true) { | |
82 | 0 | Partition partition = partitionInfo.getNextPartition(); |
83 | 0 | if (partition == null) { |
84 | 0 | break; |
85 | } | |
86 | 0 | Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId()); |
87 | 0 | for (Object obj : partition) { |
88 | 0 | Vertex vertex = (Vertex) obj; |
89 | 0 | LongWritable vertexId = (LongWritable) vertex.getId(); |
90 | 0 | partitionMap.put(vertexId.get(), createList()); |
91 | 0 | } |
92 | 0 | partitionInfo.putPartition(partition); |
93 | 0 | } |
94 | 0 | } |
95 | ||
96 | /** | |
97 | * Create an instance of L | |
98 | * @return instance of L | |
99 | */ | |
100 | protected abstract L createList(); | |
101 | ||
102 | /** | |
103 | * Get list for the current vertexId | |
104 | * | |
105 | * @param vertexId vertex id | |
106 | * @return list for current vertexId | |
107 | */ | |
108 | protected L getList(LongWritable vertexId) { | |
109 | 0 | long id = vertexId.get(); |
110 | 0 | int partitionId = partitionInfo.getPartitionId(vertexId); |
111 | 0 | Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId); |
112 | 0 | L list = partitionMap.get(id); |
113 | 0 | if (list == null) { |
114 | 0 | Long2ObjectOpenHashMap<L> nascentPartitionMap = |
115 | 0 | nascentMap.get(partitionId); |
116 | // assumption: not many nascent vertices are created | |
117 | // so overall synchronization is negligible | |
118 | 0 | synchronized (nascentPartitionMap) { |
119 | 0 | list = nascentPartitionMap.get(id); |
120 | 0 | if (list == null) { |
121 | 0 | list = createList(); |
122 | 0 | nascentPartitionMap.put(id, list); |
123 | } | |
124 | 0 | return list; |
125 | 0 | } |
126 | } | |
127 | 0 | return list; |
128 | } | |
129 | ||
130 | @Override | |
131 | public void finalizeStore() { | |
132 | 0 | for (int partitionId : nascentMap.keySet()) { |
133 | // nascent vertices are present only in nascent map | |
134 | 0 | map.get(partitionId).putAll(nascentMap.get(partitionId)); |
135 | 0 | } |
136 | 0 | nascentMap.clear(); |
137 | 0 | } |
138 | ||
139 | @Override | |
140 | public boolean hasMessagesForVertex(LongWritable vertexId) { | |
141 | 0 | int partitionId = partitionInfo.getPartitionId(vertexId); |
142 | 0 | Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId); |
143 | 0 | L list = partitionMap.get(vertexId.get()); |
144 | 0 | if (list != null && !list.isEmpty()) { |
145 | 0 | return true; |
146 | } | |
147 | 0 | Long2ObjectOpenHashMap<L> nascentMessages = nascentMap.get(partitionId); |
148 | 0 | return nascentMessages != null && |
149 | 0 | nascentMessages.containsKey(vertexId.get()); |
150 | } | |
151 | ||
152 | // TODO - discussion | |
153 | /* | |
154 | some approaches for ensuring correctness with parallel inserts | |
155 | - current approach: uses a small extra bit of memory by pre-populating | |
156 | map & pushes everything map cannot handle to nascentMap | |
157 | at the beginning of next superstep compute a single threaded finalizeStore is | |
158 | called (so little extra memory + 1 sequential finish ops) | |
159 | - used striped parallel fast utils instead (unsure of perf) | |
160 | - use concurrent map (every get gets far slower) | |
161 | - use reader writer locks (unsure of perf) | |
162 | (code looks something like underneath) | |
163 | ||
164 | private final ReadWriteLock rwl = new ReentrantReadWriteLock(); | |
165 | rwl.readLock().lock(); | |
166 | L list = partitionMap.get(vertexId); | |
167 | if (list == null) { | |
168 | rwl.readLock().unlock(); | |
169 | rwl.writeLock().lock(); | |
170 | if (partitionMap.get(vertexId) == null) { | |
171 | list = createList(); | |
172 | partitionMap.put(vertexId, list); | |
173 | } | |
174 | rwl.readLock().lock(); | |
175 | rwl.writeLock().unlock(); | |
176 | } | |
177 | rwl.readLock().unlock(); | |
178 | - adopted from the article | |
179 | http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\ | |
180 | ReentrantReadWriteLock.html | |
181 | */ | |
182 | } |