Coverage Report - org.apache.giraph.io.formats.GraphvizOutputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
GraphvizOutputFormat
0%
0/36
0%
0/4
1.13
GraphvizOutputFormat$1
N/A
N/A
1.13
GraphvizOutputFormat$GraphvizOutputCommitter
0%
0/24
N/A
1.13
GraphvizOutputFormat$VertexWriter
0%
0/8
0%
0/2
1.13
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.io.formats;
 19  
 
 20  
 import org.apache.giraph.edge.Edge;
 21  
 import org.apache.giraph.graph.Vertex;
 22  
 import org.apache.hadoop.fs.FSDataOutputStream;
 23  
 import org.apache.hadoop.fs.FileSystem;
 24  
 import org.apache.hadoop.fs.Path;
 25  
 import org.apache.hadoop.io.NullWritable;
 26  
 import org.apache.hadoop.io.Text;
 27  
 import org.apache.hadoop.io.Writable;
 28  
 import org.apache.hadoop.io.WritableComparable;
 29  
 import org.apache.hadoop.mapreduce.JobContext;
 30  
 import org.apache.hadoop.mapreduce.JobStatus;
 31  
 import org.apache.hadoop.mapreduce.OutputCommitter;
 32  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 33  
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 34  
 
 35  
 import java.io.IOException;
 36  
 
 37  
 /**
 38  
  * Writes graph to a dot file (graphviz format). This writes it out in parts. At
 39  
  * the end of the job you can use the following to get a single graphviz file:
 40  
  *
 41  
  * hadoop fs -getmerge /hadoop/output/path data.txt
 42  
  */
 43  0
 public class GraphvizOutputFormat extends TextVertexOutputFormat<
 44  
     WritableComparable, Writable, Writable> {
 45  
   /** Color of node text */
 46  
   private static final String NODE_TEXT_COLOR = "blue:orange";
 47  
 
 48  
   @Override
 49  
   public TextVertexWriter createVertexWriter(TaskAttemptContext context)
 50  
     throws IOException, InterruptedException {
 51  0
     return new VertexWriter();
 52  
   }
 53  
 
 54  
   @Override
 55  
   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
 56  
     throws IOException, InterruptedException {
 57  0
     return new GraphvizOutputCommitter(super.getOutputCommitter(context));
 58  
   }
 59  
 
 60  
   /**
 61  
    * Get output directory job is using
 62  
    * @param context job context
 63  
    * @return Path for output directory
 64  
    */
 65  
 
 66  
   private static Path getOutputDir(JobContext context) {
 67  0
     return FileOutputFormat.getOutputPath(context);
 68  
   }
 69  
 
 70  
   /**
 71  
    * Get path which sorts at the beginning of a directory
 72  
    * @param context job context
 73  
    * @return Path at beginning
 74  
    */
 75  
 
 76  
   private static Path getPathAtBeginning(JobContext context) {
 77  0
     return new Path(getOutputDir(context), "____" + System.currentTimeMillis());
 78  
   }
 79  
 
 80  
   /**
 81  
    * Get path which sorts at the end of a directory
 82  
    * @param context job context
 83  
    * @return Path at end
 84  
    */
 85  
   private static Path getPathAtEnd(JobContext context) {
 86  0
     return new Path(getOutputDir(context), "zzz_" + System.currentTimeMillis());
 87  
   }
 88  
 
 89  
   /**
 90  
    * Write start of graph data
 91  
    * @param context task attempt context
 92  
    * @throws IOException I/O errors
 93  
    */
 94  
   private static void writeStart(JobContext context) throws IOException {
 95  0
     Path path = getPathAtBeginning(context);
 96  0
     FileSystem fs = path.getFileSystem(context.getConfiguration());
 97  0
     FSDataOutputStream file = fs.create(path, false);
 98  0
     file.writeBytes("digraph g {\n");
 99  0
     file.close();
 100  0
   }
 101  
 
 102  
   /**
 103  
    * Write end of graph data
 104  
    * @param context job attempt context
 105  
    * @throws IOException I/O errors
 106  
    */
 107  
   private static void writeEnd(JobContext context) throws IOException {
 108  0
     Path path = getPathAtEnd(context);
 109  0
     FileSystem fs = path.getFileSystem(context.getConfiguration());
 110  0
     FSDataOutputStream file = fs.create(path, false);
 111  0
     file.writeBytes("}\n");
 112  0
     file.close();
 113  0
   }
 114  
 
 115  
   /**
 116  
    * Add node information to output
 117  
    * @param vertex node
 118  
    * @param sb string builder
 119  
    */
 120  
   private static void addNodeInfo(
 121  
       Vertex<WritableComparable, Writable, Writable> vertex, StringBuilder sb) {
 122  0
     sb.append('"').append(vertex.getId()).append('"');
 123  0
     sb.append(" [").append("label=").append('"').append("<id> ");
 124  0
     sb.append(vertex.getId());
 125  0
     if (!(vertex.getValue() instanceof NullWritable)) {
 126  0
       sb.append("|").append(vertex.getValue());
 127  
     }
 128  0
     sb.append('"').append(",shape=record,fillcolor=")
 129  0
         .append('"').append(NODE_TEXT_COLOR).append('"')
 130  0
         .append("];");
 131  0
   }
 132  
 
 133  
   /**
 134  
    * Write an edge
 135  
    * @param sb string builder
 136  
    * @param sourceID source vertex ID
 137  
    * @param edge the edge
 138  
    */
 139  
   private static void addEdge(StringBuilder sb, Writable sourceID,
 140  
       Edge<WritableComparable, Writable> edge) {
 141  0
     sb.append(sourceID).append(":id")
 142  0
         .append(" -> ")
 143  0
         .append(edge.getTargetVertexId()).append(":id");
 144  0
     addEdgeInfo(sb, edge);
 145  0
     sb.append("\n");
 146  0
   }
 147  
 
 148  
   /**
 149  
    * Add edge information to output
 150  
    * @param sb string builder
 151  
    * @param edge the edge
 152  
    */
 153  
   private static void addEdgeInfo(StringBuilder sb,
 154  
     Edge<WritableComparable, Writable> edge) {
 155  0
     if (!(edge.getValue() instanceof NullWritable)) {
 156  0
       sb.append(" [label=").append(edge.getValue()).append(" ];");
 157  
     }
 158  0
   }
 159  
 
 160  
   /**
 161  
    * Wrapper around output committer which writes our begin/end files.
 162  
    */
 163  0
   private static class GraphvizOutputCommitter extends OutputCommitter {
 164  
     /** delegate committer */
 165  
     private final OutputCommitter delegate;
 166  
 
 167  
     /**
 168  
      * Constructor with delegate
 169  
      * @param delegate committer to use
 170  
      */
 171  0
     private GraphvizOutputCommitter(OutputCommitter delegate) {
 172  0
       this.delegate = delegate;
 173  0
     }
 174  
 
 175  
     @Override public boolean equals(Object o) {
 176  0
       return delegate.equals(o);
 177  
     }
 178  
 
 179  
     @Override public String toString() {
 180  0
       return delegate.toString();
 181  
     }
 182  
 
 183  
     @Override public int hashCode() {
 184  0
       return delegate.hashCode();
 185  
     }
 186  
 
 187  
     @Override public void abortJob(JobContext jobContext, JobStatus.State state)
 188  
       throws IOException {
 189  0
       delegate.abortJob(jobContext, state);
 190  0
     }
 191  
 
 192  
     @Override public void abortTask(TaskAttemptContext taskContext)
 193  
       throws IOException {
 194  0
       delegate.abortTask(taskContext);
 195  0
     }
 196  
 
 197  
     @Override @Deprecated public void cleanupJob(JobContext context)
 198  
       throws IOException {
 199  0
       delegate.cleanupJob(context);
 200  0
     }
 201  
 
 202  
     @Override public void commitJob(JobContext jobContext) throws IOException {
 203  0
       writeEnd(jobContext);
 204  0
       delegate.commitJob(jobContext);
 205  0
     }
 206  
 
 207  
     @Override public void commitTask(TaskAttemptContext taskContext)
 208  
       throws IOException {
 209  0
       delegate.commitTask(taskContext);
 210  0
     }
 211  
 
 212  
     @Override public boolean needsTaskCommit(TaskAttemptContext taskContext)
 213  
       throws IOException {
 214  0
       return delegate.needsTaskCommit(taskContext);
 215  
     }
 216  
 
 217  
     @Override public void setupJob(JobContext jobContext) throws IOException {
 218  0
       delegate.setupJob(jobContext);
 219  0
       writeStart(jobContext);
 220  0
     }
 221  
 
 222  
     @Override public void setupTask(TaskAttemptContext taskContext)
 223  
       throws IOException {
 224  0
       delegate.setupTask(taskContext);
 225  0
     }
 226  
   }
 227  
 
 228  
   /**
 229  
    * Writes vertices to graphviz files.
 230  
    */
 231  0
   private class VertexWriter extends TextVertexWriter {
 232  
     @Override
 233  
     public void writeVertex(
 234  
       Vertex<WritableComparable, Writable, Writable> vertex)
 235  
       throws IOException, InterruptedException {
 236  0
       StringBuilder sb = new StringBuilder(vertex.getNumEdges() * 10);
 237  0
       for (Edge<WritableComparable, Writable> edge : vertex.getEdges()) {
 238  0
         addEdge(sb, vertex.getId(), edge);
 239  0
       }
 240  0
       addNodeInfo(vertex, sb);
 241  0
       getRecordWriter().write(new Text(sb.toString()), null);
 242  0
     }
 243  
   }
 244  
 }