1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.aggregators;
20
21 import java.util.List;
22
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.worker.WorkerInfo;
25
26
27
28
29 public class AggregatorUtils {
30
31
32 public static final String MAX_BYTES_PER_AGGREGATOR_REQUEST =
33 "giraph.maxBytesPerAggregatorRequest";
34
35 public static final int MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT =
36 1024 * 1024;
37
38
39
40
41
42
43
44 public static final String USE_THREAD_LOCAL_AGGREGATORS =
45 "giraph.useThreadLocalAggregators";
46
47 public static final boolean USE_THREAD_LOCAL_AGGREGATORS_DEFAULT = false;
48
49
50 private AggregatorUtils() { }
51
52
53
54
55
56
57
58
59 public static WorkerInfo getOwner(String aggregatorName,
60 List<WorkerInfo> workers) {
61 int index = Math.abs(aggregatorName.hashCode() % workers.size());
62 return workers.get(index);
63 }
64
65
66
67
68
69
70
71 public static boolean
72 useThreadLocalAggregators(ImmutableClassesGiraphConfiguration conf) {
73 return conf.getBoolean(USE_THREAD_LOCAL_AGGREGATORS,
74 USE_THREAD_LOCAL_AGGREGATORS_DEFAULT);
75 }
76
77
78
79
80
81
82
83
84
85
86
87
88 public static String getUnregisteredAggregatorMessage(
89 String aggregatorName, boolean hasRegisteredAggregators,
90 ImmutableClassesGiraphConfiguration conf) {
91 String message = "Tried to access aggregator which wasn't registered " +
92 aggregatorName;
93 if (!hasRegisteredAggregators) {
94 message = message + "; Aggregators can be registered in " +
95 "MasterCompute.initialize by calling " +
96 "registerAggregator(aggregatorName, aggregatorClass). " +
97 "Also be sure that you are correctly setting MasterCompute class, " +
98 "currently using " + conf.getMasterComputeClass().getName();
99 }
100 return message;
101 }
102
103
104
105
106
107
108
109
110
111
112
113
114 public static String getUnregisteredReducerMessage(
115 String reducerName, boolean hasRegisteredReducers,
116 ImmutableClassesGiraphConfiguration conf) {
117 String message = "Tried to access reducer which wasn't registered " +
118 reducerName;
119 if (!hasRegisteredReducers) {
120 message = message + "; Aggregators can be registered from " +
121 "MasterCompute by calling registerReducer function. " +
122 "Also be sure that you are correctly setting MasterCompute class, " +
123 "currently using " + conf.getMasterComputeClass().getName();
124 }
125 return message;
126 }
127
128
129
130
131
132
133
134
135
136
137
138
139
140 public static String getUnregisteredBroadcastMessage(
141 String broadcastName, boolean hasBroadcasted,
142 ImmutableClassesGiraphConfiguration conf) {
143 String message = "Tried to access broadcast which wasn't set before " +
144 broadcastName;
145 if (!hasBroadcasted) {
146 message = message + "; Values can be broadcasted from " +
147 "MasterCompute by calling broadcast function. " +
148 "Also be sure that you are correctly setting MasterCompute class, " +
149 "currently using " + conf.getMasterComputeClass().getName();
150 }
151 return message;
152 }
153 }