Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
SendVertexIdDataCache |
|
| 1.2;1.2 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.comm; | |
20 | ||
21 | import org.apache.giraph.bsp.CentralizedServiceWorker; | |
22 | import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; | |
23 | import org.apache.giraph.utils.VertexIdData; | |
24 | import org.apache.giraph.worker.WorkerInfo; | |
25 | import org.apache.hadoop.io.WritableComparable; | |
26 | ||
27 | import javax.annotation.concurrent.NotThreadSafe; | |
28 | ||
29 | /** | |
30 | * An abstract structure for caching data indexed by vertex id, | |
31 | * to be sent to workers in bulk. Not thread-safe. | |
32 | * | |
33 | * @param <I> Vertex id | |
34 | * @param <T> Data | |
35 | * @param <B> Specialization of {@link VertexIdData} for T | |
36 | */ | |
37 | @NotThreadSafe | |
38 | @SuppressWarnings("unchecked") | |
39 | public abstract class SendVertexIdDataCache<I extends WritableComparable, T, | |
40 | B extends VertexIdData<I, T>> extends SendDataCache<B> { | |
41 | /** | |
42 | * Constructor. | |
43 | * | |
44 | * @param conf Giraph configuration | |
45 | * @param serviceWorker Service worker | |
46 | * @param maxRequestSize Maximum request size (in bytes) | |
47 | * @param additionalRequestSize Additional request size (expressed as a | |
48 | * ratio of the average request size) | |
49 | */ | |
50 | public SendVertexIdDataCache(ImmutableClassesGiraphConfiguration conf, | |
51 | CentralizedServiceWorker<?, ?, ?> serviceWorker, | |
52 | int maxRequestSize, | |
53 | float additionalRequestSize) { | |
54 | 0 | super(conf, serviceWorker, maxRequestSize, additionalRequestSize); |
55 | 0 | } |
56 | ||
57 | /** | |
58 | * Create a new {@link VertexIdData} specialized for the use case. | |
59 | * | |
60 | * @return A new instance of {@link VertexIdData} | |
61 | */ | |
62 | public abstract B createVertexIdData(); | |
63 | ||
64 | /** | |
65 | * Add data to the cache. | |
66 | * | |
67 | * @param workerInfo the remote worker destination | |
68 | * @param partitionId the remote Partition this message belongs to | |
69 | * @param destVertexId vertex id that is ultimate destination | |
70 | * @param data Data to send to remote worker | |
71 | * @return Size of messages for the worker. | |
72 | */ | |
73 | public int addData(WorkerInfo workerInfo, | |
74 | int partitionId, I destVertexId, T data) { | |
75 | // Get the data collection | |
76 | 0 | VertexIdData<I, T> partitionData = |
77 | 0 | getPartitionData(workerInfo, partitionId); |
78 | 0 | int originalSize = partitionData.getSize(); |
79 | 0 | partitionData.add(destVertexId, data); |
80 | // Update the size of cached, outgoing data per worker | |
81 | 0 | return incrDataSize(workerInfo.getTaskId(), |
82 | 0 | partitionData.getSize() - originalSize); |
83 | } | |
84 | ||
85 | /** | |
86 | * This method is similar to the method above, | |
87 | * but use a serialized id to replace original I type | |
88 | * destVertexId. | |
89 | * | |
90 | * @param workerInfo The remote worker destination | |
91 | * @param partitionId The remote Partition this message belongs to | |
92 | * @param serializedId The byte array to store the serialized target vertex id | |
93 | * @param idPos The length of bytes of serialized id in the byte array above | |
94 | * @param data Data to send to remote worker | |
95 | * @return The number of bytes added to the target worker | |
96 | */ | |
97 | public int addData(WorkerInfo workerInfo, int partitionId, | |
98 | byte[] serializedId, int idPos, T data) { | |
99 | // Get the data collection | |
100 | 0 | VertexIdData<I, T> partitionData = |
101 | 0 | getPartitionData(workerInfo, partitionId); |
102 | 0 | int originalSize = partitionData.getSize(); |
103 | 0 | partitionData.add(serializedId, idPos, data); |
104 | // Update the size of cached, outgoing data per worker | |
105 | 0 | return incrDataSize(workerInfo.getTaskId(), |
106 | 0 | partitionData.getSize() - originalSize); |
107 | } | |
108 | ||
109 | /** | |
110 | * This method tries to get a partition data from the data cache. | |
111 | * If null, it will create one. | |
112 | * | |
113 | * @param workerInfo The remote worker destination | |
114 | * @param partitionId The remote Partition this message belongs to | |
115 | * @return The partition data in data cache | |
116 | */ | |
117 | private VertexIdData<I, T> getPartitionData(WorkerInfo workerInfo, | |
118 | int partitionId) { | |
119 | // Get the data collection | |
120 | 0 | B partitionData = getData(partitionId); |
121 | 0 | if (partitionData == null) { |
122 | 0 | partitionData = createVertexIdData(); |
123 | 0 | partitionData.setConf(getConf()); |
124 | 0 | partitionData.initialize(getInitialBufferSize(workerInfo.getTaskId())); |
125 | 0 | setData(partitionId, partitionData); |
126 | } | |
127 | ||
128 | 0 | return partitionData; |
129 | } | |
130 | } |