Coverage Report - org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput
 
Classes in this File Line Coverage Branch Coverage Complexity
SynchronizedSuperstepOutput
0%
0/20
N/A
1.8
SynchronizedSuperstepOutput$1
0%
0/3
N/A
1.8
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.superstep_output;
 20  
 
 21  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 22  
 import org.apache.giraph.graph.Vertex;
 23  
 import org.apache.giraph.io.SimpleVertexWriter;
 24  
 import org.apache.giraph.io.VertexWriter;
 25  
 import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 import org.apache.hadoop.io.WritableComparable;
 28  
 import org.apache.hadoop.mapreduce.Mapper;
 29  
 
 30  
 import java.io.IOException;
 31  
 
 32  
 /**
 33  
  * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is
 34  
  * not thread-safe.
 35  
  *
 36  
  * @param <I> Vertex id
 37  
  * @param <V> Vertex value
 38  
  * @param <E> Edge value
 39  
  */
 40  0
 public class SynchronizedSuperstepOutput<I extends WritableComparable,
 41  
     V extends Writable, E extends Writable> implements
 42  
     SuperstepOutput<I, V, E> {
 43  
   /** Mapper context */
 44  
   private final Mapper<?, ?, ?, ?>.Context context;
 45  
   /** Main vertex writer */
 46  
   private final VertexWriter<I, V, E> vertexWriter;
 47  
   /** Vertex output format */
 48  
   private final WrappedVertexOutputFormat<I, V, E> vertexOutputFormat;
 49  
   /**
 50  
    * Simple vertex writer, wrapper for {@link #vertexWriter}.
 51  
    * Call to writeVertex is thread-safe.
 52  
    */
 53  
   private final SimpleVertexWriter<I, V, E> simpleVertexWriter;
 54  
 
 55  
   /**
 56  
    * Constructor
 57  
    *
 58  
    * @param conf Configuration
 59  
    * @param context Mapper context
 60  
    */
 61  
   @SuppressWarnings("unchecked")
 62  
   public SynchronizedSuperstepOutput(
 63  
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
 64  0
       Mapper<?, ?, ?, ?>.Context context) {
 65  0
     this.context = context;
 66  
     try {
 67  0
       vertexOutputFormat = conf.createWrappedVertexOutputFormat();
 68  0
       vertexOutputFormat.preWriting(context);
 69  0
       vertexWriter = vertexOutputFormat.createVertexWriter(context);
 70  0
       vertexWriter.setConf(conf);
 71  0
       vertexWriter.initialize(context);
 72  0
     } catch (IOException e) {
 73  0
       throw new IllegalStateException("SynchronizedSuperstepOutput: " +
 74  
           "IOException occurred", e);
 75  0
     } catch (InterruptedException e) {
 76  0
       throw new IllegalStateException("SynchronizedSuperstepOutput: " +
 77  
           "InterruptedException occurred", e);
 78  0
     }
 79  0
     simpleVertexWriter = new SimpleVertexWriter<I, V, E>() {
 80  
       @Override
 81  
       public synchronized void writeVertex(
 82  
           Vertex<I, V, E> vertex) throws IOException, InterruptedException {
 83  0
         vertexWriter.writeVertex(vertex);
 84  0
       }
 85  
     };
 86  0
   }
 87  
 
 88  
   @Override
 89  
   public SimpleVertexWriter<I, V, E> getVertexWriter() {
 90  0
     return simpleVertexWriter;
 91  
   }
 92  
 
 93  
   @Override
 94  
   public void returnVertexWriter(SimpleVertexWriter<I, V, E> vertexWriter) {
 95  0
   }
 96  
 
 97  
   @Override
 98  
   public void postApplication() throws IOException, InterruptedException {
 99  0
     vertexWriter.close(context);
 100  0
     vertexOutputFormat.postWriting(context);
 101  0
   }
 102  
 }