1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.mapping;
20
21 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
22
23 import java.util.Arrays;
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.atomic.AtomicLong;
27
28 import javax.annotation.concurrent.ThreadSafe;
29
30 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
31 import org.apache.giraph.conf.GiraphConstants;
32 import org.apache.hadoop.io.ByteWritable;
33 import org.apache.hadoop.io.LongWritable;
34 import org.apache.hadoop.io.Writable;
35 import org.apache.log4j.Logger;
36
37 import com.google.common.collect.MapMaker;
38
39
40
41
42
43
44
45
46
47
48 @ThreadSafe
49 public class LongByteMappingStore
50 extends DefaultImmutableClassesGiraphConfigurable<LongWritable, Writable,
51 Writable> implements MappingStore<LongWritable, ByteWritable> {
52
53 private static final Logger LOG = Logger.getLogger(
54 LongByteMappingStore.class);
55
56
57 private final AtomicLong numEntries = new AtomicLong(0);
58
59
60 private ConcurrentMap<Long, byte[]> concurrentIdToBytes;
61
62 private Long2ObjectOpenHashMap<byte[]> idToBytes;
63
64 private int lower;
65
66 private int upper;
67
68 private int lowerBitMask;
69
70 private int lowerOrder;
71
72 @Override
73 public void initialize() {
74 upper = GiraphConstants.LB_MAPPINGSTORE_UPPER.get(getConf());
75 lower = GiraphConstants.LB_MAPPINGSTORE_LOWER.get(getConf());
76
77 if ((lower & (lower - 1)) != 0) {
78 throw new IllegalStateException("lower not a power of two");
79 }
80
81 lowerBitMask = lower - 1;
82 lowerOrder = Integer.numberOfTrailingZeros(lower);
83 concurrentIdToBytes = new MapMaker()
84 .initialCapacity(upper)
85 .concurrencyLevel(getConf().getNumInputSplitsThreads())
86 .makeMap();
87 idToBytes = new Long2ObjectOpenHashMap<>(upper);
88 }
89
90
91
92
93
94
95
96 public byte getByteTarget(LongWritable vertexId) {
97 long key = vertexId.get() >>> lowerOrder;
98 int suffix = (int) (vertexId.get() & lowerBitMask);
99 if (!idToBytes.containsKey(key)) {
100 return -1;
101 }
102 return idToBytes.get(key)[suffix];
103 }
104
105 @Override
106 public void addEntry(LongWritable vertexId, ByteWritable target) {
107 long key = vertexId.get() >>> lowerOrder;
108 byte[] bytes = concurrentIdToBytes.get(key);
109 if (bytes == null) {
110 byte[] newBytes = new byte[lower];
111 Arrays.fill(newBytes, (byte) -1);
112 bytes = concurrentIdToBytes.putIfAbsent(key, newBytes);
113 if (bytes == null) {
114 bytes = newBytes;
115 }
116 }
117 bytes[(int) (vertexId.get() & lowerBitMask)] = target.get();
118 numEntries.getAndIncrement();
119 }
120
121 @Override
122 public ByteWritable getTarget(LongWritable vertexId,
123 ByteWritable target) {
124 byte bval = getByteTarget(vertexId);
125 if (bval == -1) {
126 return null;
127 }
128 target.set(bval);
129 return target;
130 }
131
132 @Override
133 public void postFilling() {
134
135 for (Map.Entry<Long, byte[]> entry : concurrentIdToBytes.entrySet()) {
136 idToBytes.put(entry.getKey(), entry.getValue());
137 }
138 concurrentIdToBytes.clear();
139 concurrentIdToBytes = null;
140 }
141
142 @Override
143 public long getStats() {
144 return numEntries.longValue();
145 }
146 }