1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.utils;
20
21 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
22
23 import java.util.concurrent.atomic.AtomicInteger;
24
25 import org.apache.giraph.conf.FloatConfOption;
26 import org.apache.giraph.conf.GiraphConstants;
27 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28 import org.apache.giraph.conf.IntConfOption;
29
30
31
32
33
34 public class ExtendedByteArrayOutputBuffer {
35
36
37
38
39
40 public static final IntConfOption CAPACITY_OF_DATAOUT_IN_BUFFER =
41 new IntConfOption("giraph.capacityOfDataOutInBuffer",
42 1024 * GiraphConstants.ONE_KB,
43 "Set the capacity of dataoutputs in dataout buffer");
44
45
46
47
48
49
50
51 public static final FloatConfOption FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER =
52 new FloatConfOption("giraph.fillingThresholdOfDataoutInBuffer", 0.98f,
53 "Set the maximum fraction of dataoutput capacity allowed to fill");
54
55
56 private final int maxBufSize;
57
58 private final int threshold;
59
60 private final ImmutableClassesGiraphConfiguration<?, ? , ?> config;
61
62
63 private final Int2ObjectOpenHashMap<ExtendedDataOutput>
64 bytearrayOutputs = new Int2ObjectOpenHashMap<>();
65
66 private final AtomicInteger mapSize = new AtomicInteger(0);
67
68 private final ThreadLocal<IndexAndDataOut> threadLocal =
69 new ThreadLocal<IndexAndDataOut>() {
70 @Override
71 protected IndexAndDataOut initialValue() {
72 return newIndexAndDataOutput();
73 }
74 };
75
76
77
78
79
80
81 public ExtendedByteArrayOutputBuffer(
82 ImmutableClassesGiraphConfiguration<?, ?, ?> config) {
83 this.config = config;
84
85 maxBufSize = CAPACITY_OF_DATAOUT_IN_BUFFER.get(config);
86 threshold = (int) (FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER.get(config) *
87 maxBufSize);
88 }
89
90
91
92
93
94
95 public IndexAndDataOut getIndexAndDataOut() {
96 IndexAndDataOut indexAndDataOut = threadLocal.get();
97 if (indexAndDataOut.dataOutput.getPos() >= threshold) {
98 indexAndDataOut = newIndexAndDataOutput();
99 threadLocal.set(indexAndDataOut);
100 }
101 return indexAndDataOut;
102 }
103
104
105
106
107
108
109
110 public ExtendedDataOutput getDataOutput(int index) {
111 return bytearrayOutputs.get(index);
112 }
113
114
115
116
117 public static class IndexAndDataOut {
118
119 private final int index;
120
121 private final ExtendedDataOutput dataOutput;
122
123
124
125
126
127
128
129 public IndexAndDataOut(int index, ExtendedDataOutput dataOutput) {
130 this.index = index;
131 this.dataOutput = dataOutput;
132 }
133
134 public int getIndex() {
135 return index;
136 }
137
138 public ExtendedDataOutput getDataOutput() {
139 return dataOutput;
140 }
141 }
142
143
144
145
146
147 private IndexAndDataOut newIndexAndDataOutput() {
148 int index = mapSize.getAndIncrement();
149 ExtendedDataOutput output = config.createExtendedDataOutput(
150 maxBufSize);
151 synchronized (bytearrayOutputs) {
152 bytearrayOutputs.put(index, output);
153 }
154 return new IndexAndDataOut(index, output);
155 }
156 }