Coverage Report - org.apache.giraph.ooc.data.DiskBackedMessageStore
 
Classes in this File Line Coverage Branch Coverage Complexity
DiskBackedMessageStore
0%
0/90
0%
0/21
0
DiskBackedMessageStore$1
0%
0/1
N/A
0
DiskBackedMessageStore$SerializedMessageClass
0%
0/3
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc.data;
 20  
 
 21  
 import java.io.DataInput;
 22  
 import java.io.DataOutput;
 23  
 import java.io.IOException;
 24  
 
 25  
 import org.apache.giraph.comm.messages.MessageStore;
 26  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 27  
 import org.apache.giraph.factories.MessageValueFactory;
 28  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 29  
 import org.apache.giraph.ooc.persistence.DataIndex;
 30  
 import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
 31  
 import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
 32  
 import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
 33  
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 34  
 import org.apache.giraph.utils.VertexIdMessages;
 35  
 import org.apache.hadoop.io.Writable;
 36  
 import org.apache.hadoop.io.WritableComparable;
 37  
 import org.apache.log4j.Logger;
 38  
 
 39  
 
 40  
 /**
 41  
  * Implementation of a message store used for out-of-core mechanism.
 42  
  *
 43  
  * @param <I> Vertex id
 44  
  * @param <M> Message data
 45  
  */
 46  0
 public class DiskBackedMessageStore<I extends WritableComparable,
 47  
     M extends Writable> extends DiskBackedDataStore<VertexIdMessages<I, M>>
 48  
     implements MessageStore<I, M> {
 49  
   /** Class logger. */
 50  0
   private static final Logger LOG =
 51  0
       Logger.getLogger(DiskBackedMessageStore.class);
 52  
   /** Configuration */
 53  
   private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
 54  
   /** In-memory message store */
 55  
   private final MessageStore<I, M> messageStore;
 56  
   /** Whether the message store uses message combiner or not */
 57  
   private final boolean useMessageCombiner;
 58  
   /** Which superstep this message store is used for */
 59  
   private final long superstep;
 60  
   /** Message value class */
 61  
   private final MessageValueFactory<M> messageValueFactory;
 62  
 
 63  
   /**
 64  
    * Type of VertexIdMessage class (container for serialized messages) received
 65  
    * for a particular message. If we write the received messages to disk before
 66  
    * adding them to message store, we need this type when we want to read the
 67  
    * messages back from disk (so that we know how to read the messages from
 68  
    * disk).
 69  
    */
 70  0
   private enum SerializedMessageClass {
 71  
     /** ByteArrayVertexIdMessages */
 72  0
     BYTE_ARRAY_VERTEX_ID_MESSAGES,
 73  
     /** ByteArrayOneMEssageToManyIds */
 74  0
     BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS
 75  
   }
 76  
 
 77  
   /**
 78  
    * Constructor
 79  
    *
 80  
    * @param config Configuration
 81  
    * @param oocEngine Out-of-core engine
 82  
    * @param messageStore In-memory message store for which out-of-core message
 83  
    *                     store would be wrapper
 84  
    * @param useMessageCombiner Whether message combiner is used for this message
 85  
    *                           store
 86  
    * @param superstep superstep number this messages store is used for
 87  
    */
 88  
   public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?>
 89  
                                     config,
 90  
                                 OutOfCoreEngine oocEngine,
 91  
                                 MessageStore<I, M> messageStore,
 92  
                                 boolean useMessageCombiner, long superstep) {
 93  0
     super(config, oocEngine);
 94  0
     this.config = config;
 95  0
     this.messageStore = messageStore;
 96  0
     this.useMessageCombiner = useMessageCombiner;
 97  0
     this.superstep = superstep;
 98  0
     this.messageValueFactory = config.createOutgoingMessageValueFactory();
 99  0
   }
 100  
 
 101  
   @Override
 102  
   public boolean isPointerListEncoding() {
 103  0
     return messageStore.isPointerListEncoding();
 104  
   }
 105  
 
 106  
   @Override
 107  
   public Iterable<M> getVertexMessages(I vertexId) {
 108  0
     return messageStore.getVertexMessages(vertexId);
 109  
   }
 110  
 
 111  
   @Override
 112  
   public void clearVertexMessages(I vertexId) {
 113  0
     messageStore.clearVertexMessages(vertexId);
 114  0
   }
 115  
 
 116  
   @Override
 117  
   public void clearAll() {
 118  0
     messageStore.clearAll();
 119  0
   }
 120  
 
 121  
   @Override
 122  
   public boolean hasMessagesForVertex(I vertexId) {
 123  0
     return messageStore.hasMessagesForVertex(vertexId);
 124  
   }
 125  
 
 126  
   @Override
 127  
   public boolean hasMessagesForPartition(int partitionId) {
 128  0
     return messageStore.hasMessagesForPartition(partitionId);
 129  
   }
 130  
 
 131  
   @Override
 132  
   public void addPartitionMessages(
 133  
       int partitionId, VertexIdMessages<I, M> messages) {
 134  0
     if (useMessageCombiner) {
 135  0
       messageStore.addPartitionMessages(partitionId, messages);
 136  
     } else {
 137  0
       addEntry(partitionId, messages);
 138  
     }
 139  0
   }
 140  
 
 141  
   @Override
 142  
   public void addMessage(I vertexId, M message) throws IOException {
 143  0
     if (useMessageCombiner) {
 144  0
       messageStore.addMessage(vertexId, message);
 145  
     } else {
 146  
       // TODO: implement if LocalBlockRunner needs this message store
 147  0
       throw new UnsupportedOperationException();
 148  
     }
 149  0
   }
 150  
 
 151  
   /**
 152  
    * Gets the path that should be used specifically for message data.
 153  
    *
 154  
    * @param basePath path prefix to build the actual path from
 155  
    * @param superstep superstep for which message data should be stored
 156  
    * @return path to files specific for message data
 157  
    */
 158  
   private static String getPath(String basePath, long superstep) {
 159  0
     return basePath + "_messages-S" + superstep;
 160  
   }
 161  
 
 162  
   @Override
 163  
   public long loadPartitionData(int partitionId)
 164  
       throws IOException {
 165  0
     if (!useMessageCombiner) {
 166  0
       return loadPartitionDataProxy(partitionId,
 167  0
           new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
 168  0
               .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
 169  
     } else {
 170  0
       return 0;
 171  
     }
 172  
   }
 173  
 
 174  
   @Override
 175  
   public long offloadPartitionData(int partitionId)
 176  
       throws IOException {
 177  0
     if (!useMessageCombiner) {
 178  0
       return offloadPartitionDataProxy(partitionId,
 179  0
           new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
 180  0
               .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
 181  
     } else {
 182  0
       return 0;
 183  
     }
 184  
   }
 185  
 
 186  
   @Override
 187  
   public long offloadBuffers(int partitionId)
 188  
       throws IOException {
 189  0
     if (!useMessageCombiner) {
 190  0
       return offloadBuffersProxy(partitionId,
 191  0
           new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
 192  0
               .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
 193  
     } else {
 194  0
       return 0;
 195  
     }
 196  
   }
 197  
 
 198  
   @Override
 199  
   public void finalizeStore() {
 200  0
     messageStore.finalizeStore();
 201  0
   }
 202  
 
 203  
   @Override
 204  
   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
 205  0
     return messageStore.getPartitionDestinationVertices(partitionId);
 206  
   }
 207  
 
 208  
   @Override
 209  
   public void clearPartition(int partitionId) {
 210  0
     messageStore.clearPartition(partitionId);
 211  0
   }
 212  
 
 213  
   @Override
 214  
   public void writePartition(DataOutput out, int partitionId)
 215  
       throws IOException {
 216  0
     messageStore.writePartition(out, partitionId);
 217  0
   }
 218  
 
 219  
   @Override
 220  
   public void readFieldsForPartition(DataInput in, int partitionId)
 221  
       throws IOException {
 222  0
     messageStore.readFieldsForPartition(in, partitionId);
 223  0
   }
 224  
 
 225  
   @Override
 226  
   protected void writeEntry(VertexIdMessages<I, M> messages, DataOutput out)
 227  
       throws IOException {
 228  
     SerializedMessageClass messageClass;
 229  0
     if (messages instanceof ByteArrayVertexIdMessages) {
 230  0
       messageClass = SerializedMessageClass.BYTE_ARRAY_VERTEX_ID_MESSAGES;
 231  0
     } else if (messages instanceof ByteArrayOneMessageToManyIds) {
 232  0
       messageClass = SerializedMessageClass.BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS;
 233  
     } else {
 234  0
       throw new IllegalStateException("writeEntry: serialized message " +
 235  
           "type is not supported");
 236  
     }
 237  0
     out.writeByte(messageClass.ordinal());
 238  0
     messages.write(out);
 239  0
   }
 240  
 
 241  
   @Override
 242  
   protected VertexIdMessages<I, M> readNextEntry(DataInput in)
 243  
       throws IOException {
 244  0
     byte messageType = in.readByte();
 245  
     SerializedMessageClass messageClass =
 246  0
         SerializedMessageClass.values()[messageType];
 247  
     VertexIdMessages<I, M> vim;
 248  0
     switch (messageClass) {
 249  
     case BYTE_ARRAY_VERTEX_ID_MESSAGES:
 250  0
       vim = new ByteArrayVertexIdMessages<>(messageValueFactory);
 251  0
       vim.setConf(config);
 252  0
       break;
 253  
     case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS:
 254  0
       vim = new ByteArrayOneMessageToManyIds<>(messageValueFactory);
 255  0
       vim.setConf(config);
 256  0
       break;
 257  
     default:
 258  0
       throw new IllegalStateException("readNextEntry: unsupported " +
 259  
           "serialized message type!");
 260  
     }
 261  0
     vim.readFields(in);
 262  0
     return vim;
 263  
   }
 264  
 
 265  
   @Override
 266  
   protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
 267  
                                            DataIndex index) throws IOException {
 268  0
     long numBytes = 0;
 269  0
     if (hasPartitionDataOnFile.remove(partitionId)) {
 270  0
       OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
 271  0
           oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
 272  0
       messageStore.readFieldsForPartition(inputWrapper.getDataInput(),
 273  
           partitionId);
 274  0
       numBytes = inputWrapper.finalizeInput(true);
 275  
     }
 276  0
     return numBytes;
 277  
   }
 278  
 
 279  
   @Override
 280  
   protected long offloadInMemoryPartitionData(
 281  
       int partitionId, int ioThreadId, DataIndex index) throws IOException {
 282  0
     long numBytes = 0;
 283  0
     if (messageStore.hasMessagesForPartition(partitionId)) {
 284  0
       OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
 285  0
           oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
 286  
               false);
 287  0
       messageStore.writePartition(outputWrapper.getDataOutput(), partitionId);
 288  0
       messageStore.clearPartition(partitionId);
 289  0
       numBytes = outputWrapper.finalizeOutput();
 290  0
       hasPartitionDataOnFile.add(partitionId);
 291  
     }
 292  0
     return numBytes;
 293  
   }
 294  
 
 295  
   @Override
 296  
   protected int entrySerializedSize(VertexIdMessages<I, M> messages) {
 297  0
     return messages.getSerializedSize();
 298  
   }
 299  
 
 300  
   @Override
 301  
   protected void addEntryToInMemoryPartitionData(int partitionId,
 302  
                                                  VertexIdMessages<I, M>
 303  
                                                      messages) {
 304  0
     messageStore.addPartitionMessages(partitionId, messages);
 305  0
   }
 306  
 }