Coverage Report - org.apache.giraph.aggregators.TextAggregatorWriter
 
Classes in this File Line Coverage Branch Coverage Complexity
TextAggregatorWriter
0%
0/26
0%
0/20
3
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.aggregators;
 20  
 
 21  
 import com.google.common.base.Charsets;
 22  
 import java.io.IOException;
 23  
 import java.util.Map.Entry;
 24  
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 25  
 import org.apache.hadoop.fs.FSDataOutputStream;
 26  
 import org.apache.hadoop.fs.FileSystem;
 27  
 import org.apache.hadoop.fs.Path;
 28  
 import org.apache.hadoop.io.Writable;
 29  
 import org.apache.hadoop.mapreduce.Mapper.Context;
 30  
 
 31  
 /**
 32  
  * Default implementation of {@link AggregatorWriter}. Each line consists of
 33  
  * text and contains the aggregator name, the aggregator value and the
 34  
  * aggregator class.
 35  
  */
 36  0
 public class TextAggregatorWriter
 37  
     extends DefaultImmutableClassesGiraphConfigurable
 38  
     implements AggregatorWriter {
 39  
   /** The filename of the outputfile */
 40  
   public static final String FILENAME =
 41  
       "giraph.textAggregatorWriter.filename";
 42  
   /** Signal for "never write" frequency */
 43  
   public static final int NEVER = 0;
 44  
   /** Signal for "write only the final values" frequency */
 45  
   public static final int AT_THE_END = -1;
 46  
   /** Signal for "write values in every superstep" frequency */
 47  
   public static final int ALWAYS = 1;
 48  
   /** The frequency of writing:
 49  
    *  - NEVER: never write, files aren't created at all
 50  
    *  - AT_THE_END: aggregators are written only when the computation is over
 51  
    *  - int: i.e. 1 is every superstep, 2 every two supersteps and so on
 52  
    */
 53  
   public static final String FREQUENCY =
 54  
       "giraph.textAggregatorWriter.frequency";
 55  
   /** Default filename for dumping aggregator values */
 56  
   private static final String DEFAULT_FILENAME = "aggregatorValues";
 57  
   /** Handle to the outputfile */
 58  
   protected FSDataOutputStream output;
 59  
   /** Write every "frequency" supersteps */
 60  
   private int frequency;
 61  
 
 62  
   @Override
 63  
   @SuppressWarnings("rawtypes")
 64  
   public void initialize(Context context, long attempt) throws IOException {
 65  0
     frequency = getConf().getInt(FREQUENCY, NEVER);
 66  0
     String filename  = getConf().get(FILENAME, DEFAULT_FILENAME);
 67  0
     if (frequency != NEVER) {
 68  0
       Path p = new Path(filename + "_" + attempt);
 69  0
       FileSystem fs = FileSystem.get(getConf());
 70  0
       if (fs.exists(p)) {
 71  0
         throw new RuntimeException("aggregatorWriter file already" +
 72  0
             " exists: " + p.getName());
 73  
       }
 74  0
       output = fs.create(p);
 75  
     }
 76  0
   }
 77  
 
 78  
   @Override
 79  
   public void writeAggregator(
 80  
       Iterable<Entry<String, Writable>> aggregatorMap,
 81  
       long superstep) throws IOException {
 82  0
     if (shouldWrite(superstep)) {
 83  0
       for (Entry<String, Writable> entry : aggregatorMap) {
 84  0
         byte[] bytes = aggregatorToString(entry.getKey(), entry.getValue(),
 85  0
             superstep).getBytes(Charsets.UTF_8);
 86  0
         output.write(bytes, 0, bytes.length);
 87  0
       }
 88  0
       output.flush();
 89  
     }
 90  0
   }
 91  
 
 92  
   /**
 93  
    * Implements the way an aggregator is converted into a String.
 94  
    * Override this if you want to implement your own text format.
 95  
    *
 96  
    * @param aggregatorName Name of the aggregator
 97  
    * @param value Value of aggregator
 98  
    * @param superstep Current superstep
 99  
    * @return The String representation for the aggregator
 100  
    */
 101  
   protected String aggregatorToString(String aggregatorName,
 102  
       Writable value,
 103  
       long superstep) {
 104  0
     return new StringBuilder("superstep=").append(superstep).append("\t")
 105  0
         .append(aggregatorName).append("=").append(value).append("\n")
 106  0
         .toString();
 107  
   }
 108  
 
 109  
   /**
 110  
    * Should write this superstep?
 111  
    *
 112  
    * @param superstep Superstep to check
 113  
    * @return True if should write, false otherwise
 114  
    */
 115  
   private boolean shouldWrite(long superstep) {
 116  0
     return (frequency == AT_THE_END && superstep == LAST_SUPERSTEP) ||
 117  
         (frequency != NEVER && frequency != AT_THE_END &&
 118  
             superstep % frequency == 0);
 119  
   }
 120  
 
 121  
   @Override
 122  
   public void close() throws IOException {
 123  0
     if (output != null) {
 124  0
       output.close();
 125  
     }
 126  0
   }
 127  
 }