Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
TextVertexOutputFormat |
|
| 1.0;1 | ||||
TextVertexOutputFormat$1 |
|
| 1.0;1 | ||||
TextVertexOutputFormat$TextVertexWriter |
|
| 1.0;1 | ||||
TextVertexOutputFormat$TextVertexWriterToEachLine |
|
| 1.0;1 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.io.formats; | |
20 | ||
21 | import java.io.IOException; | |
22 | ||
23 | import org.apache.giraph.graph.Vertex; | |
24 | import org.apache.giraph.io.VertexOutputFormat; | |
25 | import org.apache.giraph.io.VertexWriter; | |
26 | import org.apache.hadoop.io.Text; | |
27 | import org.apache.hadoop.io.Writable; | |
28 | import org.apache.hadoop.io.WritableComparable; | |
29 | import org.apache.hadoop.mapreduce.JobContext; | |
30 | import org.apache.hadoop.mapreduce.OutputCommitter; | |
31 | import org.apache.hadoop.mapreduce.RecordWriter; | |
32 | import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
33 | ||
34 | import static org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_SUBDIR; | |
35 | ||
36 | /** | |
37 | * Abstract class that users should subclass to use their own text based | |
38 | * vertex output format. | |
39 | * | |
40 | * @param <I> Vertex index value | |
41 | * @param <V> Vertex value | |
42 | * @param <E> Edge value | |
43 | */ | |
44 | 0 | @SuppressWarnings("rawtypes") |
45 | 0 | public abstract class TextVertexOutputFormat<I extends WritableComparable, |
46 | V extends Writable, E extends Writable> | |
47 | extends VertexOutputFormat<I, V, E> { | |
48 | /** Uses the TextOutputFormat to do everything */ | |
49 | 0 | protected GiraphTextOutputFormat textOutputFormat = |
50 | 0 | new GiraphTextOutputFormat() { |
51 | @Override | |
52 | protected String getSubdir() { | |
53 | 0 | return VERTEX_OUTPUT_FORMAT_SUBDIR.get(getConf()); |
54 | } | |
55 | }; | |
56 | ||
57 | @Override | |
58 | public void checkOutputSpecs(JobContext context) | |
59 | throws IOException, InterruptedException { | |
60 | 0 | textOutputFormat.checkOutputSpecs(context); |
61 | 0 | } |
62 | ||
63 | @Override | |
64 | public OutputCommitter getOutputCommitter(TaskAttemptContext context) | |
65 | throws IOException, InterruptedException { | |
66 | 0 | return textOutputFormat.getOutputCommitter(context); |
67 | } | |
68 | ||
69 | /** | |
70 | * The factory method which produces the {@link TextVertexWriter} used by this | |
71 | * output format. | |
72 | * | |
73 | * @param context | |
74 | * the information about the task | |
75 | * @return | |
76 | * the text vertex writer to be used | |
77 | */ | |
78 | @Override | |
79 | public abstract TextVertexWriter createVertexWriter(TaskAttemptContext | |
80 | context) throws IOException, InterruptedException; | |
81 | ||
82 | /** | |
83 | * Abstract class to be implemented by the user based on their specific | |
84 | * vertex output. Easiest to ignore the key value separator and only use | |
85 | * key instead. | |
86 | */ | |
87 | 0 | protected abstract class TextVertexWriter |
88 | extends VertexWriter<I, V, E> { | |
89 | /** Internal line record writer */ | |
90 | private RecordWriter<Text, Text> lineRecordWriter; | |
91 | /** Context passed to initialize */ | |
92 | private TaskAttemptContext context; | |
93 | ||
94 | @Override | |
95 | public void initialize(TaskAttemptContext context) throws IOException, | |
96 | InterruptedException { | |
97 | 0 | lineRecordWriter = createLineRecordWriter(context); |
98 | 0 | this.context = context; |
99 | 0 | } |
100 | ||
101 | /** | |
102 | * Create the line record writer. Override this to use a different | |
103 | * underlying record writer (useful for testing). | |
104 | * | |
105 | * @param context | |
106 | * the context passed to initialize | |
107 | * @return | |
108 | * the record writer to be used | |
109 | * @throws IOException | |
110 | * exception that can be thrown during creation | |
111 | * @throws InterruptedException | |
112 | * exception that can be thrown during creation | |
113 | */ | |
114 | protected RecordWriter<Text, Text> createLineRecordWriter( | |
115 | TaskAttemptContext context) throws IOException, InterruptedException { | |
116 | 0 | return textOutputFormat.getRecordWriter(context); |
117 | } | |
118 | ||
119 | @Override | |
120 | public void close(TaskAttemptContext context) throws IOException, | |
121 | InterruptedException { | |
122 | 0 | lineRecordWriter.close(context); |
123 | 0 | } |
124 | ||
125 | /** | |
126 | * Get the line record writer. | |
127 | * | |
128 | * @return Record writer to be used for writing. | |
129 | */ | |
130 | public RecordWriter<Text, Text> getRecordWriter() { | |
131 | 0 | return lineRecordWriter; |
132 | } | |
133 | ||
134 | /** | |
135 | * Get the context. | |
136 | * | |
137 | * @return Context passed to initialize. | |
138 | */ | |
139 | public TaskAttemptContext getContext() { | |
140 | 0 | return context; |
141 | } | |
142 | } | |
143 | ||
144 | /** | |
145 | * Abstract class to be implemented by the user to write a line for each | |
146 | * vertex. | |
147 | */ | |
148 | 0 | protected abstract class TextVertexWriterToEachLine extends TextVertexWriter { |
149 | ||
150 | @SuppressWarnings("unchecked") | |
151 | @Override | |
152 | public final void writeVertex(Vertex vertex) throws | |
153 | IOException, InterruptedException { | |
154 | // Note we are writing line as key with null value | |
155 | 0 | getRecordWriter().write(convertVertexToLine(vertex), null); |
156 | 0 | } |
157 | ||
158 | /** | |
159 | * Writes a line for the given vertex. | |
160 | * | |
161 | * @param vertex | |
162 | * the current vertex for writing | |
163 | * @return the text line to be written | |
164 | * @throws IOException | |
165 | * exception that can be thrown while writing | |
166 | */ | |
167 | protected abstract Text convertVertexToLine(Vertex<I, V, E> vertex) | |
168 | throws IOException; | |
169 | } | |
170 | } |