1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats.multi;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.conf.StrConfOption;
23 import org.apache.giraph.io.EdgeInputFormat;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.io.Writable;
26 import org.apache.hadoop.io.WritableComparable;
27 import org.json.JSONArray;
28 import org.json.JSONException;
29
30 import com.google.common.collect.Lists;
31
32 import java.util.List;
33
34
35
36
37
38
39
40
41
42
43 public class EdgeInputFormatDescription<I extends WritableComparable,
44 E extends Writable> extends InputFormatDescription<EdgeInputFormat<I, E>> {
45
46
47
48
49
50
51
52 public static final StrConfOption EDGE_INPUT_FORMAT_DESCRIPTIONS =
53 new StrConfOption("giraph.multiEdgeInput.descriptions", null,
54 "EdgeInputFormats description - JSON array containing a JSON array " +
55 "for each edge input. Edge input JSON arrays contain one or two " +
56 "elements - first one is the name of edge input class, and second " +
57 "one is JSON object with all specific parameters for this edge " +
58 "input. For example: [[\"EIF1\",{\"p\":\"v1\"}]," +
59 "[\"EIF2\",{\"p\":\"v2\",\"q\":\"v\"}]]");
60
61
62
63
64
65
66 public EdgeInputFormatDescription(
67 Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass) {
68 super(edgeInputFormatClass);
69 }
70
71
72
73
74
75
76 public EdgeInputFormatDescription(String description) {
77 super(description);
78 }
79
80
81
82
83
84
85
86
87 private ImmutableClassesGiraphConfiguration<I, Writable, E>
88 createConfigurationCopy(
89 ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
90 ImmutableClassesGiraphConfiguration<I, Writable, E> confCopy =
91 new ImmutableClassesGiraphConfiguration<I, Writable, E>(conf);
92 confCopy.setEdgeInputFormatClass(getInputFormatClass());
93 putParametersToConfiguration(confCopy);
94 return confCopy;
95 }
96
97
98
99
100
101
102
103
104
105 public static <I extends WritableComparable, E extends Writable>
106 List<EdgeInputFormatDescription<I, E>> getEdgeInputFormatDescriptions(
107 Configuration conf) {
108 String edgeInputFormatDescriptions =
109 EDGE_INPUT_FORMAT_DESCRIPTIONS.get(conf);
110 if (edgeInputFormatDescriptions == null) {
111 return Lists.newArrayList();
112 }
113 try {
114 JSONArray inputFormatsJson = new JSONArray(edgeInputFormatDescriptions);
115 List<EdgeInputFormatDescription<I, E>> descriptions =
116 Lists.newArrayListWithCapacity(inputFormatsJson.length());
117 for (int i = 0; i < inputFormatsJson.length(); i++) {
118 descriptions.add(new EdgeInputFormatDescription<I, E>(
119 inputFormatsJson.getJSONArray(i).toString()));
120 }
121 return descriptions;
122 } catch (JSONException e) {
123 throw new IllegalStateException("getEdgeInputFormatDescriptions: " +
124 "JSONException occurred while trying to process " +
125 edgeInputFormatDescriptions, e);
126 }
127 }
128
129
130
131
132
133
134
135
136
137 public static <I extends WritableComparable,
138 E extends Writable> List<EdgeInputFormat<I, E>> createEdgeInputFormats(
139 ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
140 List<EdgeInputFormatDescription<I, E>> descriptions =
141 getEdgeInputFormatDescriptions(conf);
142 List<EdgeInputFormat<I, E>> edgeInputFormats =
143 Lists.newArrayListWithCapacity(descriptions.size());
144 for (EdgeInputFormatDescription<I, E> description : descriptions) {
145 ImmutableClassesGiraphConfiguration<I, Writable, E> confCopy =
146 description.createConfigurationCopy(conf);
147 edgeInputFormats.add(confCopy.createWrappedEdgeInputFormat());
148 }
149 return edgeInputFormats;
150 }
151 }