Coverage Report - org.apache.giraph.comm.netty.handler.WorkerRequestReservedMap
 
Classes in this File Line Coverage Branch Coverage Complexity
WorkerRequestReservedMap
0%
0/15
0%
0/4
1.667
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.netty.handler;
 20  
 
 21  
 import org.apache.giraph.comm.netty.NettyServer;
 22  
 import org.apache.giraph.conf.GiraphConstants;
 23  
 import org.apache.giraph.utils.IncreasingBitSet;
 24  
 import org.apache.hadoop.conf.Configuration;
 25  
 
 26  
 import com.google.common.collect.MapMaker;
 27  
 
 28  
 import java.util.concurrent.ConcurrentMap;
 29  
 
 30  
 /**
 31  
  * Provides a thread-safe map for checking and reserving worker/request id pairs
 32  
  */
 33  
 public class WorkerRequestReservedMap {
 34  
   /** Maps each worker id to the bit set of request ids reserved for it */
 35  
   private final ConcurrentMap<Integer, IncreasingBitSet>
 36  
   workerRequestReservedMap;
 37  
 
 38  
   /**
 39  
    * Constructor. Builds the concurrent map with a configured concurrency level.
 40  
    *
 41  
    * @param conf Configuration read for GiraphConstants.MSG_NUM_FLUSH_THREADS
 42  
    */
 43  0
   public WorkerRequestReservedMap(Configuration conf) {
 44  0
     workerRequestReservedMap = new MapMaker().concurrencyLevel(
 45  0
         conf.getInt(GiraphConstants.MSG_NUM_FLUSH_THREADS,
 46  0
             NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT)).makeMap();
 47  0
   }
 48  
 
 49  
   /**
 50  
    * Reserve the request (before the request starts to ensure that it is
 51  
    * only executed once).  We are assuming no failure on the server.
 52  
    *
 53  
    * @param workerId workerId of the request
 54  
    * @param requestId Request id
 55  
    * @return True if the reserving succeeded, false otherwise
 56  
    */
 57  
   public boolean reserveRequest(Integer workerId, long requestId) {
 58  0
     IncreasingBitSet requestSet = getRequestSet(workerId);
 59  0
     return requestSet.add(requestId);
 60  
   }
 61  
 
 62  
   /**
 63  
    * Get the request bit set for a worker, creating the entry if it is absent.
 64  
    *
 65  
    * @param workerId Id of the worker to get the bit set for
 66  
    * @return Bit set for the worker
 67  
    */
 68  
   private IncreasingBitSet getRequestSet(Integer workerId) {
 69  0
     IncreasingBitSet requestSet = workerRequestReservedMap.get(workerId);
 70  0
     if (requestSet == null) {
 71  0
       requestSet = new IncreasingBitSet();
 72  0
       IncreasingBitSet previous =
 73  0
           workerRequestReservedMap.putIfAbsent(workerId, requestSet);
 74  0
       if (previous != null) { // an entry was already mapped; use it instead
 75  0
         requestSet = previous;
 76  
       }
 77  
     }
 78  0
     return requestSet;
 79  
   }
 80  
 }