Coverage Report - org.apache.giraph.io.accumulo.AccumuloVertexOutputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
AccumuloVertexOutputFormat
0%
0/9
0%
0/2
1.429
AccumuloVertexOutputFormat$AccumuloVertexWriter
0%
0/9
N/A
1.429
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.io.accumulo;
 19  
 
 20  
 import java.io.IOException;
 21  
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 22  
 import org.apache.accumulo.core.data.Mutation;
 23  
 import org.apache.giraph.io.VertexOutputFormat;
 24  
 import org.apache.giraph.io.VertexWriter;
 25  
 import org.apache.hadoop.io.Text;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 import org.apache.hadoop.io.WritableComparable;
 28  
 import org.apache.hadoop.mapreduce.JobContext;
 29  
 import org.apache.hadoop.mapreduce.OutputCommitter;
 30  
 import org.apache.hadoop.mapreduce.RecordWriter;
 31  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 32  
 /**
 33  
  *
 34  
  *  Class which wraps the AccumuloOutputFormat. It's designed
 35  
  *  as an extension point to VertexOutputFormat subclasses who wish
 36  
  *  to write vertices back to an Accumulo table.
 37  
  *
 38  
  *  Works with
 39  
  *  {@link AccumuloVertexInputFormat}
 40  
  *
 41  
  *
 42  
  * @param <I> vertex id type
 43  
  * @param <V>  vertex value type
 44  
  * @param <E>  edge type
 45  
  */
 46  0
 public abstract class AccumuloVertexOutputFormat<
 47  
         I extends WritableComparable,
 48  
         V extends Writable,
 49  
         E extends Writable>
 50  
         extends VertexOutputFormat<I, V, E> {
 51  
 
 52  
 
 53  
   /**
 54  
    * Output table parameter
 55  
    */
 56  
   public static final String OUTPUT_TABLE = "OUTPUT_TABLE";
 57  
 
 58  
   /**
 59  
    * Accumulo delegate for table output
 60  
    */
 61  0
   protected AccumuloOutputFormat accumuloOutputFormat =
 62  
           new AccumuloOutputFormat();
 63  
 
 64  
   /**
 65  
    *
 66  
    * Main abstraction point for vertex writers to persist back
 67  
    * to Accumulo tables.
 68  
    *
 69  
    * @param <I> vertex id type
 70  
    * @param <V> vertex value type
 71  
    * @param <E>  edge type
 72  
    */
 73  
   public abstract static class AccumuloVertexWriter<
 74  
       I extends WritableComparable,
 75  
       V extends Writable,
 76  
       E extends Writable>
 77  
       extends VertexWriter<I, V, E> {
 78  
 
 79  
     /**
 80  
      * task attempt context.
 81  
      */
 82  
     private TaskAttemptContext context;
 83  
 
 84  
     /**
 85  
      * Accumulo record writer
 86  
      */
 87  
     private RecordWriter<Text, Mutation> recordWriter;
 88  
 
 89  
     /**
 90  
      * Constructor for use with subclasses
 91  
      *
 92  
      * @param recordWriter accumulo record writer
 93  
      */
 94  0
     public AccumuloVertexWriter(RecordWriter<Text, Mutation> recordWriter) {
 95  0
       this.recordWriter = recordWriter;
 96  0
     }
 97  
 
 98  
     /**
 99  
      * initialize
 100  
      *
 101  
      * @param context Context used to write the vertices.
 102  
      * @throws IOException
 103  
      */
 104  
     public void initialize(TaskAttemptContext context) throws IOException {
 105  0
       this.context = context;
 106  0
     }
 107  
 
 108  
     /**
 109  
      *  close
 110  
      *
 111  
      * @param context the context of the task
 112  
      * @throws IOException
 113  
      * @throws InterruptedException
 114  
      */
 115  
     public void close(TaskAttemptContext context)
 116  
       throws IOException, InterruptedException {
 117  0
       recordWriter.close(context);
 118  0
     }
 119  
 
 120  
     /**
 121  
      * Get the table record writer;
 122  
      *
 123  
      * @return Record writer to be used for writing.
 124  
      */
 125  
     public RecordWriter<Text, Mutation> getRecordWriter() {
 126  0
       return recordWriter;
 127  
     }
 128  
 
 129  
     /**
 130  
      * Get the context.
 131  
      *
 132  
      * @return Context passed to initialize.
 133  
      */
 134  
     public TaskAttemptContext getContext() {
 135  0
       return context;
 136  
     }
 137  
 
 138  
   }
 139  
   /**
 140  
    *
 141  
    * checkOutputSpecs
 142  
    *
 143  
    * @param context information about the job
 144  
    * @throws IOException
 145  
    * @throws InterruptedException
 146  
    */
 147  
   @Override
 148  
   public void checkOutputSpecs(JobContext context)
 149  
     throws IOException, InterruptedException {
 150  
     try {
 151  0
       accumuloOutputFormat.checkOutputSpecs(context);
 152  0
     } catch (IOException e) {
 153  0
       if (e.getMessage().contains("Output info has not been set")) {
 154  0
         throw new IOException(e.getMessage() + " Make sure you initialized" +
 155  
                 " AccumuloOutputFormat static setters " +
 156  
                 "before passing the config to GiraphJob.");
 157  
       }
 158  0
     }
 159  0
   }
 160  
 
 161  
   /**
 162  
    * getOutputCommitter
 163  
    *
 164  
    * @param context the task context
 165  
    * @return OutputCommitter
 166  
    * @throws IOException
 167  
    * @throws InterruptedException
 168  
    */
 169  
   @Override
 170  
   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
 171  
     throws IOException, InterruptedException {
 172  0
     return accumuloOutputFormat.getOutputCommitter(context);
 173  
   }
 174  
 }