Coverage Report - org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest
 
Classes in this File Line Coverage Branch Coverage Complexity
SendWorkerOneMessageToManyRequest
0%
0/48
0%
0/10
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.requests;
 20  
 
 21  
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 22  
 
 23  
 import java.io.DataInput;
 24  
 import java.io.DataOutput;
 25  
 import java.io.IOException;
 26  
 import java.util.Map.Entry;
 27  
 
 28  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 29  
 import org.apache.giraph.comm.ServerData;
 30  
 import org.apache.giraph.comm.messages.MessageStore;
 31  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 32  
 import org.apache.giraph.partition.PartitionOwner;
 33  
 import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
 34  
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 35  
 import org.apache.giraph.utils.VertexIdMessageIterator;
 36  
 import org.apache.hadoop.io.Writable;
 37  
 import org.apache.hadoop.io.WritableComparable;
 38  
 
 39  
 /**
 40  
  * Send a collection of ByteArrayOneMessageToManyIds messages to a worker.
 41  
  *
 42  
  * @param <I> Vertex id
 43  
  * @param <M> Message data
 44  
  */
 45  
 @SuppressWarnings("unchecked")
 46  
 public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
 47  
     M extends Writable> extends WritableRequest<I, Writable, Writable>
 48  
     implements WorkerRequest<I, Writable, Writable> {
 49  
   /** ByteArrayOneMessageToManyIds encoding of vertexId &amp; messages */
 50  
   protected ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds;
 51  
 
 52  
   /**
 53  
    * Constructor used for reflection only.
 54  
    */
 55  0
   public SendWorkerOneMessageToManyRequest() { }
 56  
 
 57  
   /**
 58  
    * Constructor used to send request.
 59  
    *
 60  
    * @param oneMessageToManyIds ByteArrayOneMessageToManyIds
 61  
    * @param conf ImmutableClassesGiraphConfiguration
 62  
    */
 63  
   public SendWorkerOneMessageToManyRequest(
 64  
       ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds,
 65  0
       ImmutableClassesGiraphConfiguration conf) {
 66  0
     this.oneMessageToManyIds = oneMessageToManyIds;
 67  0
     setConf(conf);
 68  0
   }
 69  
 
 70  
   @Override
 71  
   public RequestType getType() {
 72  0
     return RequestType.SEND_WORKER_ONE_MESSAGE_TO_MANY_REQUEST;
 73  
   }
 74  
 
 75  
   @Override
 76  
   public void readFieldsRequest(DataInput input) throws IOException {
 77  0
     oneMessageToManyIds = new ByteArrayOneMessageToManyIds<>(
 78  0
         getConf().<M>createOutgoingMessageValueFactory());
 79  0
     oneMessageToManyIds.setConf(getConf());
 80  0
     oneMessageToManyIds.readFields(input);
 81  0
   }
 82  
 
 83  
   @Override
 84  
   public void writeRequest(DataOutput output) throws IOException {
 85  0
     this.oneMessageToManyIds.write(output);
 86  0
   }
 87  
 
 88  
   @Override
 89  
   public int getSerializedSize() {
 90  0
     return super.getSerializedSize() +
 91  0
         this.oneMessageToManyIds.getSerializedSize();
 92  
   }
 93  
 
 94  
   @Override
 95  
   public void doRequest(ServerData serverData) {
 96  0
     MessageStore<I, M> messageStore = serverData.getIncomingMessageStore();
 97  0
     if (messageStore.isPointerListEncoding()) {
 98  
       // if message store is pointer list based then send data as is
 99  0
       messageStore.addPartitionMessages(-1, oneMessageToManyIds);
 100  
     } else { // else split the data per partition and send individually
 101  0
       CentralizedServiceWorker<I, ?, ?> serviceWorker =
 102  0
           serverData.getServiceWorker();
 103  
       // Get the initial size of ByteArrayVertexIdMessages per partition
 104  
       // on this worker. To make sure every ByteArrayVertexIdMessages to have
 105  
       // enough space to store the messages, we divide the original
 106  
       // ByteArrayOneMessageToManyIds message size by the number of partitions
 107  
       // and double the size
 108  
       // (Assume the major component in ByteArrayOneMessageToManyIds message
 109  
       // is a id list. Now each target id has a copy of message,
 110  
       // therefore we double the buffer size)
 111  
       // to get the initial size of ByteArrayVertexIdMessages.
 112  0
       int initialSize = oneMessageToManyIds.getSize() /
 113  0
           serverData.getPartitionStore().getNumPartitions() * 2;
 114  
       // Create ByteArrayVertexIdMessages for
 115  
       // message reformatting.
 116  0
       Int2ObjectOpenHashMap<ByteArrayVertexIdMessages> partitionIdMsgs =
 117  
           new Int2ObjectOpenHashMap<>();
 118  
 
 119  
       // Put data from ByteArrayOneMessageToManyIds
 120  
       // to ByteArrayVertexIdMessages
 121  0
       VertexIdMessageIterator<I, M> vertexIdMessageIterator =
 122  0
         oneMessageToManyIds.getVertexIdMessageIterator();
 123  0
       while (vertexIdMessageIterator.hasNext()) {
 124  0
         vertexIdMessageIterator.next();
 125  0
         M msg = vertexIdMessageIterator.getCurrentMessage();
 126  0
         I vertexId = vertexIdMessageIterator.getCurrentVertexId();
 127  0
         PartitionOwner owner =
 128  0
             serviceWorker.getVertexPartitionOwner(vertexId);
 129  0
         int partitionId = owner.getPartitionId();
 130  0
         ByteArrayVertexIdMessages<I, M> idMsgs = partitionIdMsgs
 131  0
             .get(partitionId);
 132  0
         if (idMsgs == null) {
 133  0
           idMsgs = new ByteArrayVertexIdMessages<>(
 134  0
               getConf().<M>createOutgoingMessageValueFactory());
 135  0
           idMsgs.setConf(getConf());
 136  0
           idMsgs.initialize(initialSize);
 137  0
           partitionIdMsgs.put(partitionId, idMsgs);
 138  
         }
 139  0
         idMsgs.add(vertexId, msg);
 140  0
       }
 141  
 
 142  
       // Read ByteArrayVertexIdMessages and write to message store
 143  
       for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs :
 144  0
           partitionIdMsgs.entrySet()) {
 145  0
         if (!idMsgs.getValue().isEmpty()) {
 146  0
           serverData.getIncomingMessageStore().addPartitionMessages(
 147  0
               idMsgs.getKey(), idMsgs.getValue());
 148  
         }
 149  0
       }
 150  
     }
 151  0
   }
 152  
 }