Coverage Report - org.apache.giraph.comm.SendMessageCache
 
Classes in this File Line Coverage Branch Coverage Complexity
SendMessageCache
0%
0/55
0%
0/8
1.312
SendMessageCache$TargetVertexIdIterator
0%
0/8
N/A
1.312
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm;
 20  
 
 21  
 import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE;
 22  
 import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
 23  
 
 24  
 import java.util.Iterator;
 25  
 
 26  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 27  
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 28  
 import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
 29  
 import org.apache.giraph.comm.requests.WritableRequest;
 30  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 31  
 import org.apache.giraph.edge.Edge;
 32  
 import org.apache.giraph.factories.MessageValueFactory;
 33  
 import org.apache.giraph.graph.Vertex;
 34  
 import org.apache.giraph.partition.PartitionOwner;
 35  
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 36  
 import org.apache.giraph.utils.PairList;
 37  
 import org.apache.giraph.utils.VertexIdMessages;
 38  
 import org.apache.giraph.worker.WorkerInfo;
 39  
 import org.apache.hadoop.io.Writable;
 40  
 import org.apache.hadoop.io.WritableComparable;
 41  
 import org.apache.log4j.Logger;
 42  
 
 43  
 /**
 44  
  * Aggregates the messages to be sent to workers so they can be sent
 45  
  * in bulk.  Not thread-safe.
 46  
  *
 47  
  * @param <I> Vertex id
 48  
  * @param <M> Message data
 49  
  */
 50  0
 @SuppressWarnings("unchecked")
 51  
 public class SendMessageCache<I extends WritableComparable, M extends Writable>
 52  
     extends SendVertexIdDataCache<I, M, VertexIdMessages<I, M>> {
 53  
   /** Class logger */
 54  0
   private static final Logger LOG =
 55  0
       Logger.getLogger(SendMessageCache.class);
 56  
   /** Messages sent during the last superstep */
 57  0
   protected long totalMsgsSentInSuperstep = 0;
 58  
   /** Message bytes sent during the last superstep */
 59  0
   protected long totalMsgBytesSentInSuperstep = 0;
 60  
   /** Max message size sent to a worker */
 61  
   protected final int maxMessagesSizePerWorker;
 62  
   /** NettyWorkerClientRequestProcessor for message sending */
 63  
   protected final NettyWorkerClientRequestProcessor<I, ?, ?> clientProcessor;
 64  
   /** Cached message value factory */
 65  
   protected MessageValueFactory<M> messageValueFactory;
 66  
   /**
 67  
    * Constructor
 68  
    *
 69  
    * @param conf Giraph configuration
 70  
    * @param serviceWorker Service worker
 71  
    * @param processor NettyWorkerClientRequestProcessor
 72  
    * @param maxMsgSize Max message size sent to a worker
 73  
    */
 74  
   public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
 75  
       CentralizedServiceWorker<?, ?, ?> serviceWorker,
 76  
       NettyWorkerClientRequestProcessor<I, ?, ?> processor,
 77  
       int maxMsgSize) {
 78  0
     super(conf, serviceWorker, MAX_MSG_REQUEST_SIZE.get(conf),
 79  0
         ADDITIONAL_MSG_REQUEST_SIZE.get(conf));
 80  0
     maxMessagesSizePerWorker = maxMsgSize;
 81  0
     clientProcessor = processor;
 82  0
     messageValueFactory =
 83  0
             conf.createOutgoingMessageValueFactory();
 84  0
   }
 85  
 
 86  
   @Override
 87  
   public VertexIdMessages<I, M> createVertexIdData() {
 88  0
     return new ByteArrayVertexIdMessages<I, M>(messageValueFactory);
 89  
   }
 90  
 
 91  
   /**
 92  
    * Add a message to the cache.
 93  
    *
 94  
    * @param workerInfo the remote worker destination
 95  
    * @param partitionId the remote Partition this message belongs to
 96  
    * @param destVertexId vertex id that is ultimate destination
 97  
    * @param message Message to send to remote worker
 98  
    * @return Size of messages for the worker.
 99  
    */
 100  
   public int addMessage(WorkerInfo workerInfo,
 101  
                         int partitionId, I destVertexId, M message) {
 102  0
     return addData(workerInfo, partitionId, destVertexId, message);
 103  
   }
 104  
 
 105  
   /**
 106  
    * Add a message to the cache with serialized ids.
 107  
    *
 108  
    * @param workerInfo The remote worker destination
 109  
    * @param partitionId The remote Partition this message belongs to
 110  
    * @param serializedId Serialized vertex id that is ultimate destination
 111  
    * @param idSerializerPos The end position of serialized id in the byte array
 112  
    * @param message Message to send to remote worker
 113  
    * @return Size of messages for the worker.
 114  
    */
 115  
   protected int addMessage(WorkerInfo workerInfo, int partitionId,
 116  
       byte[] serializedId, int idSerializerPos, M message) {
 117  0
     return addData(
 118  
       workerInfo, partitionId, serializedId,
 119  
       idSerializerPos, message);
 120  
   }
 121  
 
 122  
   /**
 123  
    * Gets the messages for a worker and removes it from the cache.
 124  
    *
 125  
    * @param workerInfo the address of the worker who owns the data
 126  
    *                   partitions that are receiving the messages
 127  
    * @return List of pairs (partitionId, ByteArrayVertexIdMessages),
 128  
    *         where all partition ids belong to workerInfo
 129  
    */
 130  
   protected PairList<Integer, VertexIdMessages<I, M>>
 131  
   removeWorkerMessages(WorkerInfo workerInfo) {
 132  0
     return removeWorkerData(workerInfo);
 133  
   }
 134  
 
 135  
   /**
 136  
    * Gets all the messages and removes them from the cache.
 137  
    *
 138  
    * @return All vertex messages for all partitions
 139  
    */
 140  
   private PairList<WorkerInfo, PairList<
 141  
       Integer, VertexIdMessages<I, M>>> removeAllMessages() {
 142  0
     return removeAllData();
 143  
   }
 144  
 
 145  
   /**
 146  
    * Send a message to a target vertex id.
 147  
    *
 148  
    * @param destVertexId Target vertex id
 149  
    * @param message The message sent to the target
 150  
    */
 151  
   public void sendMessageRequest(I destVertexId, M message) {
 152  0
     PartitionOwner owner =
 153  0
       getServiceWorker().getVertexPartitionOwner(destVertexId);
 154  0
     WorkerInfo workerInfo = owner.getWorkerInfo();
 155  0
     final int partitionId = owner.getPartitionId();
 156  0
     if (LOG.isTraceEnabled()) {
 157  0
       LOG.trace("sendMessageRequest: Send bytes (" + message.toString() +
 158  
         ") to " + destVertexId + " on worker " + workerInfo);
 159  
     }
 160  0
     ++totalMsgsSentInSuperstep;
 161  
     // Add the message to the cache
 162  0
     int workerMessageSize = addMessage(
 163  
       workerInfo, partitionId, destVertexId, message);
 164  
     // Send a request if the cache of outgoing message to
 165  
     // the remote worker 'workerInfo' is full enough to be flushed
 166  0
     if (workerMessageSize >= maxMessagesSizePerWorker) {
 167  
       PairList<Integer, VertexIdMessages<I, M>>
 168  0
         workerMessages = removeWorkerMessages(workerInfo);
 169  0
       WritableRequest writableRequest =
 170  
         new SendWorkerMessagesRequest<I, M>(workerMessages);
 171  0
       totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
 172  0
       clientProcessor.doRequest(workerInfo, writableRequest);
 173  
       // Notify sending
 174  0
       getServiceWorker().getGraphTaskManager().notifySentMessages();
 175  
     }
 176  0
   }
 177  
 
 178  
   /**
 179  
    * An iterator wrapper on edges to return
 180  
    * target vertex ids.
 181  
    */
 182  0
   public static class TargetVertexIdIterator<I extends WritableComparable>
 183  
       implements Iterator<I> {
 184  
     /** An edge iterator */
 185  
     private final Iterator<Edge<I, Writable>> edgesIterator;
 186  
 
 187  
     /**
 188  
      * Constructor.
 189  
      *
 190  
      * @param vertex The source vertex of the out edges
 191  
      */
 192  0
     public TargetVertexIdIterator(Vertex<I, ?, ?> vertex) {
 193  0
       edgesIterator =
 194  0
         ((Vertex<I, Writable, Writable>) vertex).getEdges().iterator();
 195  0
     }
 196  
 
 197  
     @Override
 198  
     public boolean hasNext() {
 199  0
       return edgesIterator.hasNext();
 200  
     }
 201  
 
 202  
     @Override
 203  
     public I next() {
 204  0
       return edgesIterator.next().getTargetVertexId();
 205  
     }
 206  
 
 207  
     @Override
 208  
     public void remove() {
 209  0
       throw new UnsupportedOperationException();
 210  
     }
 211  
   }
 212  
 
 213  
   /**
 214  
    * Send message to all its neighbors
 215  
    *
 216  
    * @param vertex The source vertex
 217  
    * @param message The message sent to a worker
 218  
    */
 219  
   public void sendMessageToAllRequest(Vertex<I, ?, ?> vertex, M message) {
 220  0
     TargetVertexIdIterator targetVertexIterator =
 221  
       new TargetVertexIdIterator(vertex);
 222  0
     sendMessageToAllRequest(targetVertexIterator, message);
 223  0
   }
 224  
 
 225  
   /**
 226  
    * Send message to the target ids in the iterator
 227  
    *
 228  
    * @param vertexIdIterator The iterator of target vertex ids
 229  
    * @param message The message sent to a worker
 230  
    */
 231  
   public void sendMessageToAllRequest(Iterator<I> vertexIdIterator, M message) {
 232  0
     while (vertexIdIterator.hasNext()) {
 233  0
       sendMessageRequest(vertexIdIterator.next(), message);
 234  
     }
 235  0
   }
 236  
 
 237  
   /**
 238  
    * Flush the rest of the messages to the workers.
 239  
    */
 240  
   public void flush() {
 241  
     PairList<WorkerInfo, PairList<Integer,
 242  
         VertexIdMessages<I, M>>>
 243  0
     remainingMessageCache = removeAllMessages();
 244  
     PairList<WorkerInfo, PairList<
 245  
         Integer, VertexIdMessages<I, M>>>.Iterator
 246  0
     iterator = remainingMessageCache.getIterator();
 247  0
     while (iterator.hasNext()) {
 248  0
       iterator.next();
 249  0
       WritableRequest writableRequest =
 250  
         new SendWorkerMessagesRequest<I, M>(
 251  0
           iterator.getCurrentSecond());
 252  0
       totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
 253  0
       clientProcessor.doRequest(
 254  0
         iterator.getCurrentFirst(), writableRequest);
 255  0
     }
 256  0
   }
 257  
 
 258  
   /**
 259  
    * Reset the message count per superstep.
 260  
    *
 261  
    * @return The message count sent in last superstep
 262  
    */
 263  
   public long resetMessageCount() {
 264  0
     long messagesSentInSuperstep = totalMsgsSentInSuperstep;
 265  0
     totalMsgsSentInSuperstep = 0;
 266  0
     return messagesSentInSuperstep;
 267  
   }
 268  
 
 269  
   /**
 270  
    * Reset the message bytes count per superstep.
 271  
    *
 272  
    * @return The message count sent in last superstep
 273  
    */
 274  
   public long resetMessageBytesCount() {
 275  0
     long messageBytesSentInSuperstep = totalMsgBytesSentInSuperstep;
 276  0
     totalMsgBytesSentInSuperstep = 0;
 277  0
     return messageBytesSentInSuperstep;
 278  
   }
 279  
 }