Coverage Report - org.apache.giraph.io.hcatalog.GiraphHCatInputFormat
 
Classes in this File     Line Coverage   Branch Coverage   Complexity
GiraphHCatInputFormat    0% (0/122)      0% (0/40)         2.6
 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.hcatalog;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
import org.apache.hcatalog.mapreduce.HCatSplit;
import org.apache.hcatalog.mapreduce.HCatStorageHandler;
import org.apache.hcatalog.mapreduce.HCatUtils;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.PartInfo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Provides functionality similar to
 * {@link org.apache.hcatalog.mapreduce.HCatInputFormat},
 * but allows for different data sources (vertex and edge data).
 */
public class GiraphHCatInputFormat extends HCatBaseInputFormat {
  /** Vertex input job info for HCatalog. */
  public static final String VERTEX_INPUT_JOB_INFO =
      "giraph.hcat.vertex.input.job.info";
  /** Edge input job info for HCatalog. */
  public static final String EDGE_INPUT_JOB_INFO =
      "giraph.hcat.edge.input.job.info";

  /**
   * Set vertex {@link InputJobInfo}.
   *
   * @param job The job
   * @param inputJobInfo Vertex input job info
   * @throws IOException
   */
  public static void setVertexInput(Job job,
                                    InputJobInfo inputJobInfo)
    throws IOException {
    InputJobInfo vertexInputJobInfo = InputJobInfo.create(
        inputJobInfo.getDatabaseName(),
        inputJobInfo.getTableName(),
        inputJobInfo.getFilter());
    vertexInputJobInfo.getProperties().putAll(inputJobInfo.getProperties());
    Configuration conf = job.getConfiguration();
    conf.set(VERTEX_INPUT_JOB_INFO, HCatUtil.serialize(
        HCatUtils.getInputJobInfo(conf, vertexInputJobInfo)));
  }
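  // Illustrative note (not in the original source): setVertexInput() above and
  // setEdgeInput() below copy the caller's InputJobInfo rather than storing it,
  // so the caller's object is not mutated. The copy is handed to
  // HCatUtils.getInputJobInfo(conf, ...), and whatever table/partition metadata
  // that call resolves is what gets serialized under the VERTEX_INPUT_JOB_INFO
  // (or EDGE_INPUT_JOB_INFO) key and later read back by
  // getVertexJobInfo()/getEdgeJobInfo().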

  /**
   * Set edge {@link InputJobInfo}.
   *
   * @param job The job
   * @param inputJobInfo Edge input job info
   * @throws IOException
   */
  public static void setEdgeInput(Job job,
                                  InputJobInfo inputJobInfo)
    throws IOException {
    InputJobInfo edgeInputJobInfo = InputJobInfo.create(
        inputJobInfo.getDatabaseName(),
        inputJobInfo.getTableName(),
        inputJobInfo.getFilter());
    edgeInputJobInfo.getProperties().putAll(inputJobInfo.getProperties());
    Configuration conf = job.getConfiguration();
    conf.set(EDGE_INPUT_JOB_INFO, HCatUtil.serialize(
        HCatUtils.getInputJobInfo(conf, edgeInputJobInfo)));
  }

  /**
   * Get table schema from input job info.
   *
   * @param inputJobInfo Input job info
   * @return Input table schema
   * @throws IOException
   */
  private static HCatSchema getTableSchema(InputJobInfo inputJobInfo)
    throws IOException {
    HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
    for (HCatFieldSchema field :
        inputJobInfo.getTableInfo().getDataColumns().getFields()) {
      allCols.append(field);
    }
    for (HCatFieldSchema field :
        inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
      allCols.append(field);
    }
    return allCols;
  }

  /**
   * Get vertex input table schema.
   *
   * @param conf Job configuration
   * @return Vertex input table schema
   * @throws IOException
   */
  public static HCatSchema getVertexTableSchema(Configuration conf)
    throws IOException {
    return getTableSchema(getVertexJobInfo(conf));
  }

  /**
   * Get edge input table schema.
   *
   * @param conf Job configuration
   * @return Edge input table schema
   * @throws IOException
   */
  public static HCatSchema getEdgeTableSchema(Configuration conf)
    throws IOException {
    return getTableSchema(getEdgeJobInfo(conf));
  }

  /**
   * Set input path for job.
   *
   * @param jobConf Job configuration
   * @param location Location of input files
   * @throws IOException
   */
  private void setInputPath(JobConf jobConf, String location)
    throws IOException {
    int length = location.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = location.charAt(i);
      switch (ch) {
      case '{':
        curlyOpen++;
        if (!globPattern) {
          globPattern = true;
        }
        break;
      case '}':
        curlyOpen--;
        if (curlyOpen == 0 && globPattern) {
          globPattern = false;
        }
        break;
      case ',':
        if (!globPattern) {
          pathStrings.add(location.substring(pathStart, i));
          pathStart = i + 1;
        }
        break;
      default:
      }
    }
    pathStrings.add(location.substring(pathStart, length));

    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));

    FileSystem fs = FileSystem.get(jobConf);
    Path path = paths[0].makeQualified(fs);
    StringBuilder str = new StringBuilder(StringUtils.escapeString(
        path.toString()));
    for (int i = 1; i < paths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = paths[i].makeQualified(fs);
      str.append(StringUtils.escapeString(path.toString()));
    }

    jobConf.set("mapred.input.dir", str.toString());
  }
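  // Illustrative walk-through (added example, not in the original source):
  // for a partition location such as
  //   "/warehouse/tbl/{part-00,part-01},/warehouse/tbl2"
  // the scanner above treats the comma inside the {...} glob as part of a
  // single path and splits only on the top-level comma, yielding
  //   "/warehouse/tbl/{part-00,part-01}" and "/warehouse/tbl2",
  // which are then qualified against the default FileSystem and joined into
  // the "mapred.input.dir" property.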

  /**
   * Get input splits for job.
   *
   * @param jobContext Job context
   * @param inputJobInfo Input job info
   * @return List of input splits, one per underlying split of each
   *         matching partition
   * @throws IOException
   * @throws InterruptedException
   */
  private List<InputSplit> getSplits(JobContext jobContext,
                                     InputJobInfo inputJobInfo)
    throws IOException, InterruptedException {
    Configuration conf = jobContext.getConfiguration();

    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
    if (partitionInfoList == null) {
      //No partitions match the specified partition filter
      return splits;
    }

    HCatStorageHandler storageHandler;
    JobConf jobConf;
    //For each matching partition, call getSplits on the underlying InputFormat
    for (PartInfo partitionInfo : partitionInfoList) {
      jobConf = HCatUtil.getJobConfFromContext(jobContext);
      setInputPath(jobConf, partitionInfo.getLocation());
      Map<String, String> jobProperties = partitionInfo.getJobProperties();

      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
      for (HCatFieldSchema field :
          inputJobInfo.getTableInfo().getDataColumns().getFields()) {
        allCols.append(field);
      }
      for (HCatFieldSchema field :
          inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
        allCols.append(field);
      }

      HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);

      storageHandler = HCatUtil.getStorageHandler(
          jobConf, partitionInfo);

      //Get the input format
      Class inputFormatClass = storageHandler.getInputFormatClass();
      org.apache.hadoop.mapred.InputFormat inputFormat =
          getMapRedInputFormat(jobConf, inputFormatClass);

      //Call getSplits on the InputFormat and create an HCatSplit for each
      //underlying split. When the desired number of input splits is not
      //configured, zero is passed as the default.
      //TODO: Currently each partition is split independently into
      //a desired number. However, we want the union of all partitions to be
      //split into a desired number while maintaining balanced sizes of input
      //splits.
      int desiredNumSplits =
          conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0);
      org.apache.hadoop.mapred.InputSplit[] baseSplits =
          inputFormat.getSplits(jobConf, desiredNumSplits);

      for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
        splits.add(new HCatSplit(partitionInfo, split, allCols));
      }
    }

    return splits;
  }

  /**
   * Get vertex {@link InputJobInfo}.
   *
   * @param conf Configuration
   * @return Vertex input job info
   * @throws IOException
   */
  private static InputJobInfo getVertexJobInfo(Configuration conf)
    throws IOException {
    String jobString = conf.get(VERTEX_INPUT_JOB_INFO);
    if (jobString == null) {
      throw new IOException("Vertex job information not found in JobContext." +
          " GiraphHCatInputFormat.setVertexInput() not called?");
    }
    return (InputJobInfo) HCatUtil.deserialize(jobString);
  }

  /**
   * Get edge {@link InputJobInfo}.
   *
   * @param conf Configuration
   * @return Edge input job info
   * @throws IOException
   */
  private static InputJobInfo getEdgeJobInfo(Configuration conf)
    throws IOException {
    String jobString = conf.get(EDGE_INPUT_JOB_INFO);
    if (jobString == null) {
      throw new IOException("Edge job information not found in JobContext." +
          " GiraphHCatInputFormat.setEdgeInput() not called?");
    }
    return (InputJobInfo) HCatUtil.deserialize(jobString);
  }

  /**
   * Get vertex input splits.
   *
   * @param jobContext Job context
   * @return List of vertex {@link InputSplit}s
   * @throws IOException
   * @throws InterruptedException
   */
  public List<InputSplit> getVertexSplits(JobContext jobContext)
    throws IOException, InterruptedException {
    return getSplits(jobContext,
        getVertexJobInfo(jobContext.getConfiguration()));
  }

  /**
   * Get edge input splits.
   *
   * @param jobContext Job context
   * @return List of edge {@link InputSplit}s
   * @throws IOException
   * @throws InterruptedException
   */
  public List<InputSplit> getEdgeSplits(JobContext jobContext)
    throws IOException, InterruptedException {
    return getSplits(jobContext,
        getEdgeJobInfo(jobContext.getConfiguration()));
  }

  /**
   * Create an {@link org.apache.hcatalog.mapreduce.HCatRecordReader}.
   *
   * @param split Input split
   * @param schema Table schema
   * @param taskContext Context
   * @return Record reader
   * @throws IOException
   * @throws InterruptedException
   */
  private RecordReader<WritableComparable, HCatRecord>
  createRecordReader(InputSplit split,
                     HCatSchema schema,
                     TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
    HCatSplit hcatSplit = HCatUtils.castToHCatSplit(split);
    PartInfo partitionInfo = hcatSplit.getPartitionInfo();
    JobContext jobContext = taskContext;
    Configuration conf = jobContext.getConfiguration();

    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
        conf, partitionInfo);

    JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
    Map<String, String> jobProperties = partitionInfo.getJobProperties();
    HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);

    Map<String, String> valuesNotInDataCols = getColValsNotInDataColumns(
        schema, partitionInfo);

    return HCatUtils.newHCatReader(storageHandler, valuesNotInDataCols);
  }

  /**
   * Create a {@link RecordReader} for vertices.
   *
   * @param split Input split
   * @param taskContext Context
   * @return Record reader
   * @throws IOException
   * @throws InterruptedException
   */
  public RecordReader<WritableComparable, HCatRecord>
  createVertexRecordReader(InputSplit split, TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
    return createRecordReader(split, getVertexTableSchema(
        taskContext.getConfiguration()), taskContext);
  }

  /**
   * Create a {@link RecordReader} for edges.
   *
   * @param split Input split
   * @param taskContext Context
   * @return Record reader
   * @throws IOException
   * @throws InterruptedException
   */
  public RecordReader<WritableComparable, HCatRecord>
  createEdgeRecordReader(InputSplit split, TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
    return createRecordReader(split, getEdgeTableSchema(
        taskContext.getConfiguration()), taskContext);
  }

  /**
   * Get values for fields requested by output schema which will not be in the
   * data.
   *
   * @param outputSchema Output schema
   * @param partInfo Partition info
   * @return Values not in data columns
   */
  private static Map<String, String> getColValsNotInDataColumns(
      HCatSchema outputSchema,
      PartInfo partInfo) {
    HCatSchema dataSchema = partInfo.getPartitionSchema();
    Map<String, String> vals = new HashMap<String, String>();
    for (String fieldName : outputSchema.getFieldNames()) {
      if (dataSchema.getPosition(fieldName) == null) {
        // This output field is not present in the partition's data schema,
        // so take its value from the partition keys if it is a partition
        // column; otherwise it has no value for this partition.
        if (partInfo.getPartitionValues().containsKey(fieldName)) {
          vals.put(fieldName, partInfo.getPartitionValues().get(fieldName));
        } else {
          vals.put(fieldName, null);
        }
      }
    }
    return vals;
  }
}
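
Taken together, the class exposes two parallel entry points: setVertexInput()/setEdgeInput() record separate InputJobInfo payloads in the job Configuration, and getVertexSplits()/getEdgeSplits() together with createVertexRecordReader()/createEdgeRecordReader() read them back. The sketch below shows how a driver might register both tables in one job; it is a minimal illustration, not code from the Giraph project. The database and table names ("default", "vertices", "edges") and the class name GiraphHCatInputExample are hypothetical, and a reachable Hive metastore configured in the Configuration is assumed.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.mapreduce.InputJobInfo;

import org.apache.giraph.io.hcatalog.GiraphHCatInputFormat;

public class GiraphHCatInputExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "giraph-hcat-input-example");

    // Register the vertex and edge tables under their own configuration keys
    // (VERTEX_INPUT_JOB_INFO / EDGE_INPUT_JOB_INFO); table names are
    // hypothetical.
    GiraphHCatInputFormat.setVertexInput(job,
        InputJobInfo.create("default", "vertices", null));
    GiraphHCatInputFormat.setEdgeInput(job,
        InputJobInfo.create("default", "edges", null));

    // Normally Giraph's input machinery requests the splits; shown here only
    // to illustrate that vertex and edge splits are obtained independently
    // from the same input format instance (Job is a JobContext).
    GiraphHCatInputFormat hcatInput = new GiraphHCatInputFormat();
    List<InputSplit> vertexSplits = hcatInput.getVertexSplits(job);
    List<InputSplit> edgeSplits = hcatInput.getEdgeSplits(job);

    System.out.println("Vertex splits: " + vertexSplits.size());
    System.out.println("Edge splits: " + edgeSplits.size());
  }
}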