Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
GiraphTransferRegulator |
|
| 1.5;1.5 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | package org.apache.giraph.graph; | |
19 | ||
20 | import org.apache.hadoop.conf.Configuration; | |
21 | import org.apache.hadoop.io.Writable; | |
22 | import org.apache.hadoop.io.WritableComparable; | |
23 | import org.apache.giraph.partition.PartitionOwner; | |
24 | import com.google.common.collect.Maps; | |
25 | import java.util.Map; | |
26 | ||
27 | ||
28 | /** Utility class to manage data transfers from | |
29 | * a local worker reading InputSplits. | |
30 | * Currently, this class measures # of vertices and edges | |
31 | * per outgoing Collection of graph data (destined for a | |
32 | * particular Partition and remote worker node, preselected | |
33 | * by the master.) | |
34 | * | |
35 | * TODO: implement defaults and configurable options for | |
36 | * measuring the size of input <V> or <E> data | |
37 | * per read vertex, and setting limits on totals per outgoing | |
38 | * graph data Collection etc. (See GIRAPH-260) | |
39 | */ | |
40 | public class GiraphTransferRegulator { | |
41 | /** Maximum vertices to read from an InputSplit locally that are | |
42 | * to be routed to a remote worker, before sending them. */ | |
43 | public static final String MAX_VERTICES_PER_TRANSFER = | |
44 | "giraph.maxVerticesPerTransfer"; | |
45 | /** Default maximum number of vertices per | |
46 | * temp partition before sending. */ | |
47 | public static final int MAX_VERTICES_PER_TRANSFER_DEFAULT = 10000; | |
48 | /** | |
49 | * Maximum edges to read from an InputSplit locally that are | |
50 | * to be routed to a remote worker, before sending them. | |
51 | */ | |
52 | public static final String MAX_EDGES_PER_TRANSFER = | |
53 | "giraph.maxEdgesPerTransfer"; | |
54 | /** Default maximum number of vertices per | |
55 | * temp partition before sending. */ | |
56 | public static final int MAX_EDGES_PER_TRANSFER_DEFAULT = 80000; | |
57 | ||
58 | /** Internal state to measure when | |
59 | * the next data transfer of a Collection | |
60 | * of vertices read by the local worker that | |
61 | * owns this regulator is ready to be sent | |
62 | * to the remote worker node that the master | |
63 | * has assigned the vertices to */ | |
64 | private Map<Integer, Integer> edgeAccumulator; | |
65 | ||
66 | /** Internal state to measure when | |
67 | * the next data transfer of a Collection | |
68 | * of vertices read by the local worker that | |
69 | * owns this regulator is ready to be sent | |
70 | * to the remote worker node that the master | |
71 | * has assigned the vertices to */ | |
72 | private Map<Integer, Integer> vertexAccumulator; | |
73 | ||
74 | /** Number of vertices per data transfer */ | |
75 | private final int maxVerticesPerTransfer; | |
76 | ||
77 | /** Number of edges per data transfer */ | |
78 | private final int maxEdgesPerTransfer; | |
79 | ||
80 | /** Vertex count total for this InputSplit */ | |
81 | private long totalVertexCount; | |
82 | ||
83 | /** Edge count total for this InputSplit */ | |
84 | private long totalEdgeCount; | |
85 | ||
86 | /** Default constructor | |
87 | * @param conf the Configuration for this job | |
88 | */ | |
89 | 0 | public GiraphTransferRegulator(Configuration conf) { |
90 | 0 | vertexAccumulator = Maps.<Integer, Integer>newHashMap(); |
91 | 0 | edgeAccumulator = Maps.<Integer, Integer>newHashMap(); |
92 | 0 | maxVerticesPerTransfer = conf.getInt( |
93 | MAX_VERTICES_PER_TRANSFER, | |
94 | MAX_VERTICES_PER_TRANSFER_DEFAULT); | |
95 | 0 | maxEdgesPerTransfer = conf.getInt( |
96 | MAX_EDGES_PER_TRANSFER, | |
97 | MAX_EDGES_PER_TRANSFER_DEFAULT); | |
98 | 0 | totalEdgeCount = 0; |
99 | 0 | totalVertexCount = 0; |
100 | 0 | } |
101 | ||
102 | /** Is this outbound data Collection full, | |
103 | * and ready to transfer? | |
104 | * @param owner the partition owner for the outbound data | |
105 | * @return 'true' if the temp partition data is ready to transfer | |
106 | */ | |
107 | public boolean transferThisPartition(PartitionOwner owner) { | |
108 | 0 | final int partitionId = owner.getPartitionId(); |
109 | 0 | if (getEdgesForPartition(partitionId) >= |
110 | maxEdgesPerTransfer || | |
111 | 0 | getVerticesForPartition(partitionId) >= |
112 | maxVerticesPerTransfer) { | |
113 | 0 | vertexAccumulator.put(partitionId, 0); |
114 | 0 | edgeAccumulator.put(partitionId, 0); |
115 | 0 | return true; |
116 | } | |
117 | 0 | return false; |
118 | } | |
119 | ||
120 | /** get current vertex count for a given Collection of | |
121 | * data soon to be transfered to its permanent home. | |
122 | * @param partId the partition id to check the count on. | |
123 | * @return the count of vertices. | |
124 | */ | |
125 | private int getVerticesForPartition(final int partId) { | |
126 | 0 | return vertexAccumulator.get(partId) == null ? |
127 | 0 | 0 : vertexAccumulator.get(partId); |
128 | } | |
129 | ||
130 | /** get current edge count for a given Collection of | |
131 | * data soon to be transfered to its permanent home. | |
132 | * @param partId the partition id to check the count on. | |
133 | * @return the count of edges. | |
134 | */ | |
135 | private int getEdgesForPartition(final int partId) { | |
136 | 0 | return edgeAccumulator.get(partId) == null ? |
137 | 0 | 0 : edgeAccumulator.get(partId); |
138 | } | |
139 | ||
140 | /** Clear storage to reset for reading new InputSplit */ | |
141 | public void clearCounters() { | |
142 | 0 | totalEdgeCount = 0; |
143 | 0 | totalVertexCount = 0; |
144 | 0 | vertexAccumulator.clear(); |
145 | 0 | edgeAccumulator.clear(); |
146 | 0 | } |
147 | ||
148 | /** Increment V & E counts for new vertex read, store values | |
149 | * for that outgoing _temporary_ Partition, which shares the | |
150 | * Partition ID for the actual remote Partition the collection | |
151 | * will eventually be processed in. | |
152 | * @param partitionOwner the owner of the Partition this data | |
153 | * will eventually belong to. | |
154 | * @param vertex the vertex to extract counts from. | |
155 | * @param <I> the vertex id type. | |
156 | * @param <V> the vertex value type. | |
157 | * @param <E> the edge value type. | |
158 | */ | |
159 | public <I extends WritableComparable, V extends Writable, | |
160 | E extends Writable> void | |
161 | incrementCounters(PartitionOwner partitionOwner, | |
162 | Vertex<I, V, E> vertex) { | |
163 | 0 | final int id = partitionOwner.getPartitionId(); |
164 | // vertex counts | |
165 | 0 | vertexAccumulator |
166 | 0 | .put(id, getVerticesForPartition(id) + 1); |
167 | 0 | totalVertexCount++; |
168 | // edge counts | |
169 | 0 | totalEdgeCount += vertex.getNumEdges(); |
170 | 0 | edgeAccumulator.put(id, getEdgesForPartition(id) + |
171 | 0 | vertex.getNumEdges()); |
172 | 0 | } |
173 | ||
174 | /** Getter for MAX edge count to initiate a transfer | |
175 | * @return max edge count per transfer */ | |
176 | public long getMaxEdgesPerTransfer() { | |
177 | 0 | return maxEdgesPerTransfer; |
178 | } | |
179 | ||
180 | /** Getter for MAX vertex count to initiate a transfer | |
181 | * @return max edge count per transfer */ | |
182 | public long getMaxVerticesPerTransfer() { | |
183 | 0 | return maxVerticesPerTransfer; |
184 | } | |
185 | ||
186 | /** Getter for total edge count for the current InputSplit | |
187 | * @return the # of total edges counted in this InputSplit */ | |
188 | public long getTotalEdges() { | |
189 | 0 | return totalEdgeCount; |
190 | } | |
191 | ||
192 | /** Getter for total vetex count for the current InputSplit | |
193 | * @return the total # of vertices counted in this InputSplit */ | |
194 | public long getTotalVertices() { | |
195 | 0 | return totalVertexCount; |
196 | } | |
197 | } | |
198 |