1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.aggregators;
20
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Map.Entry;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentMap;
27
28 import org.apache.giraph.comm.GlobalCommType;
29 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30 import org.apache.giraph.master.MasterInfo;
31 import org.apache.giraph.reducers.ReduceOperation;
32 import org.apache.giraph.reducers.Reducer;
33 import org.apache.giraph.utils.TaskIdsPermitsBarrier;
34 import org.apache.hadoop.io.Writable;
35 import org.apache.hadoop.util.Progressable;
36 import org.apache.log4j.Logger;
37
38 import com.google.common.base.Preconditions;
39 import com.google.common.collect.Lists;
40 import com.google.common.collect.Maps;
41
42
43
44
45
46
47
48
49
50
51
52 public class AllAggregatorServerData {
53
54 private static final Logger LOG =
55 Logger.getLogger(AllAggregatorServerData.class);
56
57 private final ConcurrentMap<String, Writable>
58 broadcastedMap = Maps.newConcurrentMap();
59
60 private final ConcurrentMap<String, ReduceOperation<Object, Writable>>
61 reduceOpMap = Maps.newConcurrentMap();
62
63
64
65
66
67
68 private final TaskIdsPermitsBarrier masterBarrier;
69
70
71
72
73 private final List<byte[]> masterData =
74 Collections.synchronizedList(Lists.<byte[]>newArrayList());
75
76
77
78
79
80
81 private final TaskIdsPermitsBarrier workersBarrier;
82
83 private final Progressable progressable;
84
85 private final ImmutableClassesGiraphConfiguration conf;
86
87
88
89
90
91
92
93 public AllAggregatorServerData(Progressable progressable,
94 ImmutableClassesGiraphConfiguration conf) {
95 this.progressable = progressable;
96 this.conf = conf;
97 workersBarrier = new TaskIdsPermitsBarrier(progressable);
98 masterBarrier = new TaskIdsPermitsBarrier(progressable);
99 }
100
101
102
103
104
105
106
107 public void receiveValueFromMaster(
108 String name, GlobalCommType type, Writable value) {
109 switch (type) {
110 case BROADCAST:
111 broadcastedMap.put(name, value);
112 break;
113
114 case REDUCE_OPERATIONS:
115 reduceOpMap.put(name, (ReduceOperation<Object, Writable>) value);
116 break;
117
118 default:
119 throw new IllegalArgumentException("Unkown request type " + type);
120 }
121 progressable.progress();
122 }
123
124
125
126
127
128
129
130 public void receivedRequestFromMaster(byte[] data) {
131 masterData.add(data);
132 masterBarrier.releaseOnePermit();
133 }
134
135
136
137
138
139
140
141
142 public void receivedRequestCountFromMaster(long requestCount, int taskId) {
143 masterBarrier.requirePermits(requestCount, taskId);
144 }
145
146
147
148
149
150 public void receivedRequestFromWorker() {
151 workersBarrier.releaseOnePermit();
152 }
153
154
155
156
157
158
159
160
161 public void receivedRequestCountFromWorker(long requestCount, int taskId) {
162 workersBarrier.requirePermits(requestCount, taskId);
163 }
164
165
166
167
168
169
170
171
172 public Iterable<byte[]> getDataFromMasterWhenReady(MasterInfo masterInfo) {
173 masterBarrier.waitForRequiredPermits(
174 Collections.singleton(masterInfo.getTaskId()));
175 if (LOG.isDebugEnabled()) {
176 LOG.debug("getDataFromMasterWhenReady: " +
177 "Aggregator data for distribution ready");
178 }
179 return masterData;
180 }
181
182
183
184
185
186
187
188
189
190 public void fillNextSuperstepMapsWhenReady(
191 Set<Integer> workerIds,
192 Map<String, Writable> broadcastedMapToFill,
193 Map<String, Reducer<Object, Writable>> reducerMapToFill) {
194 workersBarrier.waitForRequiredPermits(workerIds);
195 if (LOG.isDebugEnabled()) {
196 LOG.debug("fillNextSuperstepMapsWhenReady: Global data ready");
197 }
198
199 Preconditions.checkArgument(broadcastedMapToFill.isEmpty(),
200 "broadcastedMap needs to be empty for filling");
201 Preconditions.checkArgument(reducerMapToFill.isEmpty(),
202 "reducerMap needs to be empty for filling");
203
204 broadcastedMapToFill.putAll(broadcastedMap);
205
206 for (Entry<String, ReduceOperation<Object, Writable>> entry :
207 reduceOpMap.entrySet()) {
208 reducerMapToFill.put(entry.getKey(), new Reducer<>(entry.getValue()));
209 }
210
211 broadcastedMap.clear();
212 reduceOpMap.clear();
213 masterData.clear();
214 if (LOG.isDebugEnabled()) {
215 LOG.debug("reset: Ready for next superstep");
216 }
217 }
218 }
219