1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.utils;
20
21 import org.apache.giraph.factories.MessageValueFactory;
22 import org.apache.hadoop.io.Writable;
23 import org.apache.hadoop.io.WritableComparable;
24
25 import java.io.IOException;
26
27
28
29
30
31
32
33 @SuppressWarnings("unchecked")
34 public class ByteArrayOneMessageToManyIds<I extends WritableComparable,
35 M extends Writable> extends ByteArrayVertexIdData<I, M>
36 implements VertexIdMessages<I, M> {
37
38 private MessageValueFactory<M> messageValueFactory;
39
40
41
42
43
44
45 public ByteArrayOneMessageToManyIds(
46 MessageValueFactory<M> messageValueFactory) {
47 this.messageValueFactory = messageValueFactory;
48 }
49
50 @Override
51 public M createData() {
52 return messageValueFactory.newInstance();
53 }
54
55 @Override
56 public void writeData(ExtendedDataOutput out, M message) throws IOException {
57 message.write(out);
58 }
59
60 @Override
61 public void readData(ExtendedDataInput in, M message) throws IOException {
62 message.readFields(in);
63 }
64
65
66
67
68
69
70
71
72
73
74
75
76 public void add(byte[] ids, int idPos, int count, M msg) {
77 try {
78 msg.write(extendedDataOutput);
79 extendedDataOutput.writeInt(count);
80 extendedDataOutput.write(ids, 0, idPos);
81 } catch (IOException e) {
82 throw new IllegalStateException("add: IOException", e);
83 }
84 }
85
86 @Override
87 public void add(I vertexId, M data) {
88 throw new UnsupportedOperationException();
89 }
90
91 @Override
92 public void add(byte[] serializedId, int idPos, M data) {
93 throw new UnsupportedOperationException();
94 }
95
96 @Override
97 public VertexIdMessageBytesIterator<I, M> getVertexIdMessageBytesIterator() {
98 return null;
99 }
100
101 @Override
102 public VertexIdMessageIterator<I, M> getVertexIdMessageIterator() {
103 return new OneMessageToManyIdsIterator<>(this);
104 }
105 }