Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
HBaseVertexInputFormat |
|
| 1.0;1 | ||||
HBaseVertexInputFormat$HBaseVertexReader |
|
| 1.0;1 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | package org.apache.giraph.io.hbase; | |
19 | ||
20 | import java.io.IOException; | |
21 | import java.util.List; | |
22 | import org.apache.giraph.io.VertexInputFormat; | |
23 | import org.apache.giraph.io.VertexReader; | |
24 | import org.apache.hadoop.hbase.client.Result; | |
25 | import org.apache.hadoop.hbase.io.ImmutableBytesWritable; | |
26 | import org.apache.hadoop.hbase.mapreduce.TableInputFormat; | |
27 | import org.apache.hadoop.io.Writable; | |
28 | import org.apache.hadoop.io.WritableComparable; | |
29 | import org.apache.hadoop.mapreduce.InputSplit; | |
30 | import org.apache.hadoop.mapreduce.JobContext; | |
31 | import org.apache.hadoop.mapreduce.RecordReader; | |
32 | import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
33 | import org.apache.log4j.Logger; | |
34 | ||
35 | /** | |
36 | * | |
37 | * Base class that wraps an HBase TableInputFormat and underlying Scan object | |
38 | * to help instantiate vertices from an HBase table. All | |
39 | * the static TableInputFormat properties necessary to configure | |
40 | * an HBase job are available. | |
41 | * | |
42 | * For example, setting conf.set(TableInputFormat.INPUT_TABLE, "in_table"); | |
43 | * from the job setup routine will properly delegate to the | |
44 | * TableInputFormat instance. The Configurable interface prevents specific | |
45 | * wrapper methods from having to be called. | |
46 | * | |
47 | * Works with {@link HBaseVertexOutputFormat} | |
48 | * | |
49 | * @param <I> Vertex index value | |
50 | * @param <V> Vertex value | |
51 | * @param <E> Edge value | |
52 | */ | |
53 | @SuppressWarnings("rawtypes") | |
54 | 0 | public abstract class HBaseVertexInputFormat< |
55 | I extends WritableComparable, | |
56 | V extends Writable, | |
57 | E extends Writable> | |
58 | extends VertexInputFormat<I, V, E> { | |
59 | ||
60 | ||
61 | /** | |
62 | * delegate HBase table input format | |
63 | */ | |
64 | 0 | protected static final TableInputFormat BASE_FORMAT = |
65 | new TableInputFormat(); | |
66 | /** | |
67 | * logger | |
68 | */ | |
69 | 0 | private static final Logger LOG = |
70 | 0 | Logger.getLogger(HBaseVertexInputFormat.class); |
71 | ||
72 | /** | |
73 | * Takes an instance of RecordReader that supports | |
74 | * HBase row-key, result records. Subclasses can focus on | |
75 | * vertex instantiation details without worrying about connection | |
76 | * semantics. Subclasses are expected to implement nextVertex() and | |
77 | * getCurrentVertex() | |
78 | * | |
79 | * | |
80 | * | |
81 | * @param <I> Vertex index value | |
82 | * @param <V> Vertex value | |
83 | * @param <E> Edge value | |
84 | */ | |
85 | public abstract static class HBaseVertexReader< | |
86 | I extends WritableComparable, | |
87 | V extends Writable, | |
88 | E extends Writable> | |
89 | extends VertexReader<I, V, E> { | |
90 | ||
91 | /** Reader instance */ | |
92 | private final RecordReader<ImmutableBytesWritable, Result> reader; | |
93 | /** Context passed to initialize */ | |
94 | private TaskAttemptContext context; | |
95 | ||
96 | /** | |
97 | * Sets the base TableInputFormat and creates a record reader. | |
98 | * | |
99 | * @param split InputSplit | |
100 | * @param context Context | |
101 | * @throws IOException | |
102 | */ | |
103 | public HBaseVertexReader(InputSplit split, TaskAttemptContext context) | |
104 | 0 | throws IOException { |
105 | 0 | BASE_FORMAT.setConf(context.getConfiguration()); |
106 | 0 | this.reader = BASE_FORMAT.createRecordReader(split, context); |
107 | 0 | } |
108 | ||
109 | /** | |
110 | * initialize | |
111 | * | |
112 | * @param inputSplit Input split to be used for reading vertices. | |
113 | * @param context Context from the task. | |
114 | * @throws IOException | |
115 | * @throws InterruptedException | |
116 | */ | |
117 | public void initialize(InputSplit inputSplit, | |
118 | TaskAttemptContext context) | |
119 | throws IOException, InterruptedException { | |
120 | 0 | reader.initialize(inputSplit, context); |
121 | 0 | this.context = context; |
122 | 0 | } |
123 | ||
124 | /** | |
125 | * close | |
126 | * @throws IOException | |
127 | */ | |
128 | public void close() throws IOException { | |
129 | 0 | reader.close(); |
130 | 0 | } |
131 | ||
132 | /** | |
133 | * getProgress | |
134 | * | |
135 | * @return progress | |
136 | * @throws IOException | |
137 | * @throws InterruptedException | |
138 | */ | |
139 | public float getProgress() throws | |
140 | IOException, InterruptedException { | |
141 | 0 | return reader.getProgress(); |
142 | } | |
143 | ||
144 | /** | |
145 | * getRecordReader | |
146 | * | |
147 | * @return Record reader to be used for reading. | |
148 | */ | |
149 | protected RecordReader<ImmutableBytesWritable, | |
150 | Result> getRecordReader() { | |
151 | 0 | return reader; |
152 | } | |
153 | ||
154 | /** | |
155 | * getContext | |
156 | * | |
157 | * @return Context passed to initialize. | |
158 | */ | |
159 | protected TaskAttemptContext getContext() { | |
160 | 0 | return context; |
161 | } | |
162 | ||
163 | } | |
164 | ||
165 | @Override | |
166 | public List<InputSplit> getSplits( | |
167 | JobContext context, int minSplitCountHint) | |
168 | throws IOException, InterruptedException { | |
169 | 0 | BASE_FORMAT.setConf(getConf()); |
170 | 0 | return BASE_FORMAT.getSplits(context); |
171 | } | |
172 | } |