1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages;
20
21 import it.unimi.dsi.fastutil.longs.LongArrayList;
22 import it.unimi.dsi.fastutil.longs.LongListIterator;
23
24 import java.io.IOException;
25 import java.util.Iterator;
26
27 import org.apache.giraph.factories.MessageValueFactory;
28 import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
29 import org.apache.giraph.utils.ExtendedDataOutput;
30 import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
31 import org.apache.hadoop.io.Writable;
32
33
34
35
36
37
38 public class PointerListMessagesIterable<M extends Writable>
39 implements Iterable<M> {
40
41 private final MessageValueFactory<M> messageValueFactory;
42
43 private final LongArrayList pointers;
44
45 private final ExtendedByteArrayOutputBuffer msgBuffer;
46
47 private final UnsafeReusableByteArrayInput messageReader;
48
49
50
51
52
53
54
55 public PointerListMessagesIterable(MessageValueFactory<M> messageValueFactory,
56 LongArrayList pointers, ExtendedByteArrayOutputBuffer msgBuffer) {
57 this.messageValueFactory = messageValueFactory;
58 this.pointers = pointers;
59 this.msgBuffer = msgBuffer;
60
61 messageReader = new UnsafeReusableByteArrayInput();
62 }
63
64
65
66
67
68
69 protected M createMessage() {
70 return messageValueFactory.newInstance();
71 }
72
73 @Override
74 public Iterator<M> iterator() {
75 return new Iterator<M>() {
76 private final LongListIterator iterator = pointers.iterator();
77 private final M reusableMsg =
78 PointerListMessagesIterable.this.createMessage();
79 @Override
80 public boolean hasNext() {
81 return iterator.hasNext();
82 }
83
84 @Override
85 public M next() {
86 long pointer = iterator.nextLong();
87 try {
88 int index = (int) (pointer >>> 32);
89 int offset = (int) pointer;
90 ExtendedDataOutput buffer = msgBuffer.getDataOutput(index);
91 messageReader.initialize(buffer.getByteArray(), offset,
92 buffer.getPos());
93 reusableMsg.readFields(messageReader);
94 } catch (IOException e) {
95 throw new IllegalStateException("Got exception : " + e);
96 }
97 return reusableMsg;
98 }
99
100 @Override
101 public void remove() {
102 throw new UnsupportedOperationException();
103 }
104 };
105 }
106 }