Coverage Report - org.apache.giraph.lib.JsonBase64VertexInputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
JsonBase64VertexInputFormat
0%
0/3
N/A
3.75
JsonBase64VertexInputFormat$JsonBase64VertexReader
0%
0/46
0%
0/2
3.75
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.lib;
 20  
 
 21  
 import com.google.common.collect.Maps;
 22  
 import net.iharder.Base64;
 23  
 import org.apache.giraph.graph.BasicVertex;
 24  
 import org.apache.giraph.graph.BspUtils;
 25  
 import org.apache.giraph.graph.Edge;
 26  
 import org.apache.giraph.graph.VertexReader;
 27  
 import org.apache.hadoop.conf.Configuration;
 28  
 import org.apache.hadoop.io.LongWritable;
 29  
 import org.apache.hadoop.io.Text;
 30  
 import org.apache.hadoop.io.Writable;
 31  
 import org.apache.hadoop.io.WritableComparable;
 32  
 import org.apache.hadoop.mapreduce.InputSplit;
 33  
 import org.apache.hadoop.mapreduce.RecordReader;
 34  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 35  
 import org.json.JSONArray;
 36  
 import org.json.JSONException;
 37  
 import org.json.JSONObject;
 38  
 
 39  
 import java.io.ByteArrayInputStream;
 40  
 import java.io.DataInput;
 41  
 import java.io.DataInputStream;
 42  
 import java.io.IOException;
 43  
 import java.util.Map;
 44  
 
 45  
 /**
 46  
  * Simple way to represent the structure of the graph with a JSON object.
 47  
  * The actual vertex ids, values, edges are stored by the
 48  
  * Writable serialized bytes that are Byte64 encoded.
 49  
  * Works with {@link JsonBase64VertexOutputFormat}
 50  
  *
 51  
  * @param <I> Vertex index value
 52  
  * @param <V> Vertex value
 53  
  * @param <E> Edge value
 54  
  * @param <M> Message data
 55  
  */
 56  
 @SuppressWarnings("rawtypes")
 57  0
 public class JsonBase64VertexInputFormat<I extends WritableComparable,
 58  
     V extends Writable, E extends Writable, M extends Writable>
 59  
     extends TextVertexInputFormat<I, V, E, M> {
 60  
   /**
 61  
    * Simple reader that supports {@link JsonBase64VertexInputFormat}
 62  
    *
 63  
    * @param <I> Vertex index value
 64  
    * @param <V> Vertex value
 65  
    * @param <E> Edge value
 66  
    * @param <M> Message data
 67  
    */
 68  0
   private static class JsonBase64VertexReader<I extends WritableComparable,
 69  
       V extends Writable, E extends Writable, M extends Writable>
 70  
       extends TextVertexReader<I, V, E, M> {
 71  
     /**
 72  
      * Only constructor.  Requires the LineRecordReader
 73  
      *
 74  
      * @param lineRecordReader Line record reader to read from
 75  
      */
 76  
     public JsonBase64VertexReader(
 77  
         RecordReader<LongWritable, Text> lineRecordReader) {
 78  0
       super(lineRecordReader);
 79  0
     }
 80  
 
 81  
     @Override
 82  
     public boolean nextVertex() throws IOException, InterruptedException {
 83  0
       return getRecordReader().nextKeyValue();
 84  
     }
 85  
 
 86  
     @Override
 87  
     public BasicVertex<I, V, E, M> getCurrentVertex()
 88  
       throws IOException, InterruptedException {
 89  0
       Configuration conf = getContext().getConfiguration();
 90  0
       BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
 91  
 
 92  0
       Text line = getRecordReader().getCurrentValue();
 93  
       JSONObject vertexObject;
 94  
       try {
 95  0
         vertexObject = new JSONObject(line.toString());
 96  0
       } catch (JSONException e) {
 97  0
         throw new IllegalArgumentException(
 98  
           "next: Failed to get the vertex", e);
 99  0
       }
 100  0
       DataInput input = null;
 101  0
       byte[] decodedWritable = null;
 102  0
       I vertexId = null;
 103  
       try {
 104  0
         decodedWritable = Base64.decode(
 105  
           vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
 106  0
         input = new DataInputStream(
 107  
           new ByteArrayInputStream(decodedWritable));
 108  0
         vertexId = BspUtils.<I>createVertexIndex(conf);
 109  0
         vertexId.readFields(input);
 110  0
       } catch (JSONException e) {
 111  0
         throw new IllegalArgumentException(
 112  
           "next: Failed to get vertex id", e);
 113  0
       }
 114  0
       V vertexValue = null;
 115  
       try {
 116  0
         decodedWritable = Base64.decode(
 117  
           vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
 118  0
         input = new DataInputStream(
 119  
           new ByteArrayInputStream(decodedWritable));
 120  0
         vertexValue = BspUtils.<V>createVertexValue(conf);
 121  0
         vertexValue.readFields(input);
 122  0
       } catch (JSONException e) {
 123  0
         throw new IllegalArgumentException(
 124  
           "next: Failed to get vertex value", e);
 125  0
       }
 126  0
       JSONArray edgeArray = null;
 127  
       try {
 128  0
         edgeArray = vertexObject.getJSONArray(
 129  
           JsonBase64VertexFormat.EDGE_ARRAY_KEY);
 130  0
       } catch (JSONException e) {
 131  0
         throw new IllegalArgumentException(
 132  
           "next: Failed to get edge array", e);
 133  0
       }
 134  0
       Map<I, E> edgeMap = Maps.newHashMap();
 135  0
       for (int i = 0; i < edgeArray.length(); ++i) {
 136  
         try {
 137  0
           decodedWritable = Base64.decode(edgeArray.getString(i));
 138  0
         } catch (JSONException e) {
 139  0
           throw new IllegalArgumentException(
 140  
             "next: Failed to get edge value", e);
 141  0
         }
 142  0
         input = new DataInputStream(
 143  
             new ByteArrayInputStream(decodedWritable));
 144  0
         Edge<I, E> edge = new Edge<I, E>();
 145  0
         edge.setConf(getContext().getConfiguration());
 146  0
         edge.readFields(input);
 147  0
         edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue());
 148  
       }
 149  0
       vertex.initialize(vertexId, vertexValue, edgeMap, null);
 150  0
       return vertex;
 151  
     }
 152  
   }
 153  
 
 154  
   @Override
 155  
   public VertexReader<I, V, E, M> createVertexReader(
 156  
       InputSplit split, TaskAttemptContext context) throws IOException {
 157  0
     return new JsonBase64VertexReader<I, V, E, M>(
 158  
       textInputFormat.createRecordReader(split, context));
 159  
   }
 160  
 }