Coverage Report - org.apache.giraph.graph.AddressesAndPartitionsWritable
 
Classes in this File Line Coverage Branch Coverage Complexity
AddressesAndPartitionsWritable
0%
0/66
0%
0/24
2.5
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.graph;
 20  
 
 21  
 import org.apache.giraph.partition.PartitionOwner;
 22  
 import org.apache.giraph.master.MasterInfo;
 23  
 import org.apache.giraph.utils.ReflectionUtils;
 24  
 import org.apache.giraph.utils.WritableUtils;
 25  
 import org.apache.giraph.worker.WorkerInfo;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 
 28  
 import com.google.common.collect.Iterables;
 29  
 import com.google.common.collect.Lists;
 30  
 import com.google.common.collect.Maps;
 31  
 
 32  
 import java.io.DataInput;
 33  
 import java.io.DataOutput;
 34  
 import java.io.IOException;
 35  
 import java.util.Collection;
 36  
 import java.util.List;
 37  
 import java.util.Map;
 38  
 
 39  
 /**
 40  
  * Helper class to write descriptions of master, workers and partition owners
 41  
  */
 42  
 public class AddressesAndPartitionsWritable implements Writable {
 43  
   /** Master information */
 44  
   private MasterInfo masterInfo;
 45  
   /** List of all workers */
 46  
   private List<WorkerInfo> workerInfos;
 47  
   /** Collection of partitions */
 48  
   private Collection<PartitionOwner> partitionOwners;
 49  
 
 50  
   /**
 51  
    * Constructor when we want to serialize object
 52  
    *
 53  
    * @param masterInfo Master information
 54  
    * @param workerInfos List of all workers
 55  
    * @param partitionOwners Collection of partitions
 56  
    */
 57  
   public AddressesAndPartitionsWritable(MasterInfo masterInfo,
 58  
       List<WorkerInfo> workerInfos,
 59  0
       Collection<PartitionOwner> partitionOwners) {
 60  0
     this.masterInfo = masterInfo;
 61  0
     this.workerInfos = workerInfos;
 62  0
     this.partitionOwners = partitionOwners;
 63  0
   }
 64  
 
 65  
   /** Constructor for reflection */
 66  0
   public AddressesAndPartitionsWritable() {
 67  0
   }
 68  
 
 69  
   /**
 70  
    * Get master information
 71  
    *
 72  
    * @return Master information
 73  
    */
 74  
   public MasterInfo getMasterInfo() {
 75  0
     return masterInfo;
 76  
   }
 77  
 
 78  
   /**
 79  
    * Get all workers
 80  
    *
 81  
    * @return List of all workers
 82  
    */
 83  
   public List<WorkerInfo> getWorkerInfos() {
 84  0
     return workerInfos;
 85  
   }
 86  
 
 87  
   /**
 88  
    * Get partition owners
 89  
    *
 90  
    * @return Collection of partition owners
 91  
    */
 92  
   public Collection<PartitionOwner> getPartitionOwners() {
 93  0
     return partitionOwners;
 94  
   }
 95  
 
 96  
   @Override
 97  
   public void write(DataOutput output) throws IOException {
 98  0
     masterInfo.write(output);
 99  
 
 100  0
     output.writeInt(workerInfos.size());
 101  0
     for (WorkerInfo workerInfo : workerInfos) {
 102  0
       workerInfo.write(output);
 103  0
     }
 104  
 
 105  0
     Map<Integer, WorkerInfo> workerInfoMap = getAsWorkerInfoMap(workerInfos);
 106  
     // Also write out the previous worker information that are used
 107  
     // in the partition owners
 108  0
     List<WorkerInfo> previousWorkerInfos = Lists.newArrayList();
 109  0
     for (PartitionOwner partitionOwner : partitionOwners) {
 110  0
       if (partitionOwner.getPreviousWorkerInfo() != null) {
 111  0
         if (!workerInfoMap.containsKey(
 112  0
             partitionOwner.getPreviousWorkerInfo().getTaskId())) {
 113  0
           previousWorkerInfos.add(partitionOwner.getPreviousWorkerInfo());
 114  
         }
 115  
       }
 116  0
     }
 117  0
     output.writeInt(previousWorkerInfos.size());
 118  0
     for (WorkerInfo workerInfo : previousWorkerInfos) {
 119  0
       workerInfo.write(output);
 120  0
     }
 121  
 
 122  0
     output.writeInt(partitionOwners.size());
 123  0
     if (partitionOwners.size() > 0) {
 124  0
       WritableUtils.writeClass(
 125  0
           partitionOwners.iterator().next().getClass(), output);
 126  
     }
 127  0
     for (PartitionOwner partitionOwner : partitionOwners) {
 128  0
       partitionOwner.writeWithWorkerIds(output);
 129  0
     }
 130  0
   }
 131  
 
 132  
   @Override
 133  
   public void readFields(DataInput input) throws IOException {
 134  0
     masterInfo = new MasterInfo();
 135  0
     masterInfo.readFields(input);
 136  
 
 137  0
     int workerInfosSize = input.readInt();
 138  0
     workerInfos = Lists.newArrayListWithCapacity(workerInfosSize);
 139  0
     for (int i = 0; i < workerInfosSize; i++) {
 140  0
       WorkerInfo workerInfo = new WorkerInfo();
 141  0
       workerInfo.readFields(input);
 142  0
       workerInfos.add(workerInfo);
 143  
     }
 144  
 
 145  0
     Map<Integer, WorkerInfo> workerInfoMap = getAsWorkerInfoMap(workerInfos);
 146  0
     int additionalWorkerInfos = input.readInt();
 147  0
     for (int i = 0; i < additionalWorkerInfos; i++) {
 148  0
       WorkerInfo workerInfo = new WorkerInfo();
 149  0
       workerInfo.readFields(input);
 150  0
       workerInfoMap.put(workerInfo.getTaskId(), workerInfo);
 151  
     }
 152  
 
 153  0
     int partitionOwnersSize = input.readInt();
 154  0
     Class<PartitionOwner> partitionOwnerClass = null;
 155  0
     if (partitionOwnersSize > 0) {
 156  0
       partitionOwnerClass = WritableUtils.readClass(input);
 157  
     }
 158  0
     partitionOwners = Lists.newArrayListWithCapacity(partitionOwnersSize);
 159  0
     for (int i = 0; i < partitionOwnersSize; i++) {
 160  0
       PartitionOwner partitionOwner =
 161  0
           ReflectionUtils.newInstance(partitionOwnerClass);
 162  0
       partitionOwner.readFieldsWithWorkerIds(input, workerInfoMap);
 163  0
       partitionOwners.add(partitionOwner);
 164  
     }
 165  0
   }
 166  
 
 167  
   /**
 168  
    * Convert Iterable of WorkerInfos to the map from task id to WorkerInfo.
 169  
    *
 170  
    * @param workerInfos Iterable of WorkerInfos
 171  
    * @return The map from task id to WorkerInfo
 172  
    */
 173  
   private static Map<Integer, WorkerInfo> getAsWorkerInfoMap(
 174  
       Iterable<WorkerInfo> workerInfos) {
 175  0
     Map<Integer, WorkerInfo> workerInfoMap =
 176  0
         Maps.newHashMapWithExpectedSize(Iterables.size(workerInfos));
 177  0
     for (WorkerInfo workerInfo : workerInfos) {
 178  0
       workerInfoMap.put(workerInfo.getTaskId(), workerInfo);
 179  0
     }
 180  0
     return workerInfoMap;
 181  
   }
 182  
 }