Coverage Report - org.apache.giraph.io.hcatalog.HCatalogVertexOutputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
HCatalogVertexOutputFormat
0%
0/8
N/A
1.067
HCatalogVertexOutputFormat$HCatalogVertexWriter
0%
0/9
N/A
1.067
HCatalogVertexOutputFormat$MultiRowHCatalogVertexWriter
0%
0/6
0%
0/2
1.067
HCatalogVertexOutputFormat$SingleRowHCatalogVertexWriter
0%
0/6
N/A
1.067
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.hcatalog;
 20  
 
 21  
 import org.apache.giraph.graph.Vertex;
 22  
 import org.apache.giraph.io.VertexOutputFormat;
 23  
 import org.apache.giraph.io.VertexWriter;
 24  
 import org.apache.hadoop.io.Writable;
 25  
 import org.apache.hadoop.io.WritableComparable;
 26  
 import org.apache.hadoop.mapreduce.JobContext;
 27  
 import org.apache.hadoop.mapreduce.OutputCommitter;
 28  
 import org.apache.hadoop.mapreduce.RecordWriter;
 29  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 30  
 import org.apache.hcatalog.data.DefaultHCatRecord;
 31  
 import org.apache.hcatalog.data.HCatRecord;
 32  
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 33  
 
 34  
 import java.io.IOException;
 35  
 
 36  
 /**
 37  
  * Abstract class that users should subclass to store data to a Hive or Pig table.
 38  
  * You can easily implement a {@link HCatalogVertexWriter} by extending
 39  
  * {@link SingleRowHCatalogVertexWriter} or {@link MultiRowHCatalogVertexWriter}
 40  
  * depending on how you want to fit your vertices into the output table.
 41  
  * <p>
 42  
  * The desired database and table name to store to can be specified via
 43  
  * {@link HCatOutputFormat#setOutput(org.apache.hadoop.mapreduce.Job,
 44  
  * org.apache.hcatalog.mapreduce.OutputJobInfo)}
 45  
  * as you set up your vertex output format with
 46  
  * {@link org.apache.giraph.conf.GiraphConfiguration
 47  
  * setVertexOutputFormatClass(Class)}. You must create the output table.
 48  
  *
 49  
  * @param <I> Vertex id
 50  
  * @param <V> Vertex value
 51  
  * @param <E> Edge value
 52  
  */
 53  
 @SuppressWarnings("rawtypes")
 54  0
 public abstract class HCatalogVertexOutputFormat<
 55  
         I extends WritableComparable,
 56  
         V extends Writable,
 57  
         E extends Writable>
 58  
         extends VertexOutputFormat<I, V, E> {
 59  
   /**
 60  
   * Underlying HCatOutputFormat that output checks, commits and record writers are delegated to.
 61  
   */
 62  0
   protected HCatOutputFormat hCatOutputFormat = new HCatOutputFormat();
 63  
 
 64  
   @Override
 65  
   public final void checkOutputSpecs(JobContext context) throws IOException,
 66  
       InterruptedException {
 67  0
     hCatOutputFormat.checkOutputSpecs(context);
 68  0
   }
 69  
 
 70  
   @Override
 71  
   public final OutputCommitter getOutputCommitter(TaskAttemptContext context)
 72  
     throws IOException, InterruptedException {
 73  0
     return hCatOutputFormat.getOutputCommitter(context);
 74  
   }
 75  
 
 76  
   /**
 77  
    * Abstract class that users should
 78  
    * subclass based on their specific vertex
 79  
    * output. Users should implement
 80  
    * writeVertex to create a HCatRecord that is
 81  
    * valid for writing by HCatalogRecordWriter.
 82  
    *
 83  
    * @param <I> Vertex id
 84  
    * @param <V> Vertex value
 85  
    * @param <E> Edge value
 86  
   */
 87  0
   protected abstract static class HCatalogVertexWriter<
 88  
       I extends WritableComparable,
 89  
       V extends Writable,
 90  
       E extends Writable>
 91  
       extends VertexWriter<I, V, E> {
 92  
 
 93  
     /** Internal HCatRecordWriter */
 94  
     private RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter;
 95  
     /** Context passed to initialize */
 96  
     private TaskAttemptContext context;
 97  
 
 98  
     /**
 99  
     * Initialize with the HCatRecordWriter
 100  
     * @param hCatRecordWriter
 101  
     *            Internal writer
 102  
     */
 103  
     protected void initialize(
 104  
                     RecordWriter<WritableComparable<?>,
 105  
                     HCatRecord> hCatRecordWriter) {
 106  0
       this.hCatRecordWriter = hCatRecordWriter;
 107  0
     }
 108  
 
 109  
     /**
 110  
     * Get the record writer.
 111  
     * @return Record writer to be used for writing.
 112  
     */
 113  
     protected RecordWriter<WritableComparable<?>,
 114  
             HCatRecord> getRecordWriter() {
 115  0
       return hCatRecordWriter;
 116  
     }
 117  
 
 118  
     /**
 119  
     * Get the context.
 120  
     *
 121  
     * @return Context passed to initialize.
 122  
     */
 123  
     protected TaskAttemptContext getContext() {
 124  0
       return context;
 125  
     }
 126  
 
 127  
     @Override
 128  
     public void initialize(TaskAttemptContext context) throws IOException {
 129  0
       this.context = context;
 130  0
     }
 131  
 
 132  
     @Override
 133  
     public void close(TaskAttemptContext context) throws IOException,
 134  
         InterruptedException {
 135  0
       hCatRecordWriter.close(context);
 136  0
     }
 137  
 
 138  
   }
 139  
 
 140  
   /**
 141  
   * create vertex writer.
 142  
   * @return HCatalogVertexWriter
 143  
   */
 144  
   protected abstract HCatalogVertexWriter<I, V, E> createVertexWriter();
 145  
 
 146  
   @Override
 147  
   public final VertexWriter<I, V, E> createVertexWriter(
 148  
     TaskAttemptContext context) throws IOException,
 149  
     InterruptedException {
 150  0
     HCatalogVertexWriter<I, V, E>  writer = createVertexWriter();
 151  0
     writer.initialize(hCatOutputFormat.getRecordWriter(context));
 152  0
     return writer;
 153  
   }
 154  
 
 155  
   /**
 156  
    * HCatalogVertexWriter that writes each vertex as a single row.
 157  
    *
 158  
    * @param <I> Vertex id
 159  
    * @param <V> Vertex value
 160  
    * @param <E> Edge value
 161  
    */
 162  0
   protected abstract static class SingleRowHCatalogVertexWriter<
 163  
       I extends WritableComparable,
 164  
       V extends Writable,
 165  
       E extends Writable>
 166  
       extends HCatalogVertexWriter<I, V, E> {
 167  
     /**
 168  
     * get num columns
 169  
     * @return number of columns in each output record
 170  
     */
 171  
     protected abstract int getNumColumns();
 172  
 
 173  
     /**
 174  
     * fill record
 175  
     * @param record to fill
 176  
     * @param vertex to populate record
 177  
     */
 178  
     protected abstract void fillRecord(HCatRecord record,
 179  
         Vertex<I, V, E> vertex);
 180  
 
 181  
     /**
 182  
     * create record
 183  
     * @param vertex to populate record
 184  
     * @return HCatRecord newly created
 185  
     */
 186  
     protected HCatRecord createRecord(Vertex<I, V, E> vertex) {
 187  0
       HCatRecord record = new DefaultHCatRecord(getNumColumns());
 188  0
       fillRecord(record, vertex);
 189  0
       return record;
 190  
     }
 191  
 
 192  
     @Override
 193  
     public final void writeVertex(Vertex<I, V, E> vertex) throws IOException,
 194  
         InterruptedException {
 195  0
       getRecordWriter().write(null, createRecord(vertex));
 196  0
     }
 197  
 
 198  
   }
 199  
 
 200  
   /**
 201  
    * HCatalogVertexWriter to write each vertex in multiple rows.
 202  
    *
 203  
    * @param <I> Vertex id
 204  
    * @param <V> Vertex value
 205  
    * @param <E> Edge value
 206  
    */
 207  0
   public abstract static class MultiRowHCatalogVertexWriter<
 208  
       I extends WritableComparable,
 209  
       V extends Writable,
 210  
       E extends Writable>
 211  
       extends HCatalogVertexWriter<I, V, E> {
 212  
     /**
 213  
     * create records
 214  
     * @param vertex to populate records
 215  
     * @return Iterable of records
 216  
     */
 217  
     protected abstract Iterable<HCatRecord> createRecords(
 218  
         Vertex<I, V, E> vertex);
 219  
 
 220  
     @Override
 221  
     public final void writeVertex(Vertex<I, V, E> vertex) throws IOException,
 222  
         InterruptedException {
 223  0
       Iterable<HCatRecord> records = createRecords(vertex);
 224  0
       for (HCatRecord record : records) {
 225  0
         getRecordWriter().write(null, record);
 226  0
       }
 227  0
     }
 228  
   }
 229  
 }