Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
AggregatorUtils |
|
| 1.5;1.5 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.comm.aggregators; | |
20 | ||
21 | import java.util.List; | |
22 | ||
23 | import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; | |
24 | import org.apache.giraph.worker.WorkerInfo; | |
25 | ||
26 | /** | |
27 | * Class for aggregator constants and utility methods | |
28 | */ | |
29 | public class AggregatorUtils { | |
30 | ||
31 | /** How big a single aggregator request can be (in bytes) */ | |
32 | public static final String MAX_BYTES_PER_AGGREGATOR_REQUEST = | |
33 | "giraph.maxBytesPerAggregatorRequest"; | |
34 | /** Default max size of single aggregator request (1MB) */ | |
35 | public static final int MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT = | |
36 | 1024 * 1024; | |
37 | /** | |
38 | * Whether or not to have a copy of aggregators for each compute thread. | |
39 | * Unless aggregators are very large and it would hurt the application to | |
40 | * have that many copies of them, user should use thread-local aggregators | |
41 | * to prevent synchronization when aggregate() is called (and get better | |
42 | * performance because of it). | |
43 | */ | |
44 | public static final String USE_THREAD_LOCAL_AGGREGATORS = | |
45 | "giraph.useThreadLocalAggregators"; | |
46 | /** Default is not to have a copy of aggregators for each thread */ | |
47 | public static final boolean USE_THREAD_LOCAL_AGGREGATORS_DEFAULT = false; | |
48 | ||
49 | /** Do not instantiate */ | |
50 | 0 | private AggregatorUtils() { } |
51 | ||
52 | /** | |
53 | * Get owner of aggregator with selected name from the list of workers | |
54 | * | |
55 | * @param aggregatorName Name of the aggregators | |
56 | * @param workers List of workers | |
57 | * @return Worker which owns the aggregator | |
58 | */ | |
59 | public static WorkerInfo getOwner(String aggregatorName, | |
60 | List<WorkerInfo> workers) { | |
61 | 0 | int index = Math.abs(aggregatorName.hashCode() % workers.size()); |
62 | 0 | return workers.get(index); |
63 | } | |
64 | ||
65 | /** | |
66 | * Check if we should use thread local aggregators. | |
67 | * | |
68 | * @param conf Giraph configuration | |
69 | * @return True iff we should use thread local aggregators | |
70 | */ | |
71 | public static boolean | |
72 | useThreadLocalAggregators(ImmutableClassesGiraphConfiguration conf) { | |
73 | 0 | return conf.getBoolean(USE_THREAD_LOCAL_AGGREGATORS, |
74 | USE_THREAD_LOCAL_AGGREGATORS_DEFAULT); | |
75 | } | |
76 | ||
77 | /** | |
78 | * Get the warning message about usage of unregistered aggregator to be | |
79 | * printed to user. If user didn't register any aggregators also provide | |
80 | * the explanation on how to do so. | |
81 | * | |
82 | * @param aggregatorName The name of the aggregator which user tried to | |
83 | * access | |
84 | * @param hasRegisteredAggregators True iff user registered some aggregators | |
85 | * @param conf Giraph configuration | |
86 | * @return Warning message | |
87 | */ | |
88 | public static String getUnregisteredAggregatorMessage( | |
89 | String aggregatorName, boolean hasRegisteredAggregators, | |
90 | ImmutableClassesGiraphConfiguration conf) { | |
91 | 0 | String message = "Tried to access aggregator which wasn't registered " + |
92 | aggregatorName; | |
93 | 0 | if (!hasRegisteredAggregators) { |
94 | 0 | message = message + "; Aggregators can be registered in " + |
95 | "MasterCompute.initialize by calling " + | |
96 | "registerAggregator(aggregatorName, aggregatorClass). " + | |
97 | "Also be sure that you are correctly setting MasterCompute class, " + | |
98 | 0 | "currently using " + conf.getMasterComputeClass().getName(); |
99 | } | |
100 | 0 | return message; |
101 | } | |
102 | ||
103 | /** | |
104 | * Get the warning message about usage of unregistered reducer to be | |
105 | * printed to user. If user didn't register any reducers also provide | |
106 | * the explanation on how to do so. | |
107 | * | |
108 | * @param reducerName The name of the aggregator which user tried to | |
109 | * access | |
110 | * @param hasRegisteredReducers True iff user registered some aggregators | |
111 | * @param conf Giraph configuration | |
112 | * @return Warning message | |
113 | */ | |
114 | public static String getUnregisteredReducerMessage( | |
115 | String reducerName, boolean hasRegisteredReducers, | |
116 | ImmutableClassesGiraphConfiguration conf) { | |
117 | 0 | String message = "Tried to access reducer which wasn't registered " + |
118 | reducerName; | |
119 | 0 | if (!hasRegisteredReducers) { |
120 | 0 | message = message + "; Aggregators can be registered from " + |
121 | "MasterCompute by calling registerReducer function. " + | |
122 | "Also be sure that you are correctly setting MasterCompute class, " + | |
123 | 0 | "currently using " + conf.getMasterComputeClass().getName(); |
124 | } | |
125 | 0 | return message; |
126 | } | |
127 | ||
128 | /** | |
129 | * Get the warning message when user tries to access broadcast, without | |
130 | * previously setting it, to be printed to user. | |
131 | * If user didn't broadcast any value also provide | |
132 | * the explanation on how to do so. | |
133 | * | |
134 | * @param broadcastName The name of the broadcast which user tried to | |
135 | * access | |
136 | * @param hasBroadcasted True iff user has broadcasted value before | |
137 | * @param conf Giraph configuration | |
138 | * @return Warning message | |
139 | */ | |
140 | public static String getUnregisteredBroadcastMessage( | |
141 | String broadcastName, boolean hasBroadcasted, | |
142 | ImmutableClassesGiraphConfiguration conf) { | |
143 | 0 | String message = "Tried to access broadcast which wasn't set before " + |
144 | broadcastName; | |
145 | 0 | if (!hasBroadcasted) { |
146 | 0 | message = message + "; Values can be broadcasted from " + |
147 | "MasterCompute by calling broadcast function. " + | |
148 | "Also be sure that you are correctly setting MasterCompute class, " + | |
149 | 0 | "currently using " + conf.getMasterComputeClass().getName(); |
150 | } | |
151 | 0 | return message; |
152 | } | |
153 | } |