Coverage Report - org.apache.giraph.io.internal.WrappedVertexOutputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
WrappedVertexOutputFormat
0%
0/18
N/A
1
WrappedVertexOutputFormat$1
0%
0/12
N/A
1
WrappedVertexOutputFormat$2
0%
0/24
N/A
1
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.internal;
 20  
 
 21  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 22  
 import org.apache.giraph.graph.Vertex;
 23  
 import org.apache.giraph.io.VertexOutputFormat;
 24  
 import org.apache.giraph.io.VertexWriter;
 25  
 import org.apache.giraph.job.HadoopUtils;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 import org.apache.hadoop.io.WritableComparable;
 28  
 import org.apache.hadoop.mapreduce.JobContext;
 29  
 import org.apache.hadoop.mapreduce.JobStatus;
 30  
 import org.apache.hadoop.mapreduce.OutputCommitter;
 31  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 32  
 
 33  
 import java.io.IOException;
 34  
 
 35  
 /**
 36  
  * For internal use only.
 37  
  *
 38  
  * Wraps user set {@link VertexOutputFormat} to make sure proper configuration
 39  
  * parameters are passed around, that user can set parameters in
 40  
  * configuration and they will be available in other methods related to this
 41  
  * format.
 42  
  *
 43  
  * @param <I> Vertex id
 44  
  * @param <V> Vertex data
 45  
  * @param <E> Edge data
 46  
  */
 47  
 public class WrappedVertexOutputFormat<I extends WritableComparable,
 48  
     V extends Writable, E extends Writable>
 49  
     extends VertexOutputFormat<I, V, E> {
 50  
   /** {@link VertexOutputFormat} which is wrapped */
 51  
   private VertexOutputFormat<I, V, E> originalOutputFormat;
 52  
 
 53  
   /**
 54  
    * Constructor
 55  
    *
 56  
    * @param vertexOutputFormat Vertex output format to wrap
 57  
    */
 58  
   public WrappedVertexOutputFormat(
 59  0
       VertexOutputFormat<I, V, E> vertexOutputFormat) {
 60  0
     originalOutputFormat = vertexOutputFormat;
 61  0
   }
 62  
 
 63  
   @Override
 64  
   public VertexWriter<I, V, E> createVertexWriter(
 65  
       TaskAttemptContext context) throws IOException, InterruptedException {
 66  0
     final VertexWriter<I, V, E> vertexWriter =
 67  0
         originalOutputFormat.createVertexWriter(
 68  0
             HadoopUtils.makeTaskAttemptContext(getConf(), context));
 69  0
     return new VertexWriter<I, V, E>() {
 70  
       @Override
 71  
       public void setConf(
 72  
           ImmutableClassesGiraphConfiguration<I, V, E> conf) {
 73  0
         super.setConf(conf);
 74  0
         vertexWriter.setConf(conf);
 75  0
       }
 76  
 
 77  
       @Override
 78  
       public void initialize(
 79  
           TaskAttemptContext context) throws IOException, InterruptedException {
 80  0
         vertexWriter.initialize(
 81  0
             HadoopUtils.makeTaskAttemptContext(getConf(), context));
 82  0
       }
 83  
 
 84  
       @Override
 85  
       public void close(
 86  
           TaskAttemptContext context) throws IOException, InterruptedException {
 87  0
         vertexWriter.close(
 88  0
             HadoopUtils.makeTaskAttemptContext(getConf(), context));
 89  0
       }
 90  
 
 91  
       @Override
 92  
       public void writeVertex(
 93  
           Vertex<I, V, E> vertex) throws IOException, InterruptedException {
 94  0
         vertexWriter.writeVertex(vertex);
 95  0
       }
 96  
     };
 97  
   }
 98  
 
 99  
   @Override
 100  
   public void checkOutputSpecs(
 101  
       JobContext context) throws IOException, InterruptedException {
 102  0
     originalOutputFormat.checkOutputSpecs(
 103  0
         HadoopUtils.makeJobContext(getConf(), context));
 104  0
   }
 105  
 
 106  
   @Override
 107  
   public OutputCommitter getOutputCommitter(
 108  
       TaskAttemptContext context) throws IOException, InterruptedException {
 109  0
     final OutputCommitter outputCommitter =
 110  0
         originalOutputFormat.getOutputCommitter(
 111  0
             HadoopUtils.makeTaskAttemptContext(getConf(), context));
 112  0
     return new OutputCommitter() {
 113  
       @Override
 114  
       public void setupJob(JobContext context) throws IOException {
 115  0
         outputCommitter.setupJob(
 116  0
             HadoopUtils.makeJobContext(getConf(), context));
 117  0
       }
 118  
 
 119  
       @Override
 120  
       public void setupTask(TaskAttemptContext context) throws IOException {
 121  0
         outputCommitter.setupTask(
 122  0
             HadoopUtils.makeTaskAttemptContext(getConf(), context));
 123  0
       }
 124  
 
 125  
       @Override
 126  
       public boolean needsTaskCommit(
 127  
           TaskAttemptContext context) throws IOException {
 128  0
         return outputCommitter.needsTaskCommit(
 129  0
             HadoopUtils.makeTaskAttemptContext(getConf(), context));
 130  
       }
 131  
 
 132  
       @Override
 133  
       public void commitTask(TaskAttemptContext context) throws IOException {
 134  0
         outputCommitter.commitTask(
 135  0
             HadoopUtils.makeTaskAttemptContext(getConf(), context));
 136  0
       }
 137  
 
 138  
       @Override
 139  
       public void abortTask(TaskAttemptContext context) throws IOException {
 140  0
         outputCommitter.abortTask(
 141  0
             HadoopUtils.makeTaskAttemptContext(getConf(), context));
 142  0
       }
 143  
 
 144  
       @Override
 145  
       public void cleanupJob(JobContext context) throws IOException {
 146  0
         outputCommitter.cleanupJob(
 147  0
             HadoopUtils.makeJobContext(getConf(), context));
 148  0
       }
 149  
 
 150  
       @Override
 151  
       public void commitJob(JobContext context) throws IOException {
 152  0
         outputCommitter.commitJob(
 153  0
             HadoopUtils.makeJobContext(getConf(), context));
 154  0
       }
 155  
 
 156  
       @Override
 157  
       public void abortJob(JobContext context,
 158  
           JobStatus.State state) throws IOException {
 159  0
         outputCommitter.abortJob(
 160  0
             HadoopUtils.makeJobContext(getConf(), context), state);
 161  0
       }
 162  
     };
 163  
   }
 164  
 
 165  
   @Override
 166  
   public void preWriting(TaskAttemptContext context) {
 167  0
     originalOutputFormat.preWriting(context);
 168  0
   }
 169  
 
 170  
   @Override
 171  
   public void postWriting(TaskAttemptContext context) {
 172  0
     originalOutputFormat.postWriting(context);
 173  0
   }
 174  
 }