1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats.multi;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.conf.StrConfOption;
23 import org.apache.giraph.io.VertexInputFormat;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.io.Writable;
26 import org.apache.hadoop.io.WritableComparable;
27 import org.json.JSONArray;
28 import org.json.JSONException;
29
30 import com.google.common.collect.Lists;
31
32 import java.util.List;
33
34
35
36
37
38
39
40
41
42
43
44 public class VertexInputFormatDescription<I extends WritableComparable,
45 V extends Writable, E extends Writable>
46 extends InputFormatDescription<VertexInputFormat<I, V, E>> {
47
48
49
50
51
52
53
54 public static final StrConfOption VERTEX_INPUT_FORMAT_DESCRIPTIONS =
55 new StrConfOption("giraph.multiVertexInput.descriptions", null,
56 "VertexInputFormats description - JSON array containing a JSON " +
57 "array for each vertex input. Vertex input JSON arrays contain " +
58 "one or two elements - first one is the name of vertex input " +
59 "class, and second one is JSON object with all specific parameters " +
60 "for this vertex input. For example: [[\"VIF1\",{\"p\":\"v1\"}]," +
61 "[\"VIF2\",{\"p\":\"v2\",\"q\":\"v\"}]]\"");
62
63
64
65
66
67
68 public VertexInputFormatDescription(
69 Class<? extends VertexInputFormat<I, V, E>> vertexInputFormatClass) {
70 super(vertexInputFormatClass);
71 }
72
73
74
75
76
77
78 public VertexInputFormatDescription(String description) {
79 super(description);
80 }
81
82
83
84
85
86
87
88
89 private ImmutableClassesGiraphConfiguration<I, V, E>
90 createConfigurationCopy(
91 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
92 ImmutableClassesGiraphConfiguration<I, V, E> confCopy =
93 new ImmutableClassesGiraphConfiguration<I, V, E>(conf);
94 confCopy.setVertexInputFormatClass(getInputFormatClass());
95 putParametersToConfiguration(confCopy);
96 return confCopy;
97 }
98
99
100
101
102
103
104
105
106
107
108 public static <I extends WritableComparable, V extends Writable,
109 E extends Writable>
110 List<VertexInputFormatDescription<I, V, E>> getVertexInputFormatDescriptions(
111 Configuration conf) {
112 String vertexInputFormatDescriptions =
113 VERTEX_INPUT_FORMAT_DESCRIPTIONS.get(conf);
114 if (vertexInputFormatDescriptions == null) {
115 return Lists.newArrayList();
116 }
117 try {
118 JSONArray inputFormatsJson = new JSONArray(vertexInputFormatDescriptions);
119 List<VertexInputFormatDescription<I, V, E>> descriptions =
120 Lists.newArrayListWithCapacity(inputFormatsJson.length());
121 for (int i = 0; i < inputFormatsJson.length(); i++) {
122 descriptions.add(new VertexInputFormatDescription<I, V, E>(
123 inputFormatsJson.getJSONArray(i).toString()));
124 }
125 return descriptions;
126 } catch (JSONException e) {
127 throw new IllegalStateException("getVertexInputFormatDescriptions: " +
128 "JSONException occurred while trying to process " +
129 vertexInputFormatDescriptions, e);
130 }
131 }
132
133
134
135
136
137
138
139
140
141
142 public static <I extends WritableComparable, V extends Writable,
143 E extends Writable>
144 List<VertexInputFormat<I, V, E>> createVertexInputFormats(
145 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
146 List<VertexInputFormatDescription<I, V, E>> descriptions =
147 getVertexInputFormatDescriptions(conf);
148 List<VertexInputFormat<I, V, E>> vertexInputFormats =
149 Lists.newArrayListWithCapacity(descriptions.size());
150 for (VertexInputFormatDescription<I, V, E> description : descriptions) {
151 ImmutableClassesGiraphConfiguration<I, V, E> confCopy =
152 description.createConfigurationCopy(conf);
153 vertexInputFormats.add(confCopy.createWrappedVertexInputFormat());
154 }
155 return vertexInputFormats;
156 }
157 }