1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
package org.apache.giraph.conf; |
19 | |
|
20 | |
import java.io.DataInput; |
21 | |
import java.io.DataOutput; |
22 | |
import java.io.IOException; |
23 | |
|
24 | |
import org.apache.giraph.combiner.MessageCombiner; |
25 | |
import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; |
26 | |
import org.apache.giraph.factories.DefaultMessageValueFactory; |
27 | |
import org.apache.giraph.factories.MessageValueFactory; |
28 | |
import org.apache.giraph.utils.ReflectionUtils; |
29 | |
import org.apache.giraph.utils.WritableUtils; |
30 | |
import org.apache.hadoop.io.Writable; |
31 | |
import org.apache.hadoop.io.WritableComparable; |
32 | |
import com.google.common.base.Preconditions; |
33 | |
|
34 | |
|
35 | |
|
36 | |
|
37 | |
|
38 | |
|
39 | |
|
40 | |
public class DefaultMessageClasses |
41 | |
<I extends WritableComparable, M extends Writable> |
42 | |
implements MessageClasses<I, M> { |
43 | |
|
44 | |
private Class<M> messageClass; |
45 | |
|
46 | |
private Class<? extends MessageValueFactory<M>> |
47 | |
messageValueFactoryClass; |
48 | |
|
49 | |
private Class<? extends MessageCombiner<? super I, M>> messageCombinerClass; |
50 | |
|
51 | |
private boolean messageClassModified; |
52 | |
|
53 | |
private MessageEncodeAndStoreType messageEncodeAndStoreType; |
54 | |
|
55 | |
|
56 | 0 | public DefaultMessageClasses() { |
57 | 0 | } |
58 | |
|
59 | |
|
60 | |
|
61 | |
|
62 | |
|
63 | |
|
64 | |
|
65 | |
|
66 | |
public DefaultMessageClasses( |
67 | |
Class<M> messageClass, |
68 | |
Class<? extends MessageValueFactory<M>> messageValueFactoryClass, |
69 | |
Class<? extends MessageCombiner<? super I, M>> messageCombinerClass, |
70 | 0 | MessageEncodeAndStoreType messageEncodeAndStoreType) { |
71 | 0 | this.messageClass = messageClass; |
72 | 0 | this.messageValueFactoryClass = messageValueFactoryClass; |
73 | 0 | this.messageCombinerClass = messageCombinerClass; |
74 | 0 | this.messageClassModified = false; |
75 | 0 | this.messageEncodeAndStoreType = messageEncodeAndStoreType; |
76 | 0 | } |
77 | |
|
78 | |
@Override |
79 | |
public Class<M> getMessageClass() { |
80 | 0 | return messageClass; |
81 | |
} |
82 | |
|
83 | |
@Override |
84 | |
public MessageValueFactory<M> createMessageValueFactory( |
85 | |
ImmutableClassesGiraphConfiguration conf) { |
86 | 0 | if (messageValueFactoryClass.equals(DefaultMessageValueFactory.class)) { |
87 | 0 | return new DefaultMessageValueFactory<>(messageClass, conf); |
88 | |
} |
89 | |
|
90 | 0 | MessageValueFactory factory = ReflectionUtils.newInstance( |
91 | |
messageValueFactoryClass, conf); |
92 | 0 | if (!factory.newInstance().getClass().equals(messageClass)) { |
93 | 0 | throw new IllegalStateException("Message factory " + |
94 | |
messageValueFactoryClass + " creates " + |
95 | 0 | factory.newInstance().getClass().getName() + ", but message type is " + |
96 | 0 | messageClass.getName()); |
97 | |
} |
98 | 0 | return factory; |
99 | |
} |
100 | |
|
101 | |
@Override |
102 | |
public MessageCombiner<? super I, M> createMessageCombiner( |
103 | |
ImmutableClassesGiraphConfiguration conf) { |
104 | 0 | if (messageCombinerClass == null) { |
105 | 0 | return null; |
106 | |
} |
107 | |
|
108 | 0 | MessageCombiner combiner = |
109 | 0 | ReflectionUtils.newInstance(messageCombinerClass, conf); |
110 | 0 | if (combiner != null) { |
111 | 0 | Preconditions.checkState( |
112 | 0 | combiner.createInitialMessage().getClass().equals(messageClass)); |
113 | |
} |
114 | 0 | return combiner; |
115 | |
} |
116 | |
|
117 | |
@Override |
118 | |
public boolean useMessageCombiner() { |
119 | 0 | return messageCombinerClass != null; |
120 | |
} |
121 | |
|
122 | |
@Override |
123 | |
public boolean ignoreExistingVertices() { |
124 | 0 | return false; |
125 | |
} |
126 | |
|
127 | |
@Override |
128 | |
public MessageEncodeAndStoreType getMessageEncodeAndStoreType() { |
129 | 0 | return messageEncodeAndStoreType; |
130 | |
} |
131 | |
|
132 | |
@Override |
133 | |
public MessageClasses<I, M> createCopyForNewSuperstep() { |
134 | 0 | return new DefaultMessageClasses<>(messageClass, messageValueFactoryClass, |
135 | |
messageCombinerClass, messageEncodeAndStoreType); |
136 | |
} |
137 | |
|
138 | |
@Override |
139 | |
public void verifyConsistent( |
140 | |
ImmutableClassesGiraphConfiguration conf) { |
141 | 0 | Class<?>[] factoryTypes = ReflectionUtils.getTypeArguments( |
142 | |
MessageValueFactory.class, messageValueFactoryClass); |
143 | 0 | ReflectionUtils.verifyTypes(messageClass, factoryTypes[0], |
144 | |
"Message factory", messageValueFactoryClass); |
145 | |
|
146 | 0 | if (messageCombinerClass != null) { |
147 | 0 | Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments( |
148 | |
MessageCombiner.class, messageCombinerClass); |
149 | 0 | ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0], |
150 | |
"Vertex id", messageCombinerClass); |
151 | 0 | ReflectionUtils.verifyTypes(messageClass, combinerTypes[1], |
152 | |
"Outgoing message", messageCombinerClass); |
153 | |
} |
154 | 0 | } |
155 | |
|
156 | |
|
157 | |
|
158 | |
|
159 | |
|
160 | |
public void setMessageClass(Class<M> messageClass) { |
161 | 0 | this.messageClassModified = true; |
162 | 0 | this.messageClass = messageClass; |
163 | 0 | } |
164 | |
|
165 | |
|
166 | |
|
167 | |
|
168 | |
|
169 | |
public void setIfNotModifiedMessageClass(Class<M> messageClass) { |
170 | 0 | if (!messageClassModified) { |
171 | 0 | this.messageClass = messageClass; |
172 | |
} |
173 | 0 | } |
174 | |
|
175 | |
public void setMessageCombinerClass( |
176 | |
Class<? extends MessageCombiner<? super I, M>> messageCombinerClass) { |
177 | 0 | this.messageCombinerClass = messageCombinerClass; |
178 | 0 | } |
179 | |
|
180 | |
public void setMessageValueFactoryClass( |
181 | |
Class<? extends MessageValueFactory<M>> messageValueFactoryClass) { |
182 | 0 | this.messageValueFactoryClass = messageValueFactoryClass; |
183 | 0 | } |
184 | |
|
185 | |
public void setMessageEncodeAndStoreType( |
186 | |
MessageEncodeAndStoreType messageEncodeAndStoreType) { |
187 | 0 | this.messageEncodeAndStoreType = messageEncodeAndStoreType; |
188 | 0 | } |
189 | |
|
190 | |
@Override |
191 | |
public void write(DataOutput out) throws IOException { |
192 | 0 | WritableUtils.writeClass(messageClass, out); |
193 | 0 | WritableUtils.writeClass(messageValueFactoryClass, out); |
194 | 0 | WritableUtils.writeClass(messageCombinerClass, out); |
195 | 0 | out.writeBoolean(messageClassModified); |
196 | 0 | out.writeByte(messageEncodeAndStoreType.ordinal()); |
197 | 0 | } |
198 | |
|
199 | |
@Override |
200 | |
public void readFields(DataInput in) throws IOException { |
201 | 0 | messageClass = WritableUtils.readClass(in); |
202 | 0 | messageValueFactoryClass = WritableUtils.readClass(in); |
203 | 0 | messageCombinerClass = WritableUtils.readClass(in); |
204 | 0 | messageClassModified = in.readBoolean(); |
205 | 0 | messageEncodeAndStoreType = |
206 | 0 | messageEncodeAndStoreType.values()[in.readByte()]; |
207 | 0 | } |
208 | |
} |