Coverage Report - org.apache.giraph.block_app.framework.output.BlockOutputHandle
 
Classes in this File Line Coverage Branch Coverage Complexity
BlockOutputHandle
0%
0/46
0%
0/16
0
BlockOutputHandle$1
0%
0/2
N/A
0
BlockOutputHandle$1$1
0%
0/6
0%
0/2
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework.output;
 19  
 
 20  
 import java.util.HashMap;
 21  
 import java.util.Map;
 22  
 import java.util.Queue;
 23  
 import java.util.concurrent.Callable;
 24  
 import java.util.concurrent.ConcurrentLinkedQueue;
 25  
 
 26  
 import org.apache.giraph.block_app.framework.api.BlockOutputApi;
 27  
 import org.apache.giraph.conf.GiraphConstants;
 28  
 import org.apache.giraph.utils.CallableFactory;
 29  
 import org.apache.giraph.utils.ProgressableUtils;
 30  
 import org.apache.hadoop.conf.Configuration;
 31  
 import org.apache.hadoop.util.Progressable;
 32  
 
 33  
 /**
 34  
  * Handler for blocks output - keeps track of outputs and writers created
 35  
  */
 36  
 @SuppressWarnings("unchecked")
 37  
 public class BlockOutputHandle implements BlockOutputApi {
 38  
   private transient Configuration conf;
 39  
   private transient Progressable progressable;
 40  
   private final Map<String, BlockOutputDesc> outputDescMap;
 41  0
   private final Map<String, Queue<BlockOutputWriter>> freeWriters =
 42  
       new HashMap<>();
 43  0
   private final Map<String, Queue<BlockOutputWriter>> occupiedWriters =
 44  
       new HashMap<>();
 45  
 
 46  0
   public BlockOutputHandle() {
 47  0
     outputDescMap = null;
 48  0
   }
 49  
 
 50  
   public BlockOutputHandle(String jobIdentifier, Configuration conf,
 51  0
       Progressable hadoopProgressable) {
 52  0
     outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap(
 53  
         conf, jobIdentifier);
 54  0
     for (Map.Entry<String, BlockOutputDesc> entry : outputDescMap.entrySet()) {
 55  0
       entry.getValue().preWriting();
 56  0
       freeWriters.put(entry.getKey(),
 57  
           new ConcurrentLinkedQueue<BlockOutputWriter>());
 58  0
       occupiedWriters.put(entry.getKey(),
 59  
           new ConcurrentLinkedQueue<BlockOutputWriter>());
 60  0
     }
 61  0
     initialize(conf, hadoopProgressable);
 62  0
   }
 63  
 
 64  
   public void initialize(Configuration conf, Progressable progressable) {
 65  0
     this.conf = conf;
 66  0
     this.progressable = progressable;
 67  0
   }
 68  
 
 69  
 
 70  
   @Override
 71  
   public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
 72  
   OD getOutputDesc(String confOption) {
 73  0
     if (outputDescMap == null) {
 74  0
       throw new IllegalArgumentException(
 75  
           "Output cannot be used with checkpointing");
 76  
     }
 77  0
     return (OD) outputDescMap.get(confOption);
 78  
   }
 79  
 
 80  
   @Override
 81  
   public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
 82  0
     if (outputDescMap == null) {
 83  0
       throw new IllegalArgumentException(
 84  
           "Output cannot be used with checkpointing");
 85  
     }
 86  0
     OW outputWriter = (OW) freeWriters.get(confOption).poll();
 87  0
     if (outputWriter == null) {
 88  0
       outputWriter = (OW) outputDescMap.get(confOption).createOutputWriter(
 89  
           conf, progressable);
 90  
     }
 91  0
     occupiedWriters.get(confOption).add(outputWriter);
 92  0
     return outputWriter;
 93  
   }
 94  
 
 95  
   public void returnAllWriters() {
 96  
     for (Map.Entry<String, Queue<BlockOutputWriter>> entry :
 97  0
         occupiedWriters.entrySet()) {
 98  0
       freeWriters.get(entry.getKey()).addAll(entry.getValue());
 99  0
       entry.getValue().clear();
 100  0
     }
 101  0
   }
 102  
 
 103  
   public void closeAllWriters() {
 104  0
     final Queue<BlockOutputWriter> allWriters = new ConcurrentLinkedQueue<>();
 105  0
     for (Queue<BlockOutputWriter> blockOutputWriters : freeWriters.values()) {
 106  0
       allWriters.addAll(blockOutputWriters);
 107  0
     }
 108  0
     if (allWriters.isEmpty()) {
 109  0
       return;
 110  
     }
 111  
     // Closing writers can take time - use multiple threads and call progress
 112  0
     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
 113  
       @Override
 114  
       public Callable<Void> newCallable(int callableId) {
 115  0
         return new Callable<Void>() {
 116  
           @Override
 117  
           public Void call() throws Exception {
 118  0
             BlockOutputWriter writer = allWriters.poll();
 119  0
             while (writer != null) {
 120  0
               writer.close();
 121  0
               writer = allWriters.poll();
 122  
             }
 123  0
             return null;
 124  
           }
 125  
         };
 126  
       }
 127  
     };
 128  0
     ProgressableUtils.getResultsWithNCallables(callableFactory,
 129  0
         Math.min(GiraphConstants.NUM_OUTPUT_THREADS.get(conf),
 130  0
             allWriters.size()), "close-writers-%d", progressable);
 131  
     // Close all output formats
 132  0
     for (BlockOutputDesc outputDesc : outputDescMap.values()) {
 133  0
       outputDesc.postWriting();
 134  0
     }
 135  0
   }
 136  
 }