1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.library;
19
20 import static com.google.common.base.Preconditions.checkNotNull;
21
22 import java.util.Iterator;
23
24 import org.apache.giraph.comm.SendMessageCache.TargetVertexIdIterator;
25 import org.apache.giraph.edge.Edge;
26 import org.apache.giraph.function.PairPredicate;
27 import org.apache.giraph.function.Predicate;
28 import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
29 import org.apache.giraph.function.vertex.SupplierFromVertex;
30 import org.apache.giraph.graph.Vertex;
31 import org.apache.hadoop.io.Writable;
32 import org.apache.hadoop.io.WritableComparable;
33
34 import com.google.common.collect.AbstractIterator;
35 import com.google.common.collect.Iterators;
36 import com.google.common.collect.UnmodifiableIterator;
37
38
39
40
41
42 @SuppressWarnings("rawtypes")
43 public class VertexSuppliers {
44
45 private VertexSuppliers() { }
46
47
48
49
50
51 public static
52 <I extends WritableComparable, V extends Writable, E extends Writable>
53 SupplierFromVertex<I, V, E, I> vertexIdSupplier() {
54 return new SupplierFromVertex<I, V, E, I>() {
55 @Override
56 public I get(Vertex<I, V, E> vertex) {
57 return vertex.getId();
58 }
59 };
60 }
61
62
63
64
65
66 public static
67 <I extends WritableComparable, V extends Writable, E extends Writable>
68 SupplierFromVertex<I, V, E, V> vertexValueSupplier() {
69 return new SupplierFromVertex<I, V, E, V>() {
70 @Override
71 public V get(Vertex<I, V, E> vertex) {
72 return vertex.getValue();
73 }
74 };
75 }
76
77
78
79
80
81 public static
82 <I extends WritableComparable, V extends Writable, E extends Writable>
83 SupplierFromVertex<I, V, E, Iterable<Edge<I, E>>> vertexEdgesSupplier() {
84 return new SupplierFromVertex<I, V, E, Iterable<Edge<I, E>>>() {
85 @Override
86 public Iterable<Edge<I, E>> get(Vertex<I, V, E> vertex) {
87 return vertex.getEdges();
88 }
89 };
90 }
91
92
93
94
95
96
97 public static
98 <I extends WritableComparable, V extends Writable, E extends Writable>
99 SupplierFromVertex<I, V, E, Iterator<I>> vertexNeighborsSupplier() {
100 return new SupplierFromVertex<I, V, E, Iterator<I>>() {
101 @Override
102 public Iterator<I> get(final Vertex<I, V, E> vertex) {
103 return new TargetVertexIdIterator<>(vertex);
104 }
105 };
106 }
107
108
109
110
111
112
113
114 public static
115 <I extends WritableComparable, V extends Writable, E extends Writable>
116 SupplierFromVertex<I, V, E, Iterator<I>> vertexNeighborsSupplier(
117 final Predicate<I> toSupply) {
118 return new SupplierFromVertex<I, V, E, Iterator<I>>() {
119 @Override
120 public Iterator<I> get(final Vertex<I, V, E> vertex) {
121 return Iterators.filter(
122 new TargetVertexIdIterator<>(vertex),
123 new com.google.common.base.Predicate<I>() {
124 @Override
125 public boolean apply(I input) {
126 return toSupply.apply(input);
127 }
128 });
129 }
130 };
131 }
132
133
134
135
136
137
138
139 public static
140 <I extends WritableComparable, V extends Writable, E extends Writable>
141 SupplierFromVertex<I, V, E, Iterator<I>> vertexNeighborsSupplierWithIndex(
142 final PairPredicate<IntRef, I> toSupply) {
143 return new SupplierFromVertex<I, V, E, Iterator<I>>() {
144 @Override
145 public Iterator<I> get(final Vertex<I, V, E> vertex) {
146
147 return filterWithIndex(
148 new TargetVertexIdIterator<>(vertex), toSupply);
149 }
150 };
151 }
152
153
154
155
156
157 private static <T> UnmodifiableIterator<T> filterWithIndex(
158 final Iterator<T> unfiltered, final PairPredicate<IntRef, T> predicate) {
159 checkNotNull(unfiltered);
160 checkNotNull(predicate);
161 return new AbstractIterator<T>() {
162 private final IntRef index = new IntRef(0);
163 @Override protected T computeNext() {
164 while (unfiltered.hasNext()) {
165 T element = unfiltered.next();
166 boolean res = predicate.apply(index, element);
167 index.value++;
168 if (res) {
169 return element;
170 }
171 }
172 return endOfData();
173 }
174 };
175 }
176 }