Coverage Report - org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper
 
Classes in this File Line Coverage Branch Coverage Complexity
AsyncMessageStoreWrapper
0%
0/61
0%
0/8
0
AsyncMessageStoreWrapper$1
N/A
N/A
0
AsyncMessageStoreWrapper$MessageStoreQueueWorker
0%
0/17
0%
0/4
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.comm.messages.queue;
 19  
 
 20  
 import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
 21  
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
 22  
 
 23  
 import java.io.DataInput;
 24  
 import java.io.DataOutput;
 25  
 import java.io.IOException;
 26  
 import java.util.concurrent.BlockingQueue;
 27  
 import java.util.concurrent.ExecutorService;
 28  
 import java.util.concurrent.Executors;
 29  
 import java.util.concurrent.LinkedBlockingQueue;
 30  
 import java.util.concurrent.Semaphore;
 31  
 
 32  
 import org.apache.giraph.comm.messages.MessageStore;
 33  
 import org.apache.giraph.utils.ThreadUtils;
 34  
 import org.apache.giraph.utils.VertexIdMessages;
 35  
 import org.apache.hadoop.io.Writable;
 36  
 import org.apache.hadoop.io.WritableComparable;
 37  
 import org.apache.log4j.Logger;
 38  
 
 39  
 /**
 40  
  * This class decouples message receiving and processing
 41  
  * into separate threads thus reducing contention.
 42  
  * It does not provide message store functionality itself, rather
 43  
  * providing a wrapper around existing message stores that
 44  
  * can now be used in async mode with only slight modifications.
 45  
  * @param <I> Vertex id
 46  
  * @param <M> Message data
 47  
  */
 48  0
 public final class AsyncMessageStoreWrapper<I extends WritableComparable,
 49  
     M extends Writable> implements MessageStore<I, M> {
 50  
 
 51  
   /** Logger */
 52  0
   private static final Logger LOG =
 53  0
       Logger.getLogger(AsyncMessageStoreWrapper.class);
 54  
   /** Pass this id to clear the queues and shutdown all threads
 55  
    * started by this processor */
 56  0
   private static final PartitionMessage SHUTDOWN_QUEUE_MESSAGE =
 57  
       new PartitionMessage(-1, null);
 58  
   /** Pass this message to clear the queues but keep threads alive */
 59  0
   private static final PartitionMessage CLEAR_QUEUE_MESSAGE =
 60  
       new PartitionMessage(-1, null);
 61  
   /** Executor that processes messages in background */
 62  0
   private static final ExecutorService EXECUTOR_SERVICE =
 63  0
       Executors.newCachedThreadPool(
 64  0
           ThreadUtils.createThreadFactory("AsyncMessageStoreWrapper-%d"));
 65  
 
 66  
   /** Number of threads that will process messages in background */
 67  
   private final int threadsCount;
 68  
   /** Queue that temporary stores messages */
 69  
   private final BlockingQueue<PartitionMessage<I, M>>[] queues;
 70  
   /** Map from partition id to thread that process this partition */
 71  
   private final Int2IntMap partition2Queue;
 72  
   /** Signals that all procesing is done */
 73  
   private Semaphore completionSemaphore;
 74  
   /** Underlying message store */
 75  
   private final MessageStore<I, M> store;
 76  
 
 77  
   /**
 78  
    * Constructs async wrapper around existing message store
 79  
    * object. Requires partition list and number of threads
 80  
    * to properly initialize background threads and assign partitions.
 81  
    * Partitions are assigned to threads in round-robin fashion.
 82  
    * It guarantees that all threads have almost the same number of
 83  
    * partitions (+-1) no matter how partitions are assigned to this worker.
 84  
    * @param store underlying message store to be used in computation
 85  
    * @param partitions partitions assigned to this worker
 86  
    * @param threadCount number of threads that will be used to process
 87  
    *                    messages.
 88  
    */
 89  
   public AsyncMessageStoreWrapper(MessageStore<I, M> store,
 90  
                                   Iterable<Integer> partitions,
 91  0
                                   int threadCount) {
 92  0
     this.store = store;
 93  0
     this.threadsCount = threadCount;
 94  0
     completionSemaphore = new Semaphore(1 - threadsCount);
 95  0
     queues = new BlockingQueue[threadsCount];
 96  0
     partition2Queue = new Int2IntArrayMap();
 97  0
     LOG.info("AsyncMessageStoreWrapper enabled. Threads= " + threadsCount);
 98  
 
 99  0
     for (int i = 0; i < threadsCount; i++) {
 100  0
       queues[i] = new LinkedBlockingQueue<>();
 101  0
       EXECUTOR_SERVICE.submit(new MessageStoreQueueWorker(queues[i]));
 102  
     }
 103  
 
 104  0
     int cnt = 0;
 105  0
     for (int partitionId : partitions) {
 106  0
       partition2Queue.put(partitionId, cnt++ % threadsCount);
 107  0
     }
 108  
 
 109  0
   }
 110  
 
 111  
   @Override
 112  
   public boolean isPointerListEncoding() {
 113  0
     return store.isPointerListEncoding();
 114  
   }
 115  
 
 116  
   @Override
 117  
   public Iterable<M> getVertexMessages(I vertexId) {
 118  0
     return store.getVertexMessages(vertexId);
 119  
   }
 120  
 
 121  
   @Override
 122  
   public void clearVertexMessages(I vertexId) {
 123  0
     store.clearVertexMessages(vertexId);
 124  0
   }
 125  
 
 126  
   @Override
 127  
   public void clearAll() {
 128  
     try {
 129  0
       for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
 130  0
         queue.put(SHUTDOWN_QUEUE_MESSAGE);
 131  
       }
 132  0
       completionSemaphore.acquire();
 133  0
     } catch (InterruptedException e) {
 134  0
       throw new RuntimeException(e);
 135  0
     }
 136  0
     store.clearAll();
 137  0
   }
 138  
 
 139  
   @Override
 140  
   public boolean hasMessagesForVertex(I vertexId) {
 141  0
     return store.hasMessagesForVertex(vertexId);
 142  
   }
 143  
 
 144  
   @Override
 145  
   public boolean hasMessagesForPartition(int partitionId) {
 146  0
     return store.hasMessagesForPartition(partitionId);
 147  
   }
 148  
 
 149  
   @Override
 150  
   public void addPartitionMessages(
 151  
       int partitionId, VertexIdMessages<I, M> messages) {
 152  0
     int hash = partition2Queue.get(partitionId);
 153  
     try {
 154  0
       queues[hash].put(new PartitionMessage<>(partitionId, messages));
 155  0
     } catch (InterruptedException e) {
 156  0
       throw new RuntimeException(e);
 157  0
     }
 158  0
   }
 159  
 
 160  
   @Override
 161  
   public void addMessage(I vertexId, M message) throws IOException {
 162  
     // TODO: implement if LocalBlockRunner needs async message store
 163  0
     throw new UnsupportedOperationException();
 164  
   }
 165  
 
 166  
   @Override
 167  
   public void finalizeStore() {
 168  0
     store.finalizeStore();
 169  0
   }
 170  
 
 171  
   @Override
 172  
   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
 173  0
     return store.getPartitionDestinationVertices(partitionId);
 174  
   }
 175  
 
 176  
   @Override
 177  
   public void clearPartition(int partitionId) {
 178  0
     store.clearPartition(partitionId);
 179  0
   }
 180  
 
 181  
   @Override
 182  
   public void writePartition(DataOutput out, int partitionId)
 183  
     throws IOException {
 184  0
     store.writePartition(out, partitionId);
 185  0
   }
 186  
 
 187  
   @Override
 188  
   public void readFieldsForPartition(DataInput in, int partitionId)
 189  
     throws IOException {
 190  0
     store.readFieldsForPartition(in, partitionId);
 191  0
   }
 192  
 
 193  
   /**
 194  
    * Wait till all messages are processed and all queues are empty.
 195  
    */
 196  
   public void waitToComplete() {
 197  
     try {
 198  0
       for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
 199  0
         queue.put(CLEAR_QUEUE_MESSAGE);
 200  
       }
 201  0
       completionSemaphore.acquire();
 202  0
       completionSemaphore = new Semaphore(1 - threadsCount);
 203  0
     } catch (InterruptedException e) {
 204  0
       throw new RuntimeException(e);
 205  0
     }
 206  0
   }
 207  
 
 208  
   /**
 209  
    * This runnable has logic for background thread
 210  
    * that actually does message processing.
 211  
    */
 212  0
   private class MessageStoreQueueWorker implements Runnable {
 213  
     /**
 214  
      * Queue assigned to this background thread.
 215  
      */
 216  
     private final BlockingQueue<PartitionMessage<I, M>> queue;
 217  
 
 218  
     /**
 219  
      * Constructs runnable.
 220  
      * @param queue where messages are put by client
 221  
      */
 222  
     private MessageStoreQueueWorker(
 223  0
         BlockingQueue<PartitionMessage<I, M>> queue) {
 224  0
       this.queue = queue;
 225  0
     }
 226  
 
 227  
     @Override
 228  
     public void run() {
 229  0
       PartitionMessage<I, M> message = null;
 230  
       while (true) {
 231  
         try {
 232  0
           message = queue.take();
 233  0
           if (message.getMessage() != null) {
 234  0
             int partitionId = message.getPartitionId();
 235  0
             store.addPartitionMessages(partitionId, message.getMessage());
 236  0
           } else {
 237  0
             completionSemaphore.release();
 238  0
             if (message == SHUTDOWN_QUEUE_MESSAGE) {
 239  0
               return;
 240  
             }
 241  
           }
 242  0
         } catch (InterruptedException e) {
 243  0
           LOG.error("MessageStoreQueueWorker.run: " + message, e);
 244  0
           return;
 245  0
         }
 246  
       }
 247  
     }
 248  
   }
 249  
 }