Coverage Report - org.apache.giraph.comm.ServerData
 
Classes in this File Line Coverage Branch Coverage Complexity
ServerData
0%
0/134
0%
0/48
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm;
 20  
 
 21  
 import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
 22  
 
 23  
 import java.util.ArrayList;
 24  
 import java.util.Collections;
 25  
 import java.util.List;
 26  
 import java.util.Map;
 27  
 import java.util.concurrent.ConcurrentMap;
 28  
 
 29  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 30  
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
 31  
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
 32  
 import org.apache.giraph.comm.messages.MessageStore;
 33  
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 34  
 import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 35  
 import org.apache.giraph.conf.GiraphConstants;
 36  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 37  
 import org.apache.giraph.edge.EdgeStore;
 38  
 import org.apache.giraph.edge.EdgeStoreFactory;
 39  
 import org.apache.giraph.graph.Vertex;
 40  
 import org.apache.giraph.graph.VertexMutations;
 41  
 import org.apache.giraph.graph.VertexResolver;
 42  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 43  
 import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
 44  
 import org.apache.giraph.ooc.data.DiskBackedMessageStore;
 45  
 import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
 46  
 import org.apache.giraph.partition.Partition;
 47  
 import org.apache.giraph.partition.PartitionStore;
 48  
 import org.apache.giraph.partition.SimplePartitionStore;
 49  
 import org.apache.giraph.utils.ReflectionUtils;
 50  
 import org.apache.hadoop.io.Writable;
 51  
 import org.apache.hadoop.io.WritableComparable;
 52  
 import org.apache.hadoop.mapreduce.Mapper;
 53  
 import org.apache.log4j.Logger;
 54  
 
 55  
 import com.google.common.collect.Iterables;
 56  
 import com.google.common.collect.Maps;
 57  
 
 58  
 /**
 59  
  * Anything that the server stores
 60  
  *
 61  
  * @param <I> Vertex id
 62  
  * @param <V> Vertex data
 63  
  * @param <E> Edge data
 64  
  */
 65  
 @SuppressWarnings("rawtypes")
 66  
 public class ServerData<I extends WritableComparable,
 67  
     V extends Writable, E extends Writable> {
 68  
   /** Class logger */
 69  0
   private static final Logger LOG = Logger.getLogger(ServerData.class);
 70  
   /** Configuration */
 71  
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
 72  
   /** Partition store for this worker. */
 73  
   private volatile PartitionStore<I, V, E> partitionStore;
 74  
   /** Edge store for this worker. */
 75  
   private final EdgeStore<I, V, E> edgeStore;
 76  
   /** Message store factory */
 77  
   private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
 78  
       messageStoreFactory;
 79  
   /**
 80  
    * Message store for incoming messages (messages which will be consumed
 81  
    * in the next super step)
 82  
    */
 83  
   private volatile MessageStore<I, Writable> incomingMessageStore;
 84  
   /**
 85  
    * Message store for current messages (messages which we received in
 86  
    * previous super step and which will be consumed in current super step)
 87  
    */
 88  
   private volatile MessageStore<I, Writable> currentMessageStore;
 89  
   /**
 90  
    * Map of partition ids to vertex mutations from other workers. These are
 91  
    * mutations that should be applied before execution of *current* super step.
 92  
    * (accesses to keys should be thread-safe as multiple threads may resolve
 93  
    * mutations of different partitions at the same time)
 94  
    */
 95  0
   private ConcurrentMap<Integer,
 96  
       ConcurrentMap<I, VertexMutations<I, V, E>>>
 97  0
       oldPartitionMutations = Maps.newConcurrentMap();
 98  
   /**
 99  
    * Map of partition ids to vertex mutations from other workers. These are
 100  
    * mutations that are coming from other workers as the execution goes on in a
 101  
    * super step. These mutations should be applied in the *next* super step.
 102  
    * (this should be thread-safe)
 103  
    */
 104  0
   private ConcurrentMap<Integer,
 105  
       ConcurrentMap<I, VertexMutations<I, V, E>>>
 106  0
       partitionMutations = Maps.newConcurrentMap();
 107  
   /**
 108  
    * Holds aggregators which current worker owns from current superstep
 109  
    */
 110  
   private final OwnerAggregatorServerData ownerAggregatorData;
 111  
   /**
 112  
    * Holds old aggregators from previous superstep
 113  
    */
 114  
   private final AllAggregatorServerData allAggregatorData;
 115  
   /** Service worker */
 116  
   private final CentralizedServiceWorker<I, V, E> serviceWorker;
 117  
 
 118  
   /** Store for current messages from other workers to this worker */
 119  0
   private volatile List<Writable> currentWorkerToWorkerMessages =
 120  0
       Collections.synchronizedList(new ArrayList<Writable>());
 121  
   /** Store for messages from other workers to this worker for next superstep */
 122  0
   private volatile List<Writable> incomingWorkerToWorkerMessages =
 123  0
       Collections.synchronizedList(new ArrayList<Writable>());
 124  
 
 125  
   /** Job context (for progress) */
 126  
   private final Mapper<?, ?, ?, ?>.Context context;
 127  
   /** Out-of-core engine */
 128  
   private final OutOfCoreEngine oocEngine;
 129  
 
 130  
   /**
 131  
    * Constructor.
 132  
    *
 133  
    * @param service Service worker
 134  
    * @param workerServer Worker server
 135  
    * @param conf Configuration
 136  
    * @param context Mapper context
 137  
    */
 138  
   public ServerData(
 139  
       CentralizedServiceWorker<I, V, E> service,
 140  
       WorkerServer workerServer,
 141  
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
 142  0
       Mapper<?, ?, ?, ?>.Context context) {
 143  0
     this.serviceWorker = service;
 144  0
     this.conf = conf;
 145  0
     this.messageStoreFactory = createMessageStoreFactory();
 146  0
     EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
 147  0
     edgeStoreFactory.initialize(service, conf, context);
 148  0
     EdgeStore<I, V, E> inMemoryEdgeStore = edgeStoreFactory.newStore();
 149  0
     PartitionStore<I, V, E> inMemoryPartitionStore =
 150  
         new SimplePartitionStore<I, V, E>(conf, context);
 151  0
     if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
 152  0
       oocEngine = new OutOfCoreEngine(conf, service, workerServer);
 153  0
       partitionStore =
 154  
           new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore,
 155  
               conf, context, oocEngine);
 156  0
       edgeStore =
 157  
           new DiskBackedEdgeStore<I, V, E>(inMemoryEdgeStore, conf, oocEngine);
 158  
     } else {
 159  0
       partitionStore = inMemoryPartitionStore;
 160  0
       edgeStore = inMemoryEdgeStore;
 161  0
       oocEngine = null;
 162  
     }
 163  0
     ownerAggregatorData = new OwnerAggregatorServerData(context);
 164  0
     allAggregatorData = new AllAggregatorServerData(context, conf);
 165  0
     this.context = context;
 166  0
   }
 167  
 
 168  
   /**
 169  
    * Decide which message store should be used for current application,
 170  
    * and create the factory for that store
 171  
    *
 172  
    * @return Message store factory
 173  
    */
 174  
   private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
 175  
   createMessageStoreFactory() {
 176  0
     Class<? extends MessageStoreFactory> messageStoreFactoryClass =
 177  0
         MESSAGE_STORE_FACTORY_CLASS.get(conf);
 178  
 
 179  0
     MessageStoreFactory messageStoreFactoryInstance =
 180  0
         ReflectionUtils.newInstance(messageStoreFactoryClass);
 181  0
     messageStoreFactoryInstance.initialize(serviceWorker, conf);
 182  
 
 183  0
     return messageStoreFactoryInstance;
 184  
   }
 185  
 
 186  
   /**
 187  
    * Return the out-of-core engine for this worker.
 188  
    *
 189  
    * @return The out-of-core engine
 190  
    */
 191  
   public OutOfCoreEngine getOocEngine() {
 192  0
     return oocEngine;
 193  
   }
 194  
 
 195  
   /**
 196  
    * Return the edge store for this worker.
 197  
    *
 198  
    * @return The edge store
 199  
    */
 200  
   public EdgeStore<I, V, E> getEdgeStore() {
 201  0
     return edgeStore;
 202  
   }
 203  
 
 204  
   /**
 205  
    * Return the partition store for this worker.
 206  
    *
 207  
    * @return The partition store
 208  
    */
 209  
   public PartitionStore<I, V, E> getPartitionStore() {
 210  0
     return partitionStore;
 211  
   }
 212  
 
 213  
   /**
 214  
    * Get message store for incoming messages (messages which will be consumed
 215  
    * in the next super step)
 216  
    *
 217  
    * @param <M> Message data
 218  
    * @return Incoming message store
 219  
    */
 220  
   public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
 221  0
     return (MessageStore<I, M>) incomingMessageStore;
 222  
   }
 223  
 
 224  
   /**
 225  
    * Get message store for current messages (messages which we received in
 226  
    * previous super step and which will be consumed in current super step)
 227  
    *
 228  
    * @param <M> Message data
 229  
    * @return Current message store
 230  
    */
 231  
   public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
 232  0
     return (MessageStore<I, M>) currentMessageStore;
 233  
   }
 234  
 
 235  
   /**
 236  
    * Re-initialize message stores.
 237  
    * Discards old values if any.
 238  
    */
 239  
   public void resetMessageStores() {
 240  0
     if (currentMessageStore != null) {
 241  0
       currentMessageStore.clearAll();
 242  0
       currentMessageStore = null;
 243  
     }
 244  0
     if (incomingMessageStore != null) {
 245  0
       incomingMessageStore.clearAll();
 246  0
       incomingMessageStore = null;
 247  
     }
 248  0
     prepareSuperstep();
 249  0
   }
 250  
 
 251  
   /** Prepare for next superstep */
 252  
   public void prepareSuperstep() {
 253  0
     if (currentMessageStore != null) {
 254  0
       currentMessageStore.clearAll();
 255  
     }
 256  
 
 257  
     MessageStore<I, Writable> nextCurrentMessageStore;
 258  
     MessageStore<I, Writable> nextIncomingMessageStore;
 259  
     MessageStore<I, Writable> messageStore;
 260  
 
 261  
     // First create the necessary in-memory message stores. If out-of-core
 262  
     // mechanism is enabled, we wrap the in-memory message stores within
 263  
     // disk-backed message stores.
 264  0
     if (incomingMessageStore != null) {
 265  0
       nextCurrentMessageStore = incomingMessageStore;
 266  
     } else {
 267  0
       messageStore = messageStoreFactory.newStore(
 268  0
           conf.getIncomingMessageClasses());
 269  0
       if (oocEngine == null) {
 270  0
         nextCurrentMessageStore = messageStore;
 271  
       } else {
 272  0
         nextCurrentMessageStore = new DiskBackedMessageStore<>(
 273  
             conf, oocEngine, messageStore,
 274  0
             conf.getIncomingMessageClasses().useMessageCombiner(),
 275  0
             serviceWorker.getSuperstep());
 276  
       }
 277  
     }
 278  
 
 279  0
     messageStore = messageStoreFactory.newStore(
 280  0
         conf.getOutgoingMessageClasses());
 281  0
     if (oocEngine == null) {
 282  0
       nextIncomingMessageStore = messageStore;
 283  
     } else {
 284  0
       nextIncomingMessageStore = new DiskBackedMessageStore<>(
 285  
           conf, oocEngine, messageStore,
 286  0
           conf.getOutgoingMessageClasses().useMessageCombiner(),
 287  0
           serviceWorker.getSuperstep() + 1);
 288  
     }
 289  
 
 290  
     // If out-of-core engine is enabled, we avoid overlapping of out-of-core
 291  
     // decisions with change of superstep. This avoidance is done to simplify
 292  
     // the design and reduce excessive use of synchronization primitives.
 293  0
     if (oocEngine != null) {
 294  0
       oocEngine.getSuperstepLock().writeLock().lock();
 295  
     }
 296  0
     currentMessageStore = nextCurrentMessageStore;
 297  0
     incomingMessageStore = nextIncomingMessageStore;
 298  0
     if (oocEngine != null) {
 299  0
       oocEngine.reset();
 300  0
       oocEngine.getSuperstepLock().writeLock().unlock();
 301  
     }
 302  0
     currentMessageStore.finalizeStore();
 303  
 
 304  0
     currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
 305  0
     incomingWorkerToWorkerMessages =
 306  0
         Collections.synchronizedList(new ArrayList<Writable>());
 307  0
   }
 308  
 
 309  
   /**
 310  
    * Get the vertex mutations (synchronize on the values)
 311  
    *
 312  
    * @return Vertex mutations
 313  
    */
 314  
   public ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>>
 315  
   getPartitionMutations() {
 316  0
     return partitionMutations;
 317  
   }
 318  
 
 319  
   /**
 320  
    * Get holder for aggregators which current worker owns
 321  
    *
 322  
    * @return Holder for aggregators which current worker owns
 323  
    */
 324  
   public OwnerAggregatorServerData getOwnerAggregatorData() {
 325  0
     return ownerAggregatorData;
 326  
   }
 327  
 
 328  
   /**
 329  
    * Get holder for aggregators from previous superstep
 330  
    *
 331  
    * @return Holder for aggregators from previous superstep
 332  
    */
 333  
   public AllAggregatorServerData getAllAggregatorData() {
 334  0
     return allAggregatorData;
 335  
   }
 336  
 
 337  
   /**
 338  
    * Get the reference of the service worker.
 339  
    *
 340  
    * @return CentralizedServiceWorker
 341  
    */
 342  
   public CentralizedServiceWorker<I, V, E> getServiceWorker() {
 343  0
     return this.serviceWorker;
 344  
   }
 345  
 
 346  
   /**
 347  
    * Get and clear worker to worker messages for this superstep. Can be
 348  
    * called only once per superstep.
 349  
    *
 350  
    * @return List of messages for this worker
 351  
    */
 352  
   public List<Writable> getAndClearCurrentWorkerToWorkerMessages() {
 353  0
     List<Writable> ret = currentWorkerToWorkerMessages;
 354  0
     currentWorkerToWorkerMessages = null;
 355  0
     return ret;
 356  
   }
 357  
 
 358  
   /**
 359  
    * Add incoming message to this worker for next superstep. Thread-safe.
 360  
    *
 361  
    * @param message Message received
 362  
    */
 363  
   public void addIncomingWorkerToWorkerMessage(Writable message) {
 364  0
     incomingWorkerToWorkerMessages.add(message);
 365  0
   }
 366  
 
 367  
 
 368  
   /**
 369  
    * Get worker to worker messages received in previous superstep.
 370  
    * @return list of current worker to worker messages.
 371  
    */
 372  
   public List<Writable> getCurrentWorkerToWorkerMessages() {
 373  0
     return currentWorkerToWorkerMessages;
 374  
   }
 375  
 
 376  
   /**
 377  
    * Prepare for resolving mutations.
 378  
    */
 379  
   public void prepareResolveMutations() {
 380  0
     oldPartitionMutations = partitionMutations;
 381  0
     partitionMutations = Maps.newConcurrentMap();
 382  0
   }
 383  
 
 384  
   /**
 385  
    * Resolve mutations specific for a partition. This method is called once
 386  
    * per partition, before the computation for that partition starts.
 387  
    * @param partition The partition to resolve mutations for
 388  
    */
 389  
   public void resolvePartitionMutation(Partition<I, V, E> partition) {
 390  0
     Integer partitionId = partition.getId();
 391  0
     VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
 392  0
     ConcurrentMap<I, VertexMutations<I, V, E>> prevPartitionMutations =
 393  0
         oldPartitionMutations.get(partitionId);
 394  
 
 395  0
     boolean ignoreExistingVertices =
 396  0
         conf.getIncomingMessageClasses().ignoreExistingVertices();
 397  
 
 398  
     // Resolve mutations that are explicitly sent for this partition
 399  0
     if (prevPartitionMutations != null) {
 400  0
       for (Map.Entry<I, VertexMutations<I, V, E>> entry : prevPartitionMutations
 401  0
           .entrySet()) {
 402  0
         I vertexId = entry.getKey();
 403  0
         Vertex<I, V, E> originalVertex = partition.getVertex(vertexId);
 404  0
         VertexMutations<I, V, E> vertexMutations = entry.getValue();
 405  0
         Vertex<I, V, E> vertex = vertexResolver.resolve(vertexId,
 406  
             originalVertex, vertexMutations,
 407  
             !ignoreExistingVertices &&
 408  0
             getCurrentMessageStore().hasMessagesForVertex(entry.getKey()));
 409  
 
 410  0
         if (LOG.isDebugEnabled()) {
 411  0
           LOG.debug("resolvePartitionMutations: Resolved vertex index " +
 412  
               vertexId + " in partition index " + partitionId +
 413  
               " with original vertex " + originalVertex +
 414  
               ", returned vertex " + vertex + " on superstep " +
 415  0
               serviceWorker.getSuperstep() + " with mutations " +
 416  
               vertexMutations);
 417  
         }
 418  
 
 419  0
         if (vertex != null) {
 420  0
           partition.putVertex(vertex);
 421  0
         } else if (originalVertex != null) {
 422  0
           partition.removeVertex(vertexId);
 423  0
           if (!ignoreExistingVertices) {
 424  0
             getCurrentMessageStore().clearVertexMessages(vertexId);
 425  
           }
 426  
         }
 427  0
         context.progress();
 428  0
       }
 429  
     }
 430  
 
 431  0
     if (!ignoreExistingVertices) {
 432  
       // Keep track of vertices which are not here in the partition, but have
 433  
       // received messages
 434  0
       Iterable<I> destinations = getCurrentMessageStore().
 435  0
           getPartitionDestinationVertices(partitionId);
 436  0
       if (!Iterables.isEmpty(destinations)) {
 437  0
         for (I vertexId : destinations) {
 438  0
           if (partition.getVertex(vertexId) == null) {
 439  0
             Vertex<I, V, E> vertex =
 440  0
                 vertexResolver.resolve(vertexId, null, null, true);
 441  
 
 442  0
             if (LOG.isDebugEnabled()) {
 443  0
               LOG.debug(
 444  
                   "resolvePartitionMutations: A non-existing vertex has " +
 445  
                   "message(s). Added vertex index " + vertexId +
 446  
                   " in partition index " + partitionId +
 447  
                   ", vertex = " + vertex + ", on superstep " +
 448  0
                   serviceWorker.getSuperstep());
 449  
             }
 450  
 
 451  0
             if (vertex != null) {
 452  0
               partition.putVertex(vertex);
 453  
             }
 454  0
             context.progress();
 455  
           }
 456  0
         }
 457  
       }
 458  
     }
 459  0
   }
 460  
 
 461  
   /**
 462  
    * In case of async message store we have to wait for all messages
 463  
    * to be processed before going into next superstep.
 464  
    */
 465  
   public void waitForComplete() {
 466  0
     if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
 467  0
       ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
 468  
     }
 469  0
   }
 470  
 }