Coverage Report - org.apache.giraph.worker.WorkerContext
 
Classes in this File Line Coverage Branch Coverage Complexity
WorkerContext
0%
0/28
0%
0/2
1.056
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.worker;
 20  
 
 21  
 import java.io.DataInput;
 22  
 import java.io.DataOutput;
 23  
 import java.io.IOException;
 24  
 import java.util.List;
 25  
 
 26  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 27  
 import org.apache.giraph.comm.requests.SendWorkerToWorkerMessageRequest;
 28  
 import org.apache.giraph.graph.GraphState;
 29  
 import org.apache.hadoop.io.Writable;
 30  
 import org.apache.hadoop.io.WritableComparable;
 31  
 import org.apache.hadoop.mapreduce.Mapper;
 32  
 
 33  
 /**
 34  
  * WorkerContext allows for the execution of user code
 35  
  * on a per-worker basis. There's one WorkerContext per worker.
 36  
  */
 37  0
 @SuppressWarnings("rawtypes")
 38  0
 public abstract class WorkerContext
 39  
   extends WorkerAggregatorDelegator<WritableComparable, Writable, Writable>
 40  
   implements Writable, WorkerIndexUsage<WritableComparable> {
 41  
   /** Global graph state */
 42  
   private GraphState graphState;
 43  
 
 44  
   /** Service worker */
 45  
   private CentralizedServiceWorker serviceWorker;
 46  
   /** All workers info */
 47  
   private AllWorkersInfo allWorkersInfo;
 48  
 
 49  
   /**
 50  
    * Set the graph state.
 51  
    *
 52  
    * @param graphState Used to set the graph state.
 53  
    */
 54  
   public final void setGraphState(GraphState graphState) {
 55  0
     this.graphState = graphState;
 56  0
   }
 57  
 
 58  
   /**
 59  
    * Setup superstep.
 60  
    *
 61  
    * @param serviceWorker Service worker containing all the information
 62  
    */
 63  
   public final void setupSuperstep(
 64  
       CentralizedServiceWorker<?, ?, ?> serviceWorker) {
 65  0
     this.serviceWorker = serviceWorker;
 66  0
     allWorkersInfo = new AllWorkersInfo(
 67  0
         serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo());
 68  0
   }
 69  
 
 70  
   /**
 71  
    * Initialize the WorkerContext.
 72  
    * This method is executed once on each Worker before the first
 73  
    * superstep starts.
 74  
    *
 75  
    * @throws IllegalAccessException Thrown for getting the class
 76  
    * @throws InstantiationException Expected instantiation in this method.
 77  
    */
 78  
   public abstract void preApplication() throws InstantiationException,
 79  
     IllegalAccessException;
 80  
 
 81  
   /**
 82  
    * Finalize the WorkerContext.
 83  
    * This method is executed once on each Worker after the last
 84  
    * superstep ends.
 85  
    */
 86  
   public abstract void postApplication();
 87  
 
 88  
   /**
 89  
    * Execute user code.
 90  
    * This method is executed once on each Worker before each
 91  
    * superstep starts.
 92  
    */
 93  
   public abstract void preSuperstep();
 94  
 
 95  
   /**
 96  
    * Get number of workers
 97  
    *
 98  
    * @return Number of workers
 99  
    */
 100  
   @Override
 101  
   public final int getWorkerCount() {
 102  0
     return allWorkersInfo.getWorkerCount();
 103  
   }
 104  
 
 105  
   /**
 106  
    * Get index for this worker
 107  
    *
 108  
    * @return Index of this worker
 109  
    */
 110  
   @Override
 111  
   public final int getMyWorkerIndex() {
 112  0
     return allWorkersInfo.getMyWorkerIndex();
 113  
   }
 114  
 
 115  
   @Override
 116  
   public final int getWorkerForVertex(WritableComparable vertexId) {
 117  0
     return allWorkersInfo.getWorkerIndex(
 118  0
         serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo());
 119  
   }
 120  
 
 121  
   /**
 122  
    * Get messages which other workers sent to this worker and clear them (can
 123  
    * be called once per superstep)
 124  
    *
 125  
    * @return Messages received
 126  
    */
 127  
   public final List<Writable> getAndClearMessagesFromOtherWorkers() {
 128  0
     return serviceWorker.getServerData().
 129  0
         getAndClearCurrentWorkerToWorkerMessages();
 130  
   }
 131  
 
 132  
   /**
 133  
    * Send message to another worker
 134  
    *
 135  
    * @param message Message to send
 136  
    * @param workerIndex Index of the worker to send the message to
 137  
    */
 138  
   public final void sendMessageToWorker(Writable message, int workerIndex) {
 139  0
     SendWorkerToWorkerMessageRequest request =
 140  
         new SendWorkerToWorkerMessageRequest(message);
 141  0
     if (workerIndex == getMyWorkerIndex()) {
 142  0
       request.doRequest(serviceWorker.getServerData());
 143  
     } else {
 144  0
       serviceWorker.getWorkerClient().sendWritableRequest(
 145  0
           allWorkersInfo.getWorkerList().get(workerIndex).getTaskId(), request);
 146  
     }
 147  0
   }
 148  
 
 149  
   /**
 150  
    * Execute user code.
 151  
    * This method is executed once on each Worker after each
 152  
    * superstep ends.
 153  
    */
 154  
   public abstract void postSuperstep();
 155  
 
 156  
   /**
 157  
    * Retrieves the current superstep.
 158  
    *
 159  
    * @return Current superstep
 160  
    */
 161  
   public final long getSuperstep() {
 162  0
     return graphState.getSuperstep();
 163  
   }
 164  
 
 165  
   /**
 166  
    * Get the total (all workers) number of vertices that
 167  
    * existed in the previous superstep.
 168  
    *
 169  
    * @return Total number of vertices (-1 if first superstep)
 170  
    */
 171  
   public final long getTotalNumVertices() {
 172  0
     return graphState.getTotalNumVertices();
 173  
   }
 174  
 
 175  
   /**
 176  
    * Get the total (all workers) number of edges that
 177  
    * existed in the previous superstep.
 178  
    *
 179  
    * @return Total number of edges (-1 if first superstep)
 180  
    */
 181  
   public final long getTotalNumEdges() {
 182  0
     return graphState.getTotalNumEdges();
 183  
   }
 184  
 
 185  
   /**
 186  
    * Get the mapper context
 187  
    *
 188  
    * @return Mapper context
 189  
    */
 190  
   public final Mapper.Context getContext() {
 191  0
     return graphState.getContext();
 192  
   }
 193  
 
 194  
   /**
 195  
    * Call this to log a line to command line of the job. Use in moderation -
 196  
    * it's a synchronous call to Job client
 197  
    *
 198  
    * @param line Line to print
 199  
    */
 200  
   public final void logToCommandLine(String line) {
 201  0
     serviceWorker.getJobProgressTracker().logInfo(line);
 202  0
   }
 203  
 
 204  
   @Override
 205  
   public void write(DataOutput dataOutput) throws IOException {
 206  0
   }
 207  
 
 208  
   @Override
 209  
   public void readFields(DataInput dataInput) throws IOException {
 210  0
   }
 211  
 }