1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
package org.apache.giraph.block_app.framework.piece.messages; |
19 | |
|
20 | |
import org.apache.giraph.combiner.MessageCombiner; |
21 | |
import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; |
22 | |
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; |
23 | |
import org.apache.giraph.conf.MessageClasses; |
24 | |
import org.apache.giraph.factories.MessageValueFactory; |
25 | |
import org.apache.giraph.utils.ReflectionUtils; |
26 | |
import org.apache.giraph.writable.kryo.KryoWritable; |
27 | |
import org.apache.hadoop.io.Writable; |
28 | |
import org.apache.hadoop.io.WritableComparable; |
29 | |
|
30 | |
import com.google.common.base.Preconditions; |
31 | |
|
32 | |
|
33 | |
|
34 | |
|
35 | |
|
36 | |
|
37 | |
|
38 | |
|
39 | |
public class ObjectMessageClasses<I extends WritableComparable, |
40 | |
M extends Writable> extends KryoWritable implements MessageClasses<I, M> { |
41 | |
private final Class<M> messageClass; |
42 | |
private final SupplierFromConf<MessageValueFactory<M>> |
43 | |
messageValueFactorySupplier; |
44 | |
private final SupplierFromConf<? extends MessageCombiner<? super I, M>> |
45 | |
messageCombinerSupplier; |
46 | |
private final MessageEncodeAndStoreType messageEncodeAndStoreType; |
47 | |
private final boolean ignoreExistingVertices; |
48 | |
|
49 | |
public ObjectMessageClasses() { |
50 | 0 | this(null, null, null, null, false); |
51 | 0 | } |
52 | |
|
53 | |
public ObjectMessageClasses(Class<M> messageClass, |
54 | |
SupplierFromConf<MessageValueFactory<M>> messageValueFactorySupplier, |
55 | |
SupplierFromConf<? extends MessageCombiner<? super I, M>> |
56 | |
messageCombinerSupplier, |
57 | |
MessageEncodeAndStoreType messageEncodeAndStoreType, |
58 | 0 | boolean ignoreExistingVertices) { |
59 | 0 | this.messageClass = messageClass; |
60 | 0 | this.messageValueFactorySupplier = messageValueFactorySupplier; |
61 | 0 | this.messageCombinerSupplier = messageCombinerSupplier; |
62 | 0 | this.messageEncodeAndStoreType = messageEncodeAndStoreType; |
63 | 0 | this.ignoreExistingVertices = ignoreExistingVertices; |
64 | 0 | } |
65 | |
|
66 | |
@Override |
67 | |
public Class<M> getMessageClass() { |
68 | 0 | return messageClass; |
69 | |
} |
70 | |
|
71 | |
@Override |
72 | |
public MessageValueFactory<M> createMessageValueFactory( |
73 | |
ImmutableClassesGiraphConfiguration conf) { |
74 | 0 | return Preconditions.checkNotNull(messageValueFactorySupplier.apply(conf)); |
75 | |
} |
76 | |
|
77 | |
@Override |
78 | |
public MessageCombiner<? super I, M> createMessageCombiner( |
79 | |
ImmutableClassesGiraphConfiguration<I, ? extends Writable, |
80 | |
? extends Writable> conf) { |
81 | 0 | return messageCombinerSupplier != null ? |
82 | 0 | Preconditions.checkNotNull(messageCombinerSupplier.apply(conf)) : null; |
83 | |
} |
84 | |
|
85 | |
@Override |
86 | |
public boolean useMessageCombiner() { |
87 | 0 | return messageCombinerSupplier != null; |
88 | |
} |
89 | |
|
90 | |
@Override |
91 | |
public boolean ignoreExistingVertices() { |
92 | 0 | return ignoreExistingVertices; |
93 | |
} |
94 | |
|
95 | |
@Override |
96 | |
public MessageEncodeAndStoreType getMessageEncodeAndStoreType() { |
97 | 0 | return messageEncodeAndStoreType; |
98 | |
} |
99 | |
|
100 | |
@Override |
101 | |
public MessageClasses<I, M> createCopyForNewSuperstep() { |
102 | 0 | return new ObjectMessageClasses<>( |
103 | |
messageClass, messageValueFactorySupplier, |
104 | |
messageCombinerSupplier, messageEncodeAndStoreType, |
105 | |
ignoreExistingVertices); |
106 | |
} |
107 | |
|
108 | |
@Override |
109 | |
public void verifyConsistent(ImmutableClassesGiraphConfiguration conf) { |
110 | 0 | MessageValueFactory<M> messageValueFactory = |
111 | 0 | messageValueFactorySupplier.apply(conf); |
112 | 0 | Preconditions.checkState( |
113 | 0 | messageValueFactory.newInstance().getClass().equals(messageClass)); |
114 | |
|
115 | 0 | if (messageCombinerSupplier != null) { |
116 | 0 | MessageCombiner<? super I, M> messageCombiner = |
117 | 0 | messageCombinerSupplier.apply(conf); |
118 | 0 | Preconditions.checkState(messageCombiner.createInitialMessage() |
119 | 0 | .getClass().equals(messageClass)); |
120 | 0 | Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments( |
121 | 0 | MessageCombiner.class, messageCombiner.getClass()); |
122 | 0 | ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0], |
123 | 0 | "Vertex id", messageCombiner.getClass()); |
124 | 0 | ReflectionUtils.verifyTypes(messageClass, combinerTypes[1], |
125 | 0 | "Outgoing message", messageCombiner.getClass()); |
126 | |
} |
127 | 0 | } |
128 | |
} |