Coverage Report - org.apache.giraph.edge.AbstractEdgeStore
 
Classes in this File Line Coverage Branch Coverage Complexity
AbstractEdgeStore
0%
0/66
0%
0/26
2.588
AbstractEdgeStore$1
0%
0/2
N/A
2.588
AbstractEdgeStore$1$1
0%
0/49
0%
0/24
2.588
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.edge;
 20  
 
 21  
 import com.google.common.collect.MapMaker;
 22  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 23  
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 24  
 import org.apache.giraph.conf.GiraphConstants;
 25  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 26  
 import org.apache.giraph.graph.Vertex;
 27  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 28  
 import org.apache.giraph.partition.Partition;
 29  
 import org.apache.giraph.utils.CallableFactory;
 30  
 import org.apache.giraph.utils.ProgressCounter;
 31  
 import org.apache.giraph.utils.ProgressableUtils;
 32  
 import org.apache.giraph.utils.ThreadLocalProgressCounter;
 33  
 import org.apache.giraph.utils.Trimmable;
 34  
 import org.apache.giraph.utils.VertexIdEdgeIterator;
 35  
 import org.apache.giraph.utils.VertexIdEdges;
 36  
 import org.apache.hadoop.io.Writable;
 37  
 import org.apache.hadoop.io.WritableComparable;
 38  
 import org.apache.hadoop.util.Progressable;
 39  
 import org.apache.log4j.Logger;
 40  
 
 41  
 import java.io.DataInput;
 42  
 import java.io.DataOutput;
 43  
 import java.io.IOException;
 44  
 import java.util.Iterator;
 45  
 import java.util.Map;
 46  
 import java.util.concurrent.Callable;
 47  
 import java.util.concurrent.ConcurrentMap;
 48  
 
 49  
 import static com.google.common.base.Preconditions.checkState;
 50  
 
 51  
 /**
 52  
  * Basic implementation of an edge store; extend this to easily define simple
 53  
  * and primitive edge stores
 54  
  *
 55  
  * @param <I> Vertex id
 56  
  * @param <V> Vertex value
 57  
  * @param <E> Edge value
 58  
  * @param <K> Key corresponding to Vertex id
 59  
  * @param <Et> Entry type
 60  
  */
 61  0
 public abstract class AbstractEdgeStore<I extends WritableComparable,
 62  
   V extends Writable, E extends Writable, K, Et>
 63  
   extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
 64  
   implements EdgeStore<I, V, E> {
 65  
   /**
    * Used to keep track of progress during the move-edges process; each
    * worker thread increments its own counter once per processed entry.
    */
 66  0
   public static final ThreadLocalProgressCounter PROGRESS_COUNTER =
 67  
     new ThreadLocalProgressCounter();
 68  
   /** Class logger */
 69  0
   private static final Logger LOG = Logger.getLogger(AbstractEdgeStore.class);
 70  
   /** Service worker. */
 71  
   protected CentralizedServiceWorker<I, V, E> service;
 72  
   /** Giraph configuration. */
 73  
   protected ImmutableClassesGiraphConfiguration<I, V, E> configuration;
 74  
   /** Progressable to report progress. */
 75  
   protected Progressable progressable;
 76  
   /**
    * Map used to temporarily store incoming edges, keyed by partition id.
    * Each value maps a vertex key to the out-edges collected for that vertex.
    */
 77  
   protected ConcurrentMap<Integer, Map<K, OutEdges<I, E>>> transientEdges;
 78  
   /**
 79  
    * Whether the chosen {@link OutEdges} implementation allows for Edge
 80  
    * reuse.
 81  
    */
 82  
   protected boolean reuseEdgeObjects;
 83  
   /**
 84  
    * Whether the {@link OutEdges} class used during input is different
 85  
    * from the one used during computation.
 86  
    */
 87  
   protected boolean useInputOutEdges;
 88  
   /**
    * Whether we spilled edges on disk. Set to true the first time
    * writePartitionEdgeStore writes out a non-empty partition and never
    * reset in this class.
    */
 89  0
   private volatile boolean hasEdgesOnDisk = false;
 90  
   /** Callback deciding whether a missing source vertex should be created */
 91  
   private CreateSourceVertexCallback<I> createSourceVertexCallback;
 92  
 
 93  
 
 94  
   /**
 95  
    * Constructor. Stores the collaborators, creates the concurrent map that
    * holds transient edges, caches the edge-reuse and input-out-edges flags
    * from the configuration and instantiates the create-source-vertex
    * callback.
 96  
    *
 97  
    * @param service Service worker
 98  
    * @param configuration Configuration
 99  
    * @param progressable Progressable
 100  
    */
 101  
   public AbstractEdgeStore(
 102  
     CentralizedServiceWorker<I, V, E> service,
 103  
     ImmutableClassesGiraphConfiguration<I, V, E> configuration,
 104  0
     Progressable progressable) {
 105  0
     this.service = service;
 106  0
     this.configuration = configuration;
 107  0
     this.progressable = progressable;
     // The map's concurrency level is taken from the Netty server execution
     // concurrency setting of the configuration.
 108  0
     transientEdges = new MapMaker().concurrencyLevel(
 109  0
       configuration.getNettyServerExecutionConcurrency()).makeMap();
 110  0
     reuseEdgeObjects = configuration.reuseEdgeObjects();
 111  0
     useInputOutEdges = configuration.useInputOutEdges();
     // The callback is consulted in moveEdgesToVertices whenever edges arrive
     // for a vertex id that is not present in its partition.
 112  0
     createSourceVertexCallback =
 113  
         GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK
 114  0
             .newInstance(configuration);
 115  0
   }
 116  
 
 117  
   /**
 117  
    * Get the vertex id for a given entry
 118  
    *
 119  
    * @param entry entry to take the vertex id from
 120  
    * @param representativeVertexId vertex id instance supplied by the caller;
    *        moveEdgesToVertices creates one per worker thread and passes the
    *        same instance on every call. NOTE(review): presumably
    *        implementations may fill and return it instead of allocating a
    *        new id - confirm against the subclasses.
 121  
    * @return vertex Id
 122  
    */
 123  
   protected abstract I getVertexId(Et entry, I representativeVertexId);
 125  
 
 126  
   /**
 127  
    * Create a vertex id from a given entry. moveEdgesToVertices uses the
    * result to initialize a newly created source vertex.
 128  
    *
 129  
    * @param entry entry to create the vertex id from
 130  
    * @return new vertexId
 131  
    */
 132  
   protected abstract I createVertexId(Et entry);
 133  
 
 134  
   /**
 135  
    * Get the map from vertex key to OutEdges for a given partition.
    * NOTE(review): callers in this class use the result without a null
    * check, so implementations presumably create the map when the partition
    * has none yet - confirm against the subclasses.
 136  
    *
 137  
    * @param partitionId id of partition
 138  
    * @return map of vertex key to OutEdges for the partition
 139  
    */
 140  
   protected abstract Map<K, OutEdges<I, E>> getPartitionEdges(int partitionId);
 141  
 
 142  
   /**
 143  
    * Return the OutEdges held by a given entry (the edges collected for the
    * vertex that the entry represents)
 144  
    *
 145  
    * @param entry entry of a partition's edge map
 146  
    * @return out edges
 147  
    */
 148  
   protected abstract OutEdges<I, E> getPartitionEdges(Et entry);
 149  
 
 150  
   /**
 151  
    * Writes the given key to the output
 152  
    *
 153  
    * @param key input key to be written
 154  
    * @param output output to write the key to
    * @throws IOException if writing to the output fails
 155  
    */
 156  
   protected abstract void writeVertexKey(K key, DataOutput output)
 157  
   throws IOException;
 158  
 
 159  
   /**
 160  
    * Reads a key from the input
 161  
    *
 162  
    * @param input input to read the key from
 163  
    * @return Key read from the input
    * @throws IOException if reading from the input fails
 164  
    */
 165  
   protected abstract K readVertexKey(DataInput input) throws IOException;
 166  
 
 167  
   /**
 168  
    * Get iterator for partition edges. The returned iterator has to support
    * remove(), because moveEdgesToVertices removes every entry after
    * processing it.
 169  
    *
 170  
    * @param partitionEdges map of out-edges for vertices in a partition
 171  
    * @return iterator over the entries of the given map
 172  
    */
 173  
   protected abstract Iterator<Et>
 174  
   getPartitionEdgesIterator(Map<K, OutEdges<I, E>> partitionEdges);
 175  
 
 176  
   @Override
 177  
   public boolean hasEdgesForPartition(int partitionId) {
     // True while the transient edge map holds an entry for this partition.
 178  0
     return transientEdges.containsKey(partitionId);
 179  
   }
 180  
 
 181  
   @Override
 182  
   public void writePartitionEdgeStore(int partitionId, DataOutput output)
 183  
       throws IOException {
     // Take the partition's edges out of the transient map; after this call
     // they are no longer held by this store.
 184  0
     Map<K, OutEdges<I, E>> edges = transientEdges.remove(partitionId);
     // NOTE(review): nothing at all is written when the partition has no
     // entry, while readPartitionEdgeStore always starts by reading a count.
     // Callers presumably check hasEdgesForPartition first - confirm.
 185  0
     if (edges != null) {
       // Layout: entry count, then (vertex key, out-edges) for each entry.
 186  0
       output.writeInt(edges.size());
 187  0
       if (edges.size() > 0) {
         // Remember that edges left memory, so that moveEdgesToVertices does
         // not take its "no edges to move" shortcut.
 188  0
         hasEdgesOnDisk = true;
 189  
       }
 190  0
       for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) {
 191  0
         writeVertexKey(edge.getKey(), output);
 192  0
         edge.getValue().write(output);
 193  0
       }
 194  
     }
 195  0
   }
 196  
 
 197  
   @Override
 198  
   public void readPartitionEdgeStore(int partitionId, DataInput input)
 199  
       throws IOException {
     // Reading is only legal for a partition that currently has no entry in
     // the transient edge map.
 200  0
     checkState(!transientEdges.containsKey(partitionId),
 201  
         "readPartitionEdgeStore: reading a partition that is already there in" +
 202  
             " the partition store (impossible)");
 203  0
     Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
     // Mirror of the layout produced by writePartitionEdgeStore: entry count,
     // then (vertex key, out-edges) for each entry.
 204  0
     int numEntries = input.readInt();
 205  0
     for (int i = 0; i < numEntries; ++i) {
 206  0
       K vertexKey = readVertexKey(input);
       // Edges are deserialized into a fresh input OutEdges instance obtained
       // from the configuration.
 207  0
       OutEdges<I, E> edges = configuration.createAndInitializeInputOutEdges();
 208  0
       edges.readFields(input);
 209  0
       partitionEdges.put(vertexKey, edges);
 210  
     }
 211  0
   }
 212  
 
 213  
   /**
 214  
    * Get out-edges for the vertex the iterator is currently positioned at.
    * addPartitionEdges synchronizes on the returned object before adding an
    * edge to it.
 215  
    *
 216  
    * @param vertexIdEdgeIterator vertex Id Edge iterator
 217  
    * @param partitionEdgesIn map of out-edges for vertices in a partition
 218  
    * @return out-edges for the vertex
 219  
    */
 220  
   protected abstract OutEdges<I, E> getVertexOutEdges(
 221  
     VertexIdEdgeIterator<I, E> vertexIdEdgeIterator,
 222  
     Map<K, OutEdges<I, E>> partitionEdgesIn);
 223  
 
 224  
   @Override
 225  
   public void addPartitionEdges(
 226  
     int partitionId, VertexIdEdges<I, E> edges) {
 227  0
     Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
 228  
 
 229  0
     VertexIdEdgeIterator<I, E> vertexIdEdgeIterator =
 230  0
         edges.getVertexIdEdgeIterator();
 231  0
     while (vertexIdEdgeIterator.hasNext()) {
 232  0
       vertexIdEdgeIterator.next();
       // With edge reuse enabled the iterator's current edge is passed on
       // as is; otherwise the edge is released from the iterator.
       // NOTE(review): presumably releaseCurrentEdge hands ownership of the
       // edge object to the caller - confirm in VertexIdEdgeIterator.
 233  0
       Edge<I, E> edge = reuseEdgeObjects ?
 234  0
           vertexIdEdgeIterator.getCurrentEdge() :
 235  0
           vertexIdEdgeIterator.releaseCurrentEdge();
 236  0
       OutEdges<I, E> outEdges = getVertexOutEdges(vertexIdEdgeIterator,
 237  
           partitionEdges);
       // The add is guarded by the monitor of the vertex's OutEdges object.
 238  0
       synchronized (outEdges) {
 239  0
         outEdges.add(edge);
 240  0
       }
 241  0
     }
 242  0
   }
 243  
 
 244  
   /**
 245  
    * Convert the input edges to the {@link OutEdges} data structure used
 246  
    * for computation (if different).
 247  
    *
 248  
    * @param inputEdges Input edges
 249  
    * @return the given instance itself when no separate input OutEdges class
    *         is in use, otherwise the OutEdges the configuration creates and
    *         initializes from the input edges
 250  
    */
 251  
   private OutEdges<I, E> convertInputToComputeEdges(
 252  
     OutEdges<I, E> inputEdges) {
 253  0
     if (!useInputOutEdges) {
 254  0
       return inputEdges;
 255  
     } else {
 256  0
       return configuration.createAndInitializeOutEdges(inputEdges);
 257  
     }
 258  
   }
 259  
 
 260  
   @Override
 261  
   public void moveEdgesToVertices() {
     // Shortcut: no edges are held in the transient map and none were
     // written out through writePartitionEdgeStore.
 262  0
     if (transientEdges.isEmpty() && !hasEdgesOnDisk) {
 263  0
       if (LOG.isInfoEnabled()) {
 264  0
         LOG.info("moveEdgesToVertices: No edges to move");
 265  
       }
 266  0
       return;
 267  
     }
 268  
 
 269  0
     if (LOG.isInfoEnabled()) {
 270  0
       LOG.info("moveEdgesToVertices: Moving incoming edges to " +
 271  
           "vertices. Using " + createSourceVertexCallback);
 272  
     }
 273  
 
 274  0
     service.getPartitionStore().startIteration();
     // One callable is created per input-splits thread.
 275  0
     int numThreads = configuration.getNumInputSplitsThreads();
 276  
 
 277  0
     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
 278  
       @Override
 279  
       public Callable<Void> newCallable(int callableId) {
 280  0
         return new Callable<Void>() {
 281  
           @Override
 282  
           public Void call() throws Exception {
             // One id instance per callable, handed to every getVertexId
             // call below.
 283  0
             I representativeVertexId = configuration.createVertexId();
 284  0
             OutOfCoreEngine oocEngine = service.getServerData().getOocEngine();
 285  0
             if (oocEngine != null) {
 286  0
               oocEngine.processingThreadStart();
 287  
             }
 288  0
             ProgressCounter numVerticesProcessed = PROGRESS_COUNTER.get();
             // Keep pulling partitions from the partition store until it
             // reports that none are left.
 289  
             while (true) {
 290  0
               Partition<I, V, E> partition =
 291  0
                   service.getPartitionStore().getNextPartition();
 292  0
               if (partition == null) {
 293  0
                 break;
 294  
               }
 295  0
               Map<K, OutEdges<I, E>> partitionEdges =
 296  0
                   transientEdges.remove(partition.getId());
               // No transient edges for this partition: hand it straight
               // back to the partition store.
 297  0
               if (partitionEdges == null) {
 298  0
                 service.getPartitionStore().putPartition(partition);
 299  0
                 continue;
 300  
               }
 301  
 
 302  0
               Iterator<Et> iterator =
 303  0
                   getPartitionEdgesIterator(partitionEdges);
 304  
               // process all vertices in given partition
 305  0
               int count = 0;
 306  0
               while (iterator.hasNext()) {
 307  
                 // If out-of-core mechanism is used, check whether this thread
 308  
                 // can stay active or it should temporarily suspend and stop
 309  
                 // processing and generating more data for the moment.
                 // CHECK_IN_INTERVAL is applied as a bit mask to the entry
                 // count. NOTE(review): presumably of the form 2^k - 1, so
                 // the check-in happens every 2^k entries - confirm.
 310  0
                 if (oocEngine != null &&
 311  
                     (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
 312  0
                   oocEngine.activeThreadCheckIn();
 313  
                 }
 314  0
                 Et entry = iterator.next();
 315  0
                 I vertexId = getVertexId(entry, representativeVertexId);
 316  0
                 OutEdges<I, E> outEdges = convertInputToComputeEdges(
 317  0
                   getPartitionEdges(entry));
 318  0
                 Vertex<I, V, E> vertex = partition.getVertex(vertexId);
 319  
                 // If the source vertex doesn't exist, create it. Otherwise,
 320  
                 // just set the edges.
 321  0
                 if (vertex == null) {
                   // When the callback declines, no vertex is created and
                   // the edges of this entry are discarded (the entry is
                   // still removed below).
 322  0
                   if (createSourceVertexCallback
 323  0
                       .shouldCreateSourceVertex(vertexId)) {
 324  
                     // createVertex only if it is allowed by configuration
 325  0
                     vertex = configuration.createVertex();
                     // The new vertex gets an id from createVertexId(entry)
                     // rather than the vertexId used for the lookup.
                     // NOTE(review): presumably because vertexId may be the
                     // shared representative instance - confirm.
 326  0
                     vertex.initialize(createVertexId(entry),
 327  0
                         configuration.createVertexValue(), outEdges);
 328  0
                     partition.putVertex(vertex);
 329  
                   }
 330  
                 } else {
 331  
                   // A vertex may exist with or without edges initially
 332  
                   // and optimize the case of no initial edges
 333  0
                   if (vertex.getNumEdges() == 0) {
 334  0
                     vertex.setEdges(outEdges);
 335  
                   } else {
 336  0
                     for (Edge<I, E> edge : outEdges) {
 337  0
                       vertex.addEdge(edge);
 338  0
                     }
 339  
                   }
 340  0
                   if (vertex instanceof Trimmable) {
 341  0
                     ((Trimmable) vertex).trim();
 342  
                   }
 343  
                   // Some Partition implementations (e.g. ByteArrayPartition)
 344  
                   // require us to put back the vertex after modifying it.
 345  0
                   partition.saveVertex(vertex);
 346  
                 }
                 // Count the entry as processed and drop it from the
                 // partition's edge map.
 347  0
                 numVerticesProcessed.inc();
 348  0
                 iterator.remove();
 349  0
               }
 350  
               // Some PartitionStore implementations
 351  
               // (e.g. DiskBackedPartitionStore) require us to put back the
 352  
               // partition after modifying it.
 353  0
               service.getPartitionStore().putPartition(partition);
 354  0
             }
             // NOTE(review): this call is not in a finally block, so it is
             // skipped when the loop above throws.
 355  0
             if (oocEngine != null) {
 356  0
               oocEngine.processingThreadFinish();
 357  
             }
 358  0
             return null;
 359  
           }
 360  
         };
 361  
       }
 362  
     };
 363  0
     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
 364  
         "move-edges-%d", progressable);
 365  
 
 366  
     // remove all entries
 367  0
     transientEdges.clear();
 368  
 
 369  0
     if (LOG.isInfoEnabled()) {
 370  0
       LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
 371  
           "vertices.");
 372  
     }
 373  0
   }
 374  
 }