Coverage Report - org.apache.giraph.io.formats.JsonBase64VertexInputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
JsonBase64VertexInputFormat
0%
0/3
N/A
3.333
JsonBase64VertexInputFormat$JsonBase64VertexReader
0%
0/41
0%
0/2
3.333
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.formats;
 20  
 
 21  
 import com.google.common.collect.Lists;
 22  
 import net.iharder.Base64;
 23  
 import org.apache.giraph.edge.Edge;
 24  
 import org.apache.giraph.edge.EdgeFactory;
 25  
 import org.apache.hadoop.io.Text;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 import org.apache.hadoop.io.WritableComparable;
 28  
 import org.apache.hadoop.mapreduce.InputSplit;
 29  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 30  
 import org.json.JSONArray;
 31  
 import org.json.JSONException;
 32  
 import org.json.JSONObject;
 33  
 
 34  
 import java.io.ByteArrayInputStream;
 35  
 import java.io.DataInput;
 36  
 import java.io.DataInputStream;
 37  
 import java.io.IOException;
 38  
 import java.util.List;
 39  
 
 40  
 /**
 41  
  * Simple way to represent the structure of the graph with a JSON object.
 42  
  * The actual vertex ids, values, edges are stored by the
 43  
  * Writable serialized bytes that are Base64 encoded.
 44  
  * Works with {@link JsonBase64VertexOutputFormat}
 45  
  *
 46  
  * @param <I> Vertex index value
 47  
  * @param <V> Vertex value
 48  
  * @param <E> Edge value
 49  
  */
 50  0
 @SuppressWarnings("rawtypes")
 51  0
 public class JsonBase64VertexInputFormat<I extends WritableComparable,
 52  
     V extends Writable, E extends Writable>
 53  
     extends TextVertexInputFormat<I, V, E> {
 54  
 
 55  
   @Override
 56  
   public TextVertexReader createVertexReader(InputSplit split,
 57  
       TaskAttemptContext context) {
 58  0
     return new JsonBase64VertexReader();
 59  
   }
 60  
 
 61  
   /**
 62  
    * Simple reader that supports {@link JsonBase64VertexInputFormat}
 63  
    */
 64  0
   protected class JsonBase64VertexReader extends
 65  
     TextVertexReaderFromEachLineProcessed<JSONObject> {
 66  
 
 67  
 
 68  
     @Override
 69  
     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
 70  
       throws IOException, InterruptedException {
 71  0
       super.initialize(inputSplit, context);
 72  0
     }
 73  
 
 74  
     @Override
 75  
     protected JSONObject preprocessLine(Text line) {
 76  
       try {
 77  0
         return new JSONObject(line.toString());
 78  0
       } catch (JSONException e) {
 79  0
         throw new IllegalArgumentException(
 80  
           "next: Failed to get the vertex", e);
 81  
       }
 82  
     }
 83  
 
 84  
     @Override
 85  
     protected I getId(JSONObject vertexObject) throws IOException {
 86  
       try {
 87  0
         byte[] decodedWritable = Base64.decode(
 88  0
             vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
 89  0
         DataInput input = new DataInputStream(
 90  
             new ByteArrayInputStream(decodedWritable));
 91  0
         I vertexId = getConf().createVertexId();
 92  0
         vertexId.readFields(input);
 93  0
         return vertexId;
 94  0
       } catch (JSONException e) {
 95  0
         throw new IllegalArgumentException(
 96  
           "next: Failed to get vertex id", e);
 97  
       }
 98  
     }
 99  
 
 100  
     @Override
 101  
     protected V getValue(JSONObject vertexObject) throws IOException {
 102  
       try {
 103  0
         byte[] decodedWritable = Base64.decode(
 104  0
             vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
 105  0
         DataInputStream input = new DataInputStream(
 106  
             new ByteArrayInputStream(decodedWritable));
 107  0
         V vertexValue = getConf().createVertexValue();
 108  0
         vertexValue.readFields(input);
 109  0
         return vertexValue;
 110  0
       } catch (JSONException e) {
 111  0
         throw new IllegalArgumentException(
 112  
           "next: Failed to get vertex value", e);
 113  
       }
 114  
     }
 115  
 
 116  
     @Override
 117  
     protected Iterable<Edge<I, E>> getEdges(JSONObject vertexObject) throws
 118  
     IOException {
 119  0
       JSONArray edgeArray = null;
 120  
       try {
 121  0
         edgeArray = vertexObject.getJSONArray(
 122  
           JsonBase64VertexFormat.EDGE_ARRAY_KEY);
 123  0
       } catch (JSONException e) {
 124  0
         throw new IllegalArgumentException(
 125  
           "next: Failed to get edge array", e);
 126  0
       }
 127  
       byte[] decodedWritable;
 128  0
       List<Edge<I, E>> edges = Lists.newArrayListWithCapacity(
 129  0
           edgeArray.length());
 130  0
       for (int i = 0; i < edgeArray.length(); ++i) {
 131  
         try {
 132  0
           decodedWritable = Base64.decode(edgeArray.getString(i));
 133  0
         } catch (JSONException e) {
 134  0
           throw new IllegalArgumentException(
 135  
             "next: Failed to get edge value", e);
 136  0
         }
 137  0
         DataInputStream input = new DataInputStream(
 138  
             new ByteArrayInputStream(decodedWritable));
 139  0
         I targetVertexId = getConf().createVertexId();
 140  0
         targetVertexId.readFields(input);
 141  0
         E edgeValue = getConf().createEdgeValue();
 142  0
         edgeValue.readFields(input);
 143  0
         edges.add(EdgeFactory.create(targetVertexId, edgeValue));
 144  
       }
 145  0
       return edges;
 146  
     }
 147  
 
 148  
   }
 149  
 
 150  
 }