1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.piece.delegate;
19
20 import java.util.ArrayList;
21
22 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
23 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
24 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
25 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
26 import org.apache.giraph.function.vertex.SupplierFromVertex;
27 import org.apache.giraph.graph.Vertex;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.io.WritableComparable;
30
31 import com.google.common.base.Preconditions;
32
33
34
35
36
37
38
39
40
41
42
43
44
45 @SuppressWarnings({ "rawtypes" })
46 public class FilteringPiece<I extends WritableComparable, V extends Writable,
47 E extends Writable, M extends Writable, WV, WM extends Writable, S>
48 extends DelegatePiece<I, V, E, M, WV, WM, S> {
49 private final SupplierFromVertex<I, V, E, Boolean> toCallSend;
50 private final SupplierFromVertex<I, V, E, Boolean> toCallReceive;
51
52
53
54
55
56
57 @SuppressWarnings("unchecked")
58 public FilteringPiece(
59 SupplierFromVertex<? super I, ? super V, ? super E, Boolean> toCallSend,
60 SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
61 toCallReceive,
62 AbstractPiece<? super I, ? super V, ? super E, ? super M,
63 ? super WV, ? super WM, ? super S> innerPiece) {
64 super(innerPiece);
65
66
67
68 this.toCallSend = (SupplierFromVertex) toCallSend;
69 this.toCallReceive = (SupplierFromVertex) toCallReceive;
70 Preconditions.checkArgument(
71 toCallSend != null || toCallReceive != null,
72 "Both send and receive filter cannot be null");
73 }
74
75
76
77
78
79 public FilteringPiece(
80 SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
81 toCallSendAndReceive,
82 AbstractPiece<? super I, ? super V, ? super E, ? super M,
83 ? super WV, ? super WM, ? super S> innerPiece) {
84 this(toCallSendAndReceive, toCallSendAndReceive, innerPiece);
85 }
86
87
88
89
90
91 public static <I extends WritableComparable, V extends Writable,
92 E extends Writable, M extends Writable, WV, WM extends Writable, S>
93 FilteringPiece<I, V, E, M, WV, WM, S> createReceiveFiltering(
94 SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
95 toCallReceive,
96 AbstractPiece<? super I, ? super V, ? super E, ? super M,
97 ? super WV, ? super WM, ? super S> innerPiece) {
98 return new FilteringPiece<>(null, toCallReceive, innerPiece);
99 }
100
101
102
103
104
105 public static <I extends WritableComparable, V extends Writable,
106 E extends Writable, M extends Writable, WV, WM extends Writable, S>
107 FilteringPiece<I, V, E, M, WV, WM, S> createSendFiltering(
108 SupplierFromVertex<? super I, ? super V, ? super E, Boolean> toCallSend,
109 AbstractPiece<? super I, ? super V, ? super E, ? super M, ? super WV,
110 ? super WM, ? super S> innerPiece) {
111 return new FilteringPiece<>(toCallSend, null, innerPiece);
112 }
113
114 @Override
115 protected DelegateWorkerSendFunctions delegateWorkerSendFunctions(
116 ArrayList<InnerVertexSender> workerSendFunctions,
117 BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
118 return new DelegateWorkerSendFunctions(workerSendFunctions) {
119 @Override
120 public void vertexSend(Vertex<I, V, E> vertex) {
121 if (toCallSend == null || toCallSend.get(vertex)) {
122 super.vertexSend(vertex);
123 }
124 }
125 };
126 }
127
128 @Override
129 protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
130 ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
131 BlockWorkerReceiveApi<I> workerApi, S executionStage) {
132 return new DelegateWorkerReceiveFunctions(workerReceiveFunctions) {
133 @Override
134 public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
135 if (toCallReceive == null || toCallReceive.get(vertex)) {
136 super.vertexReceive(vertex, messages);
137 }
138 }
139 };
140 }
141
142 @Override
143 protected String delegationName() {
144 if (toCallSend != null && toCallReceive != null) {
145 if (toCallSend != toCallReceive) {
146 return "AsymFilter";
147 }
148 return "Filter";
149 } else if (toCallSend != null) {
150 return "SendFilter";
151 } else if (toCallReceive != null) {
152 return "ReceiveFilter";
153 } else {
154 throw new IllegalStateException("Both Send and Receive filters are null");
155 }
156 }
157 }