1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.internal;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.io.MappingReader;
25 import org.apache.giraph.job.HadoopUtils;
26 import org.apache.giraph.mapping.MappingEntry;
27 import org.apache.giraph.worker.WorkerGlobalCommUsage;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.io.WritableComparable;
30 import org.apache.hadoop.mapreduce.InputSplit;
31 import org.apache.hadoop.mapreduce.TaskAttemptContext;
32
33
34
35
36
37
38
39
40
41
42
43
44
45 public class WrappedMappingReader<I extends WritableComparable,
46 V extends Writable, E extends Writable, B extends Writable>
47 extends MappingReader<I, V, E, B> {
48
49 private final MappingReader<I, V, E, B> baseMappingReader;
50
51
52
53
54
55
56
57 public WrappedMappingReader(MappingReader<I, V, E, B> baseMappingReader,
58 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
59 this.baseMappingReader = baseMappingReader;
60 super.setConf(conf);
61 baseMappingReader.setConf(conf);
62 }
63
64 @Override
65 public void setConf(
66 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
67
68 }
69
70 @Override
71 public void initialize(InputSplit inputSplit,
72 TaskAttemptContext context) throws IOException, InterruptedException {
73 baseMappingReader.initialize(inputSplit,
74 HadoopUtils.makeTaskAttemptContext(getConf(), context));
75 }
76
77 @Override
78 public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
79 super.setWorkerGlobalCommUsage(usage);
80
81 baseMappingReader.setWorkerGlobalCommUsage(usage);
82 }
83
84 @Override
85 public boolean nextEntry() throws IOException, InterruptedException {
86 return baseMappingReader.nextEntry();
87 }
88
89 @Override
90 public MappingEntry<I, B> getCurrentEntry()
91 throws IOException, InterruptedException {
92 return baseMappingReader.getCurrentEntry();
93 }
94
95
96 @Override
97 public void close() throws IOException {
98 baseMappingReader.close();
99 }
100
101 @Override
102 public float getProgress() throws IOException, InterruptedException {
103 return baseMappingReader.getProgress();
104 }
105 }