Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
TaskIdsPermitsBarrier |
|
| 2.4;2.4 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.utils; | |
20 | ||
21 | import org.apache.hadoop.util.Progressable; | |
22 | import org.apache.log4j.Logger; | |
23 | ||
24 | import com.google.common.collect.Sets; | |
25 | ||
26 | import java.util.HashSet; | |
27 | import java.util.Set; | |
28 | ||
29 | /** | |
30 | * This barrier is used when we don't know how many events are we waiting on | |
31 | * from the start. Instead we have a set of task ids, and each of those will, | |
32 | * at some point of time, give the information about how many events from it | |
33 | * should we expect. Barrier will be waiting for all the tasks to notify it | |
34 | * about that number of events, and than it will also wait for all the events | |
35 | * to happen. | |
36 | * | |
37 | * requirePermits() corresponds to task notifying us how many events from it | |
38 | * to expect, and releasePermits() notifies us about events happening. | |
39 | * | |
40 | * This class is currently used during preparation of aggregators. | |
41 | * | |
42 | * User must follow this protocol for concurrent access: | |
43 | * | |
44 | * (1) an object instance is constructed | |
45 | * (2) arbitrarily many times | |
46 | * (2a) concurrent calls to requirePermits(), releasePermits() and | |
47 | * waitForRequiredPermits() are issued | |
48 | * (2b) waitForRequiredPermits() returns | |
49 | * | |
50 | * Note that the next cycle of calls to requirePermits() or releasePermits() | |
51 | * cannot start until the previous call to waitForRequiredPermits() | |
52 | * has returned. | |
53 | * | |
54 | * Methods of this class are thread-safe. | |
55 | */ | |
56 | public class TaskIdsPermitsBarrier { | |
57 | /** Class logger */ | |
58 | 0 | private static final Logger LOG = |
59 | 0 | Logger.getLogger(TaskIdsPermitsBarrier.class); |
60 | /** Msecs to refresh the progress meter */ | |
61 | private static final int MSEC_PERIOD = 10000; | |
62 | /** Maximum number of task ids to list in the log */ | |
63 | private static final int MAX_TASK_IDS_TO_LOG = 10; | |
64 | /** Progressable for reporting progress */ | |
65 | private final Progressable progressable; | |
66 | /** Number of permits we are currently waiting for */ | |
67 | 0 | private long waitingOnPermits = 0; |
68 | /** Set of task ids which required permits already */ | |
69 | 0 | private final Set<Integer> arrivedTaskIds = new HashSet<Integer>(); |
70 | /** Logger */ | |
71 | private final TimedLogger logger; | |
72 | ||
73 | /** | |
74 | * Constructor | |
75 | * | |
76 | * @param progressable Progressable for reporting progress | |
77 | */ | |
78 | 0 | public TaskIdsPermitsBarrier(Progressable progressable) { |
79 | 0 | this.progressable = progressable; |
80 | 0 | logger = new TimedLogger(MSEC_PERIOD, LOG); |
81 | 0 | } |
82 | ||
83 | /** | |
84 | * Wait until permits have been required desired number of times, | |
85 | * and all required permits are available | |
86 | * | |
87 | * @param expectedTaskIds List of task ids which we are waiting permits from | |
88 | */ | |
89 | public synchronized void waitForRequiredPermits( | |
90 | Set<Integer> expectedTaskIds) { | |
91 | 0 | while (arrivedTaskIds.size() < expectedTaskIds.size() || |
92 | waitingOnPermits > 0) { | |
93 | try { | |
94 | 0 | wait(MSEC_PERIOD); |
95 | 0 | } catch (InterruptedException e) { |
96 | 0 | throw new IllegalStateException("waitForRequiredPermits: " + |
97 | "InterruptedException occurred"); | |
98 | 0 | } |
99 | 0 | progressable.progress(); |
100 | 0 | if (LOG.isInfoEnabled()) { |
101 | 0 | if (arrivedTaskIds.size() < expectedTaskIds.size()) { |
102 | 0 | String logSuffix = ""; |
103 | 0 | if (expectedTaskIds.size() - arrivedTaskIds.size() <= |
104 | MAX_TASK_IDS_TO_LOG) { | |
105 | 0 | Sets.SetView<Integer> difference = |
106 | 0 | Sets.difference(expectedTaskIds, arrivedTaskIds); |
107 | 0 | logSuffix = ", task ids: " + difference; |
108 | } | |
109 | 0 | logger.info("waitForRequiredPermits: " + |
110 | "Waiting for " + | |
111 | 0 | (expectedTaskIds.size() - arrivedTaskIds.size()) + |
112 | " more tasks to send their aggregator data" + | |
113 | logSuffix); | |
114 | 0 | } else { |
115 | 0 | logger.info("waitForRequiredPermits: " + |
116 | "Waiting for " + waitingOnPermits + " more aggregator requests"); | |
117 | } | |
118 | } | |
119 | } | |
120 | ||
121 | // Reset for the next time to use | |
122 | 0 | arrivedTaskIds.clear(); |
123 | 0 | waitingOnPermits = 0; |
124 | 0 | } |
125 | ||
126 | /** | |
127 | * Require more permits. This will increase the number of times permits | |
128 | * were required. Doesn't wait for permits to become available. | |
129 | * | |
130 | * @param permits Number of permits to require | |
131 | * @param taskId Task id which required permits | |
132 | */ | |
133 | public synchronized void requirePermits(long permits, int taskId) { | |
134 | 0 | arrivedTaskIds.add(taskId); |
135 | 0 | waitingOnPermits += permits; |
136 | 0 | notifyAll(); |
137 | 0 | } |
138 | ||
139 | /** | |
140 | * Release one permit. | |
141 | */ | |
142 | public synchronized void releaseOnePermit() { | |
143 | 0 | releasePermits(1); |
144 | 0 | } |
145 | ||
146 | /** | |
147 | * Release some permits. | |
148 | * | |
149 | * @param permits Number of permits to release | |
150 | */ | |
151 | public synchronized void releasePermits(long permits) { | |
152 | 0 | waitingOnPermits -= permits; |
153 | 0 | notifyAll(); |
154 | 0 | } |
155 | } |