Coverage Report - org.apache.giraph.block_app.framework.output.BlockOutputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
BlockOutputFormat
0%
0/26
0%
0/6
0
BlockOutputFormat$1
0%
0/7
0%
0/2
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework.output;
 19  
 
 20  
 import java.io.IOException;
 21  
 import java.util.HashMap;
 22  
 import java.util.Map;
 23  
 
 24  
 import org.apache.giraph.bsp.BspOutputFormat;
 25  
 import org.apache.giraph.conf.GiraphConfiguration;
 26  
 import org.apache.giraph.conf.GiraphConstants;
 27  
 import org.apache.giraph.conf.StrConfOption;
 28  
 import org.apache.giraph.utils.ConfigurationObjectUtils;
 29  
 import org.apache.giraph.utils.DefaultOutputCommitter;
 30  
 import org.apache.hadoop.conf.Configuration;
 31  
 import org.apache.hadoop.mapreduce.JobContext;
 32  
 import org.apache.hadoop.mapreduce.OutputCommitter;
 33  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 34  
 
 35  
 /**
 36  
  * Hadoop output format to use with block output.
 37  
  * It keeps track of all registered outputs, and knows how to create them.
 38  
  */
 39  0
 public class BlockOutputFormat extends BspOutputFormat {
 40  0
   private static final StrConfOption OUTPUT_CONF_OPTIONS = new StrConfOption(
 41  
       "giraph.outputConfOptions", "",
 42  
       "List of conf options for outputs used");
 43  
 
 44  
   public static <OD> void addOutputDesc(OD outputDesc, String confOption,
 45  
       GiraphConfiguration conf) {
 46  0
     GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.set(conf,
 47  
         BlockOutputFormat.class);
 48  0
     String currentOutputs = OUTPUT_CONF_OPTIONS.get(conf);
 49  0
     if (!currentOutputs.isEmpty()) {
 50  0
       currentOutputs = currentOutputs + ",";
 51  
     }
 52  0
     OUTPUT_CONF_OPTIONS.set(conf, currentOutputs + confOption);
 53  0
     ConfigurationObjectUtils.setObjectKryo(outputDesc, confOption, conf);
 54  0
   }
 55  
 
 56  
   /**
 57  
    * Returns an array of output configuration options set in the input
 58  
    * configuration.
 59  
    *
 60  
    * @param conf Configuration
 61  
    * @return Array of options
 62  
    */
 63  
   public static String[] getOutputConfOptions(Configuration conf) {
 64  0
     String outputConfOptions = OUTPUT_CONF_OPTIONS.get(conf);
 65  0
     return outputConfOptions.isEmpty() ?
 66  0
         new String[0] : outputConfOptions.split(",");
 67  
   }
 68  
 
 69  
   public static <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
 70  
   OD createInitAndCheckOutputDesc(String confOption, Configuration conf,
 71  
       String jobIdentifier) {
 72  0
     OD outputDesc = ConfigurationObjectUtils.getObjectKryo(confOption, conf);
 73  0
     outputDesc.initializeAndCheck(jobIdentifier, conf);
 74  0
     return outputDesc;
 75  
   }
 76  
 
 77  
   public static Map<String, BlockOutputDesc>
 78  
   createInitAndCheckOutputDescsMap(Configuration conf, String jobIdentifier) {
 79  0
     String[] outputConfOptions = getOutputConfOptions(conf);
 80  0
     Map<String, BlockOutputDesc> ret = new HashMap<>(outputConfOptions.length);
 81  0
     for (String outputConfOption : outputConfOptions) {
 82  0
       ret.put(outputConfOption,
 83  0
           createInitAndCheckOutputDesc(outputConfOption, conf, jobIdentifier));
 84  
     }
 85  0
     return ret;
 86  
   }
 87  
 
 88  
   public static Map<String, BlockOutputDesc> createInitAndCheckOutputDescsMap(
 89  
       JobContext jobContext) {
 90  0
     return createInitAndCheckOutputDescsMap(jobContext.getConfiguration(),
 91  0
         jobContext.getJobID().toString());
 92  
   }
 93  
 
 94  
   @Override
 95  
   public void checkOutputSpecs(JobContext jobContext)
 96  
       throws IOException, InterruptedException {
 97  0
     createInitAndCheckOutputDescsMap(jobContext);
 98  0
   }
 99  
 
 100  
   @Override
 101  
   public OutputCommitter getOutputCommitter(
 102  
       TaskAttemptContext context) throws IOException, InterruptedException {
 103  0
     return new DefaultOutputCommitter() {
 104  
       @Override
 105  
       public void commit(JobContext jobContext) throws IOException {
 106  0
         Map<String, BlockOutputDesc> map =
 107  0
             createInitAndCheckOutputDescsMap(jobContext);
 108  0
         for (BlockOutputDesc outputDesc : map.values()) {
 109  0
           outputDesc.commit();
 110  0
         }
 111  0
       }
 112  
     };
 113  
   }
 114  
 }