Coverage Report - org.apache.giraph.worker.MappingInputSplitsCallable
 
Classes in this File Line Coverage Branch Coverage Complexity
MappingInputSplitsCallable
0%
0/22
0%
0/2
1.25
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.worker;
 20  
 
 21  
 import java.io.IOException;
 22  
 
 23  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 24  
 import org.apache.giraph.graph.VertexEdgeCount;
 25  
 import org.apache.giraph.io.GiraphInputFormat;
 26  
 import org.apache.giraph.io.MappingInputFormat;
 27  
 import org.apache.giraph.io.MappingReader;
 28  
 import org.apache.giraph.mapping.MappingEntry;
 29  
 import org.apache.giraph.mapping.MappingStore;
 30  
 import org.apache.giraph.io.InputType;
 31  
 import org.apache.hadoop.io.Writable;
 32  
 import org.apache.hadoop.io.WritableComparable;
 33  
 import org.apache.hadoop.mapreduce.InputSplit;
 34  
 import org.apache.hadoop.mapreduce.Mapper;
 35  
 
 36  
 /**
 37  
  * Load as many mapping input splits as possible.
 38  
  * Every thread will has its own instance of WorkerClientRequestProcessor
 39  
  * to send requests.
 40  
  *
 41  
  * @param <I> vertexId type
 42  
  * @param <V> vertexValue type
 43  
  * @param <E> edgeValue type
 44  
  * @param <B> mappingTarget type
 45  
  */
 46  
 @SuppressWarnings("unchecked")
 47  
 public class MappingInputSplitsCallable<I extends WritableComparable,
 48  
   V extends Writable, E extends Writable, B extends Writable>
 49  
   extends InputSplitsCallable<I, V, E> {
 50  
   /** User supplied mappingInputFormat */
 51  
   private final MappingInputFormat<I, V, E, B> mappingInputFormat;
 52  
   /** Link to bspServiceWorker */
 53  
   private final BspServiceWorker<I, V, E> bspServiceWorker;
 54  
 
 55  
   /**
 56  
    * Constructor
 57  
    *
 58  
    * @param mappingInputFormat mappingInputFormat
 59  
    * @param context Context
 60  
    * @param configuration Configuration
 61  
    * @param bspServiceWorker bsp service worker
 62  
    * @param splitsHandler Splits handler
 63  
    */
 64  
   public MappingInputSplitsCallable(
 65  
       MappingInputFormat<I, V, E, B> mappingInputFormat,
 66  
       Mapper<?, ?, ?, ?>.Context context,
 67  
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
 68  
       BspServiceWorker<I, V, E> bspServiceWorker,
 69  
       WorkerInputSplitsHandler splitsHandler) {
 70  0
     super(context, configuration, bspServiceWorker, splitsHandler);
 71  0
     this.mappingInputFormat = mappingInputFormat;
 72  0
     this.bspServiceWorker = bspServiceWorker;
 73  0
   }
 74  
 
 75  
   @Override
 76  
   public GiraphInputFormat getInputFormat() {
 77  0
     return mappingInputFormat;
 78  
   }
 79  
 
 80  
   @Override
 81  
   public InputType getInputType() {
 82  0
     return InputType.MAPPING;
 83  
   }
 84  
 
 85  
   @Override
 86  
   protected VertexEdgeCount readInputSplit(InputSplit inputSplit)
 87  
     throws IOException, InterruptedException {
 88  0
     MappingReader<I, V, E, B> mappingReader =
 89  0
         mappingInputFormat.createMappingReader(inputSplit, context);
 90  0
     mappingReader.setConf(configuration);
 91  
 
 92  0
     WorkerThreadGlobalCommUsage globalCommUsage = this.bspServiceWorker
 93  0
         .getAggregatorHandler().newThreadAggregatorUsage();
 94  
 
 95  0
     mappingReader.initialize(inputSplit, context);
 96  0
     mappingReader.setWorkerGlobalCommUsage(globalCommUsage);
 97  
 
 98  0
     int entriesLoaded = 0;
 99  0
     MappingStore<I, B> mappingStore =
 100  0
       (MappingStore<I, B>) bspServiceWorker.getLocalData().getMappingStore();
 101  
 
 102  0
     while (mappingReader.nextEntry()) {
 103  0
       MappingEntry<I, B> entry = mappingReader.getCurrentEntry();
 104  0
       entriesLoaded += 1;
 105  0
       mappingStore.addEntry(entry.getVertexId(), entry.getMappingTarget());
 106  0
     }
 107  0
     return new VertexEdgeCount(0, 0, entriesLoaded);
 108  
   }
 109  
 }