Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
TextEdgeOutputFormat |
|
| 1.0;1 | ||||
TextEdgeOutputFormat$1 |
|
| 1.0;1 | ||||
TextEdgeOutputFormat$TextEdgeWriter |
|
| 1.0;1 | ||||
TextEdgeOutputFormat$TextEdgeWriterToEachLine |
|
| 1.0;1 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.io.formats; | |
20 | ||
21 | import static org.apache.giraph.conf.GiraphConstants.EDGE_OUTPUT_FORMAT_SUBDIR; | |
22 | ||
23 | import java.io.IOException; | |
24 | ||
25 | import org.apache.giraph.edge.Edge; | |
26 | import org.apache.giraph.io.EdgeOutputFormat; | |
27 | import org.apache.giraph.io.EdgeWriter; | |
28 | import org.apache.hadoop.io.Text; | |
29 | import org.apache.hadoop.io.Writable; | |
30 | import org.apache.hadoop.io.WritableComparable; | |
31 | import org.apache.hadoop.mapreduce.JobContext; | |
32 | import org.apache.hadoop.mapreduce.OutputCommitter; | |
33 | import org.apache.hadoop.mapreduce.RecordWriter; | |
34 | import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
35 | ||
36 | /** | |
37 | * Abstract base class that users subclass to define their own text based | |
38 | * edge output format; output handling is delegated to textOutputFormat. | |
39 | * | |
40 | * @param <I> Vertex id type | |
41 | * @param <V> Vertex value type | |
42 | * @param <E> Edge value type | |
43 | */ | |
44 | 0 | @SuppressWarnings("rawtypes") |
45 | 0 | public abstract class TextEdgeOutputFormat<I extends WritableComparable, |
46 | V extends Writable, E extends Writable> | |
47 | extends EdgeOutputFormat<I, V, E> { | |
48 | /** Delegate GiraphTextOutputFormat; getSubdir() reads EDGE_OUTPUT_FORMAT_SUBDIR */ | |
49 | 0 | protected GiraphTextOutputFormat textOutputFormat = |
50 | 0 | new GiraphTextOutputFormat() { |
51 | @Override | |
52 | protected String getSubdir() { | |
53 | 0 | return EDGE_OUTPUT_FORMAT_SUBDIR.get(getConf()); |
54 | } | |
55 | }; | |
56 | ||
57 | @Override | |
58 | public void checkOutputSpecs(JobContext context) | |
59 | throws IOException, InterruptedException { | |
60 | 0 | textOutputFormat.checkOutputSpecs(context); |
61 | 0 | } |
62 | ||
63 | @Override | |
64 | public OutputCommitter getOutputCommitter(TaskAttemptContext context) | |
65 | throws IOException, InterruptedException { | |
66 | 0 | return textOutputFormat.getOutputCommitter(context); |
67 | } | |
68 | ||
69 | /** | |
70 | * The factory method which produces the {@link TextEdgeWriter} used by this | |
71 | * output format. | |
72 | * | |
73 | * @param context the information about the task | |
74 | * @return the text edge writer to be used | |
75 | */ | |
76 | @Override | |
77 | public abstract TextEdgeWriter createEdgeWriter(TaskAttemptContext | |
78 | context) throws IOException, InterruptedException; | |
79 | ||
80 | /** | |
81 | * Abstract class to be implemented by the user based on their specific | |
82 | * edge output. Easiest to ignore the key value separator and only use | |
83 | * key instead (write the whole line as the key with a null value). | |
84 | */ | |
85 | 0 | protected abstract class TextEdgeWriter<I extends WritableComparable, |
86 | V extends Writable, E extends Writable> | |
87 | extends EdgeWriter<I, V, E> { | |
88 | /** Internal line record writer, created in initialize */ | |
89 | private RecordWriter<Text, Text> lineRecordWriter; | |
90 | /** Context passed to initialize */ | |
91 | private TaskAttemptContext context; | |
92 | ||
93 | @Override | |
94 | public void initialize(TaskAttemptContext context) throws IOException, | |
95 | InterruptedException { | |
96 | 0 | lineRecordWriter = createLineRecordWriter(context); |
97 | 0 | this.context = context; |
98 | 0 | } |
99 | ||
100 | /** | |
101 | * Create the line record writer. Override this to use a different | |
102 | * underlying record writer (useful for testing). | |
103 | * | |
104 | * @param context the context passed to initialize | |
105 | * @return the record writer to be used | |
106 | * @throws IOException exception that can be thrown during creation | |
107 | * @throws InterruptedException exception that can be thrown during creation | |
108 | */ | |
109 | protected RecordWriter<Text, Text> createLineRecordWriter( | |
110 | TaskAttemptContext context) throws IOException, InterruptedException { | |
111 | 0 | return textOutputFormat.getRecordWriter(context); |
112 | } | |
113 | ||
114 | @Override | |
115 | public void close(TaskAttemptContext context) throws IOException, | |
116 | InterruptedException { | |
117 | 0 | lineRecordWriter.close(context); |
118 | 0 | } |
119 | ||
120 | /** | |
121 | * Get the line record writer. | |
122 | * | |
123 | * @return Record writer to be used for writing (null before initialize). | |
124 | */ | |
125 | public RecordWriter<Text, Text> getRecordWriter() { | |
126 | 0 | return lineRecordWriter; |
127 | } | |
128 | ||
129 | /** | |
130 | * Get the context. | |
131 | * | |
132 | * @return Context passed to initialize (null before initialize). | |
133 | */ | |
134 | public TaskAttemptContext getContext() { | |
135 | 0 | return context; |
136 | } | |
137 | } | |
138 | ||
139 | /** | |
140 | * Abstract class to be implemented by the user to write a line for each | |
141 | * edge; subclasses only supply convertEdgeToLine. | |
142 | */ | |
143 | 0 | protected abstract class TextEdgeWriterToEachLine< |
144 | I extends WritableComparable, V extends Writable, E extends Writable> | |
145 | extends TextEdgeWriter<I, V, E> { | |
146 | ||
147 | @Override | |
148 | public final void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge) | |
149 | throws IOException, InterruptedException { | |
150 | ||
151 | // Note we are writing line as key with null value | |
152 | 0 | getRecordWriter().write( |
153 | 0 | convertEdgeToLine(sourceId, sourceValue, edge), null); |
154 | 0 | } |
155 | ||
156 | /** | |
157 | * Converts the given edge into the line of text that writeEdge writes. | |
158 | * | |
159 | * @param sourceId the current id of the source vertex | |
160 | * @param sourceValue the current value of the source vertex | |
161 | * @param edge the current edge for writing | |
162 | * @return the text line to be written | |
163 | * @throws IOException exception that can be thrown while building the line | |
164 | */ | |
165 | protected abstract Text convertEdgeToLine(I sourceId, | |
166 | V sourceValue, Edge<I, E> edge) throws IOException; | |
167 | } | |
168 | } |