1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.utils; |
20 | |
|
21 | |
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; |
22 | |
|
23 | |
import java.util.concurrent.atomic.AtomicInteger; |
24 | |
|
25 | |
import org.apache.giraph.conf.FloatConfOption; |
26 | |
import org.apache.giraph.conf.GiraphConstants; |
27 | |
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; |
28 | |
import org.apache.giraph.conf.IntConfOption; |
29 | |
|
30 | |
|
31 | |
|
32 | |
|
33 | |
|
34 | 0 | public class ExtendedByteArrayOutputBuffer { |
35 | |
|
36 | |
|
37 | |
|
38 | |
|
39 | |
|
40 | 0 | public static final IntConfOption CAPACITY_OF_DATAOUT_IN_BUFFER = |
41 | |
new IntConfOption("giraph.capacityOfDataOutInBuffer", |
42 | |
1024 * GiraphConstants.ONE_KB, |
43 | |
"Set the capacity of dataoutputs in dataout buffer"); |
44 | |
|
45 | |
|
46 | |
|
47 | |
|
48 | |
|
49 | |
|
50 | |
|
51 | 0 | public static final FloatConfOption FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER = |
52 | |
new FloatConfOption("giraph.fillingThresholdOfDataoutInBuffer", 0.98f, |
53 | |
"Set the maximum fraction of dataoutput capacity allowed to fill"); |
54 | |
|
55 | |
|
56 | |
private final int maxBufSize; |
57 | |
|
58 | |
private final int threshold; |
59 | |
|
60 | |
private final ImmutableClassesGiraphConfiguration<?, ? , ?> config; |
61 | |
|
62 | |
|
63 | 0 | private final Int2ObjectOpenHashMap<ExtendedDataOutput> |
64 | |
bytearrayOutputs = new Int2ObjectOpenHashMap<>(); |
65 | |
|
66 | 0 | private final AtomicInteger mapSize = new AtomicInteger(0); |
67 | |
|
68 | 0 | private final ThreadLocal<IndexAndDataOut> threadLocal = |
69 | 0 | new ThreadLocal<IndexAndDataOut>() { |
70 | |
@Override |
71 | |
protected IndexAndDataOut initialValue() { |
72 | 0 | return newIndexAndDataOutput(); |
73 | |
} |
74 | |
}; |
75 | |
|
76 | |
|
77 | |
|
78 | |
|
79 | |
|
80 | |
|
81 | |
public ExtendedByteArrayOutputBuffer( |
82 | 0 | ImmutableClassesGiraphConfiguration<?, ?, ?> config) { |
83 | 0 | this.config = config; |
84 | |
|
85 | 0 | maxBufSize = CAPACITY_OF_DATAOUT_IN_BUFFER.get(config); |
86 | 0 | threshold = (int) (FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER.get(config) * |
87 | |
maxBufSize); |
88 | 0 | } |
89 | |
|
90 | |
|
91 | |
|
92 | |
|
93 | |
|
94 | |
|
95 | |
public IndexAndDataOut getIndexAndDataOut() { |
96 | 0 | IndexAndDataOut indexAndDataOut = threadLocal.get(); |
97 | 0 | if (indexAndDataOut.dataOutput.getPos() >= threshold) { |
98 | 0 | indexAndDataOut = newIndexAndDataOutput(); |
99 | 0 | threadLocal.set(indexAndDataOut); |
100 | |
} |
101 | 0 | return indexAndDataOut; |
102 | |
} |
103 | |
|
104 | |
|
105 | |
|
106 | |
|
107 | |
|
108 | |
|
109 | |
|
110 | |
public ExtendedDataOutput getDataOutput(int index) { |
111 | 0 | return bytearrayOutputs.get(index); |
112 | |
} |
113 | |
|
114 | |
|
115 | |
|
116 | |
|
117 | 0 | public static class IndexAndDataOut { |
118 | |
|
119 | |
private final int index; |
120 | |
|
121 | |
private final ExtendedDataOutput dataOutput; |
122 | |
|
123 | |
|
124 | |
|
125 | |
|
126 | |
|
127 | |
|
128 | |
|
129 | 0 | public IndexAndDataOut(int index, ExtendedDataOutput dataOutput) { |
130 | 0 | this.index = index; |
131 | 0 | this.dataOutput = dataOutput; |
132 | 0 | } |
133 | |
|
134 | |
public int getIndex() { |
135 | 0 | return index; |
136 | |
} |
137 | |
|
138 | |
public ExtendedDataOutput getDataOutput() { |
139 | 0 | return dataOutput; |
140 | |
} |
141 | |
} |
142 | |
|
143 | |
|
144 | |
|
145 | |
|
146 | |
|
147 | |
private IndexAndDataOut newIndexAndDataOutput() { |
148 | 0 | int index = mapSize.getAndIncrement(); |
149 | 0 | ExtendedDataOutput output = config.createExtendedDataOutput( |
150 | |
maxBufSize); |
151 | 0 | synchronized (bytearrayOutputs) { |
152 | 0 | bytearrayOutputs.put(index, output); |
153 | 0 | } |
154 | 0 | return new IndexAndDataOut(index, output); |
155 | |
} |
156 | |
} |