Coverage Report - org.apache.giraph.block_app.framework.api.local.InternalMessageStore
 
Classes in this File Line Coverage Branch Coverage Complexity
InternalMessageStore
N/A
N/A
0
InternalMessageStore$InternalChecksMessageStore
0%
0/23
0%
0/2
0
InternalMessageStore$InternalChecksMessageStore$1
0%
0/7
N/A
0
InternalMessageStore$InternalWrappedMessageStore
0%
0/30
0%
0/4
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework.api.local;
 19  
 
 20  
 import java.io.IOException;
 21  
 import java.util.ArrayList;
 22  
 import java.util.Iterator;
 23  
 import java.util.List;
 24  
 import java.util.concurrent.ThreadLocalRandom;
 25  
 
 26  
 import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
 27  
 import org.apache.giraph.comm.messages.MessageStore;
 28  
 import org.apache.giraph.comm.messages.PartitionSplitInfo;
 29  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 30  
 import org.apache.giraph.conf.MessageClasses;
 31  
 import org.apache.giraph.factories.MessageValueFactory;
 32  
 import org.apache.giraph.utils.WritableUtils;
 33  
 import org.apache.hadoop.io.Writable;
 34  
 import org.apache.hadoop.io.WritableComparable;
 35  
 
 36  
 import com.google.common.collect.Iterators;
 37  
 
 38  
 /**
 39  
  * Interface for internal message store, used by LocalBlockRunner
 40  
  *
 41  
  * @param <I> Vertex id type
 42  
  * @param <M> Message type
 43  
  */
 44  
 @SuppressWarnings("rawtypes")
 45  
 interface InternalMessageStore
 46  
     <I extends WritableComparable, M extends Writable> {
 47  
   Iterator<I> targetVertexIds();
 48  
   boolean hasMessage(I id);
 49  
   Iterable<M> takeMessages(I id);
 50  
   void sendMessage(I id, M message);
 51  
   void sendMessageToMultipleEdges(Iterator<I> idIter, M message);
 52  
   void finalizeStore();
 53  
   Iterable<I> getPartitionDestinationVertices(int partitionId);
 54  
 
 55  
   /**
 56  
    * A wrapper that uses InMemoryMessageStoreFactory to
 57  
    * create MessageStore
 58  
    *
 59  
    * @param <I> Vertex id type
 60  
    * @param <M> Message type
 61  
    */
 62  
   class InternalWrappedMessageStore
 63  
   <I extends WritableComparable, M extends Writable>
 64  
   implements InternalMessageStore<I, M> {
 65  
     private final MessageStore<I, M> messageStore;
 66  
     private final PartitionSplitInfo<I> partitionInfo;
 67  
 
 68  
     private InternalWrappedMessageStore(
 69  
       ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
 70  
       MessageStore<I, M> messageStore,
 71  
       PartitionSplitInfo<I> partitionInfo
 72  0
     ) {
 73  0
       this.messageStore = messageStore;
 74  0
       this.partitionInfo = partitionInfo;
 75  0
     }
 76  
 
 77  
     public static <I extends WritableComparable, M extends Writable>
 78  
     InternalMessageStore<I, M> create(
 79  
       ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
 80  
       MessageClasses<I, M> messageClasses,
 81  
       PartitionSplitInfo<I> partitionInfo
 82  
     ) {
 83  0
       InMemoryMessageStoreFactory<I, M> factory =
 84  
         new InMemoryMessageStoreFactory<>();
 85  0
       factory.initialize(partitionInfo, conf);
 86  0
       return new InternalWrappedMessageStore<>(
 87  
         conf,
 88  0
         factory.newStore(messageClasses),
 89  
         partitionInfo
 90  
       );
 91  
     }
 92  
 
 93  
     @Override
 94  
     public void sendMessage(I id, M message) {
 95  
       try {
 96  0
         messageStore.addMessage(id, message);
 97  0
       } catch (IOException e) {
 98  0
         throw new RuntimeException(e);
 99  0
       }
 100  0
     }
 101  
 
 102  
     @Override
 103  
     public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) {
 104  0
       while (idIter.hasNext()) {
 105  0
         sendMessage(idIter.next(), message);
 106  
       }
 107  0
     }
 108  
 
 109  
     @Override
 110  
     public Iterable<M> takeMessages(I id) {
 111  0
       Iterable<M> result = messageStore.getVertexMessages(id);
 112  0
       messageStore.clearVertexMessages(id);
 113  0
       return result;
 114  
     }
 115  
 
 116  
     @Override
 117  
     public Iterable<I> getPartitionDestinationVertices(int partitionId) {
 118  0
       return messageStore.getPartitionDestinationVertices(partitionId);
 119  
     }
 120  
 
 121  
     @Override
 122  
     public Iterator<I> targetVertexIds() {
 123  0
       List<Iterator<I>> iterators = new ArrayList<>();
 124  0
       for (int partition : partitionInfo.getPartitionIds()) {
 125  0
         Iterable<I> vertices =
 126  0
           messageStore.getPartitionDestinationVertices(partition);
 127  0
         iterators.add(vertices.iterator());
 128  0
       }
 129  0
       return Iterators.concat(iterators.iterator());
 130  
     }
 131  
 
 132  
     @Override
 133  
     public boolean hasMessage(I id) {
 134  0
       return messageStore.hasMessagesForVertex(id);
 135  
     }
 136  
 
 137  
     @Override
 138  
     public void finalizeStore() {
 139  0
       messageStore.finalizeStore();
 140  0
     }
 141  
   }
 142  
 
 143  
   /**
 144  
    * Message store that add checks for whether serialization seems to be
 145  
    * working fine
 146  
    */
 147  0
   static class InternalChecksMessageStore
 148  
       <I extends WritableComparable, M extends Writable>
 149  
       implements InternalMessageStore<I, M> {
 150  
     private final InternalMessageStore<I, M> messageStore;
 151  
     private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
 152  
     private final MessageValueFactory<M> messageFactory;
 153  
 
 154  
     public InternalChecksMessageStore(
 155  
       InternalMessageStore<I, M> messageStore,
 156  
       ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
 157  
       MessageValueFactory<M> messageFactory
 158  0
     ) {
 159  0
       this.messageStore = messageStore;
 160  0
       this.conf = conf;
 161  0
       this.messageFactory = messageFactory;
 162  0
     }
 163  
 
 164  
     // Use message copies probabilistically, to catch both not serializing some
 165  
     // fields, and storing references from message object itself
 166  
     // (which can be reusable).
 167  
     private M maybeMessageCopy(M message) {
 168  0
       M messageCopy = WritableUtils.createCopy(
 169  
           message, messageFactory, conf);
 170  0
       return ThreadLocalRandom.current().nextBoolean() ? messageCopy : message;
 171  
     }
 172  
 
 173  
     private void checkIdCopy(I id) {
 174  0
       WritableUtils.createCopy(id, conf.getVertexIdFactory(), conf);
 175  0
     }
 176  
 
 177  
     @Override
 178  
     public void sendMessage(I id, M message) {
 179  0
       checkIdCopy(id);
 180  0
       messageStore.sendMessage(id, maybeMessageCopy(message));
 181  0
     }
 182  
 
 183  
     @Override
 184  
     public void sendMessageToMultipleEdges(
 185  
         final Iterator<I> idIter, M message) {
 186  0
       messageStore.sendMessageToMultipleEdges(
 187  0
           new Iterator<I>() {
 188  
             @Override
 189  
             public boolean hasNext() {
 190  0
               return idIter.hasNext();
 191  
             }
 192  
 
 193  
             @Override
 194  
             public I next() {
 195  0
               I id = idIter.next();
 196  0
               checkIdCopy(id);
 197  0
               return id;
 198  
             }
 199  
 
 200  
             @Override
 201  
             public void remove() {
 202  0
               idIter.remove();
 203  0
             }
 204  
           },
 205  0
           maybeMessageCopy(message));
 206  0
     }
 207  
 
 208  
     @Override
 209  
     public Iterable<M> takeMessages(I id) {
 210  0
       checkIdCopy(id);
 211  0
       return messageStore.takeMessages(id);
 212  
     }
 213  
 
 214  
     @Override
 215  
     public boolean hasMessage(I id) {
 216  0
       return messageStore.hasMessage(id);
 217  
     }
 218  
 
 219  
     @Override
 220  
     public Iterator<I> targetVertexIds() {
 221  0
       return messageStore.targetVertexIds();
 222  
     }
 223  
 
 224  
     @Override
 225  
     public Iterable<I> getPartitionDestinationVertices(int partitionId) {
 226  0
       return messageStore.getPartitionDestinationVertices(partitionId);
 227  
     }
 228  
 
 229  
     @Override
 230  
     public void finalizeStore() {
 231  0
       messageStore.finalizeStore();
 232  0
     }
 233  
   }
 234  
 }