Coverage Report - org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput
 
Classes in this File Line Coverage Branch Coverage Complexity
MultiThreadedSuperstepOutput
0%
0/36
0%
0/6
3
MultiThreadedSuperstepOutput$1
0%
0/2
N/A
3
MultiThreadedSuperstepOutput$1$1
0%
0/9
0%
0/2
3
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.superstep_output;
 20  
 
 21  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 22  
 import org.apache.giraph.io.SimpleVertexWriter;
 23  
 import org.apache.giraph.io.VertexOutputFormat;
 24  
 import org.apache.giraph.io.VertexWriter;
 25  
 import org.apache.giraph.utils.CallableFactory;
 26  
 import org.apache.giraph.utils.ProgressableUtils;
 27  
 import org.apache.hadoop.io.Writable;
 28  
 import org.apache.hadoop.io.WritableComparable;
 29  
 import org.apache.hadoop.mapreduce.Mapper;
 30  
 
 31  
 import com.google.common.collect.Lists;
 32  
 import com.google.common.collect.Sets;
 33  
 
 34  
 import java.io.IOException;
 35  
 import java.util.List;
 36  
 import java.util.Set;
 37  
 import java.util.concurrent.Callable;
 38  
 
 39  
 /**
 40  
  * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is
 41  
  * thread-safe.
 42  
  *
 43  
  * @param <I> Vertex id
 44  
  * @param <V> Vertex value
 45  
  * @param <E> Edge value
 46  
  */
 47  0
 public class MultiThreadedSuperstepOutput<I extends WritableComparable,
 48  
     V extends Writable, E extends Writable> implements
 49  
     SuperstepOutput<I, V, E> {
 50  
   /** Mapper context */
 51  
   private final Mapper<?, ?, ?, ?>.Context context;
 52  
   /** Configuration */
 53  
   private ImmutableClassesGiraphConfiguration<I, V, E> configuration;
 54  
   /** Vertex output format, used to get new vertex writers */
 55  
   private final VertexOutputFormat<I, V, E> vertexOutputFormat;
 56  
   /**
 57  
    * List of returned vertex writers, these can be reused and will all be
 58  
    * closed in the end of the application
 59  
    */
 60  
   private final List<VertexWriter<I, V, E>> availableVertexWriters;
 61  
   /** Vertex writes which were created by this class and are currently used */
 62  
   private final Set<VertexWriter<I, V, E>> occupiedVertexWriters;
 63  
 
 64  
   /**
 65  
    * Constructor
 66  
    *
 67  
    * @param conf    Configuration
 68  
    * @param context Mapper context
 69  
    */
 70  
   public MultiThreadedSuperstepOutput(
 71  
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
 72  0
       Mapper<?, ?, ?, ?>.Context context) {
 73  0
     this.configuration = conf;
 74  0
     vertexOutputFormat = conf.createWrappedVertexOutputFormat();
 75  0
     this.context = context;
 76  0
     availableVertexWriters = Lists.newArrayList();
 77  0
     occupiedVertexWriters = Sets.newHashSet();
 78  0
     vertexOutputFormat.preWriting(context);
 79  0
   }
 80  
 
 81  
   @Override
 82  
   public synchronized SimpleVertexWriter<I, V, E> getVertexWriter() {
 83  
     VertexWriter<I, V, E> vertexWriter;
 84  0
     if (availableVertexWriters.isEmpty()) {
 85  
       try {
 86  0
         vertexWriter = vertexOutputFormat.createVertexWriter(context);
 87  0
         vertexWriter.setConf(configuration);
 88  0
         vertexWriter.initialize(context);
 89  0
       } catch (IOException e) {
 90  0
         throw new IllegalStateException("getVertexWriter: " +
 91  
             "IOException occurred", e);
 92  0
       } catch (InterruptedException e) {
 93  0
         throw new IllegalStateException("getVertexWriter: " +
 94  
             "InterruptedException occurred", e);
 95  0
       }
 96  
     } else {
 97  0
       vertexWriter =
 98  0
           availableVertexWriters.remove(availableVertexWriters.size() - 1);
 99  
     }
 100  0
     occupiedVertexWriters.add(vertexWriter);
 101  0
     return vertexWriter;
 102  
   }
 103  
 
 104  
   @Override
 105  
   public synchronized void returnVertexWriter(
 106  
       SimpleVertexWriter<I, V, E> vertexWriter) {
 107  0
     VertexWriter<I, V, E> returnedWriter = (VertexWriter<I, V, E>) vertexWriter;
 108  0
     if (!occupiedVertexWriters.remove(returnedWriter)) {
 109  0
       throw new IllegalStateException("returnVertexWriter: " +
 110  
           "Returned vertex writer which is not currently occupied!");
 111  
     }
 112  0
     availableVertexWriters.add(returnedWriter);
 113  0
   }
 114  
 
 115  
   @Override
 116  
   public synchronized void postApplication() throws IOException,
 117  
       InterruptedException {
 118  0
     if (!occupiedVertexWriters.isEmpty()) {
 119  0
       throw new IllegalStateException("postApplication: " +
 120  0
           occupiedVertexWriters.size() +
 121  
           " vertex writers were not returned!");
 122  
     }
 123  
 
 124  
     // Closing writers can take time - use multiple threads and call progress
 125  0
     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
 126  
       @Override
 127  
       public Callable<Void> newCallable(int callableId) {
 128  0
         return new Callable<Void>() {
 129  
           @Override
 130  
           public Void call() throws Exception {
 131  
             while (true) {
 132  
               VertexWriter<I, V, E> vertexWriter;
 133  0
               synchronized (availableVertexWriters) {
 134  0
                 if (availableVertexWriters.isEmpty()) {
 135  0
                   return null;
 136  
                 }
 137  0
                 vertexWriter = availableVertexWriters.remove(
 138  0
                     availableVertexWriters.size() - 1);
 139  0
               }
 140  0
               vertexWriter.close(context);
 141  0
             }
 142  
           }
 143  
         };
 144  
       }
 145  
     };
 146  0
     ProgressableUtils.getResultsWithNCallables(callableFactory,
 147  0
         Math.min(configuration.getNumOutputThreads(),
 148  0
             availableVertexWriters.size()), "close-writers-%d", context);
 149  0
     vertexOutputFormat.postWriting(context);
 150  0
   }
 151  
 }