1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.block;
19
20 import java.util.Iterator;
21
22 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
23 import org.apache.giraph.function.Consumer;
24 import org.apache.giraph.function.Supplier;
25
26 import com.google.common.collect.AbstractIterator;
27 import com.google.common.collect.Iterators;
28
29
30
31
32
33
34
35
36 @SuppressWarnings("rawtypes")
37 public final class RepeatUntilBlock implements Block {
38 private final Block block;
39 private final int repeatTimes;
40 private final Supplier<Boolean> toQuit;
41
42 public RepeatUntilBlock(
43 int repeatTimes, Block block, Supplier<Boolean> toQuit) {
44 this.block = block;
45 this.repeatTimes = repeatTimes;
46 this.toQuit = toQuit;
47 }
48
49
50
51
52 public static Block unlimited(Block block, Supplier<Boolean> toQuit) {
53 return new RepeatUntilBlock(Integer.MAX_VALUE, block, toQuit);
54 }
55
56 @Override
57 public Iterator<AbstractPiece> iterator() {
58 return Iterators.concat(new AbstractIterator<Iterator<AbstractPiece>>() {
59 private int index = 0;
60
61 @Override
62 protected Iterator<AbstractPiece> computeNext() {
63 if (index >= repeatTimes || Boolean.TRUE.equals(toQuit.get())) {
64 return endOfData();
65 }
66 index++;
67 return block.iterator();
68 }
69 });
70 }
71
72 @Override
73 public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
74 block.forAllPossiblePieces(consumer);
75 }
76
77 @Override
78 public PieceCount getPieceCount() {
79 return PieceCount.createUnknownCount();
80 }
81
82 @Override
83 public String toString() {
84 return "RepeatUntilBlock(" + repeatTimes + " * " + block + ")";
85 }
86 }