1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.block;
19
20 import java.util.Iterator;
21
22 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
23 import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece;
24 import org.apache.giraph.function.Consumer;
25 import org.apache.giraph.function.vertex.SupplierFromVertex;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28
29 import com.google.common.base.Function;
30 import com.google.common.collect.Iterators;
31
32
33
34
35
36
37
38
39
40
41
42 @SuppressWarnings({ "rawtypes", "unchecked" })
43 public final class FilteringBlock<I extends WritableComparable,
44 V extends Writable, E extends Writable>
45 implements Block {
46 private final SupplierFromVertex<I, V, E, Boolean> toCallSend;
47 private final SupplierFromVertex<I, V, E, Boolean> toCallReceive;
48 private final Block block;
49
50
51
52
53
54
55 public FilteringBlock(
56 SupplierFromVertex<I, V, E, Boolean> toCallSend,
57 SupplierFromVertex<I, V, E, Boolean> toCallReceive,
58 Block block) {
59 this.toCallSend = toCallSend;
60 this.toCallReceive = toCallReceive;
61 this.block = block;
62 }
63
64
65
66
67
68 public FilteringBlock(
69 SupplierFromVertex<I, V, E, Boolean> toCallSendAndReceive, Block block) {
70 this(toCallSendAndReceive, toCallSendAndReceive, block);
71 }
72
73
74
75
76
77 public static
78 <I extends WritableComparable, V extends Writable, E extends Writable>
79 Block createReceiveFiltering(
80 SupplierFromVertex<I, V, E, Boolean> toCallReceive,
81 Block innerBlock) {
82 return new FilteringBlock<>(null, toCallReceive, innerBlock);
83 }
84
85
86
87
88
89 public static
90 <I extends WritableComparable, V extends Writable, E extends Writable>
91 Block createSendFiltering(
92 SupplierFromVertex<I, V, E, Boolean> toCallSend,
93 Block innerBlock) {
94 return new FilteringBlock<>(toCallSend, null, innerBlock);
95 }
96
97 @Override
98 public Iterator<AbstractPiece> iterator() {
99 return Iterators.transform(
100 block.iterator(),
101 new Function<AbstractPiece, AbstractPiece>() {
102 @Override
103 public AbstractPiece apply(AbstractPiece input) {
104 return new FilteringPiece<>(toCallSend, toCallReceive, input);
105 }
106 });
107 }
108
109 @Override
110 public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
111 block.forAllPossiblePieces(consumer);
112 }
113
114 @Override
115 public PieceCount getPieceCount() {
116 return block.getPieceCount();
117 }
118 }