1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import java.io.BufferedReader;
22 import java.io.IOException;
23 import java.io.InputStreamReader;
24 import java.nio.charset.Charset;
25 import java.util.Set;
26
27 import org.apache.giraph.worker.WorkerContext;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.filecache.DistributedCache;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.log4j.Logger;
33
34 import com.google.common.collect.ImmutableSet;
35
36
37
38
39 public class RandomWalkWorkerContext extends WorkerContext {
40
41 private static final int DEFAULT_MAX_SUPERSTEPS = 30;
42
43 private static final float DEFAULT_TELEPORTATION_PROBABILITY = 0.15f;
44
45 private static int MAX_SUPERSTEPS;
46
47 private static double TELEPORTATION_PROBABILITY;
48
49 private static Set<Long> SOURCES;
50
51
52 private static final String SOURCE_VERTEX =
53 RandomWalkWithRestartComputation.class.getName() + ".sourceVertex";
54
55
56 private static final Logger LOG = Logger
57 .getLogger(RandomWalkWorkerContext.class);
58
59
60
61
62 public int getMaxSupersteps() {
63 if (MAX_SUPERSTEPS == 0) {
64 throw new IllegalStateException(
65 RandomWalkWorkerContext.class.getSimpleName() +
66 " was not initialized. Relaunch your job " +
67 "by setting the appropriate WorkerContext");
68 }
69 return MAX_SUPERSTEPS;
70 }
71
72
73
74
75 public double getTeleportationProbability() {
76 if (TELEPORTATION_PROBABILITY == 0) {
77 throw new IllegalStateException(
78 RandomWalkWorkerContext.class.getSimpleName() +
79 " was not initialized. Relaunch your job " +
80 "by setting the appropriate WorkerContext");
81 }
82 return TELEPORTATION_PROBABILITY;
83 }
84
85
86
87
88
89
90 public boolean isSource(long id) {
91 return SOURCES.contains(id);
92 }
93
94
95
96
97 public int numSources() {
98 return SOURCES.size();
99 }
100
101
102
103
104
105
106
107
108
109
110 private ImmutableSet<Long> initializeSources(Configuration configuration) {
111 ImmutableSet.Builder<Long> builder = ImmutableSet.builder();
112 long sourceVertex = configuration.getLong(SOURCE_VERTEX, Long.MIN_VALUE);
113 if (sourceVertex != Long.MIN_VALUE) {
114 return ImmutableSet.of(sourceVertex);
115 } else {
116 Path sourceFile = null;
117 try {
118
119 Path[] cacheFiles = DistributedCache.getLocalCacheFiles(configuration);
120 if (cacheFiles == null || cacheFiles.length == 0) {
121
122 return ImmutableSet.of();
123 }
124
125 sourceFile = cacheFiles[0];
126 FileSystem fs = FileSystem.getLocal(configuration);
127 BufferedReader in = new BufferedReader(new InputStreamReader(
128 fs.open(sourceFile), Charset.defaultCharset()));
129 String line;
130 while ((line = in.readLine()) != null) {
131 builder.add(Long.parseLong(line));
132 }
133 in.close();
134 } catch (IOException e) {
135 getContext().setStatus(
136 "Could not load local cache files: " + sourceFile);
137 LOG.error("Could not load local cache files: " + sourceFile, e);
138 }
139 }
140 return builder.build();
141 }
142
143 @Override
144 public void preApplication() throws InstantiationException,
145 IllegalAccessException {
146 setStaticVars(getContext().getConfiguration());
147 }
148
149
150
151
152
153
154 private void setStaticVars(Configuration configuration) {
155 MAX_SUPERSTEPS = configuration.getInt(RandomWalkComputation.MAX_SUPERSTEPS,
156 DEFAULT_MAX_SUPERSTEPS);
157 TELEPORTATION_PROBABILITY = configuration.getFloat(
158 RandomWalkComputation.TELEPORTATION_PROBABILITY,
159 DEFAULT_TELEPORTATION_PROBABILITY);
160 SOURCES = initializeSources(configuration);
161 }
162
163 @Override
164 public void preSuperstep() {
165 }
166
167 @Override
168 public void postSuperstep() {
169 }
170
171 @Override
172 public void postApplication() {
173 }
174 }