Hama Graph Tutorial

This document describes the Graph computing framework and serves as a tutorial.

Overview

Hama includes a Graph package for vertex-centric graph computations. It lets you write Google Pregel-style applications through a simple programming interface.

Vertex

Writing a Hama graph application involves subclassing the predefined Vertex class. Its type parameters define three value types, associated with vertices, edges, and messages.

  public abstract class Vertex<V extends Writable, E extends Writable, M extends Writable>
      implements VertexInterface<V, E, M> {

    public abstract void compute(Iterator<M> messages) throws IOException;
    ..

  }

The user overrides the compute() method, which is executed at each active vertex in every superstep. Predefined Vertex methods allow compute() to query information about the current vertex and its edges, and to send messages to other vertices. compute() can inspect the value associated with its vertex via getValue().
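
To illustrate the superstep model described above without depending on Hama itself, here is a minimal, self-contained sketch in plain Java. The names ToyPregel and run, and the max-value propagation rule, are hypothetical and not part of the Hama API. The sketch only assumes that each active vertex runs its compute step once per superstep, that messages sent in one superstep are delivered in the next, and that a vertex with no incoming messages stays halted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a vertex-centric engine; NOT the Hama API.
final class ToyPregel {

  // Propagates the maximum value through the graph. initial[i] is the value
  // of vertex i and edges[i] lists its out-neighbours.
  static int[] run(int[] initial, int[][] edges) {
    int n = initial.length;
    int[] values = initial.clone();
    List<List<Integer>> inbox = emptyBoxes(n);

    for (int superstep = 0; ; superstep++) {
      List<List<Integer>> outbox = emptyBoxes(n);
      boolean anyMessageSent = false;

      for (int v = 0; v < n; v++) {
        // A vertex is active in superstep 0 or when it has received messages.
        if (superstep > 0 && inbox.get(v).isEmpty()) {
          continue;
        }
        // The "compute" step: adopt the largest value seen so far.
        boolean changed = false;
        for (int msg : inbox.get(v)) {
          if (msg > values[v]) {
            values[v] = msg;
            changed = true;
          }
        }
        // Send to all neighbours in superstep 0 or after a change;
        // otherwise the vertex implicitly votes to halt.
        if (superstep == 0 || changed) {
          for (int target : edges[v]) {
            outbox.get(target).add(values[v]);
            anyMessageSent = true;
          }
        }
      }

      if (!anyMessageSent) {
        return values; // all vertices halted and no messages are in flight
      }
      inbox = outbox; // messages become visible in the next superstep
    }
  }

  private static List<List<Integer>> emptyBoxes(int n) {
    List<List<Integer>> boxes = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      boxes.add(new ArrayList<>());
    }
    return boxes;
  }

  public static void main(String[] args) {
    int[][] ring = {{1}, {2}, {0}}; // 0 -> 1 -> 2 -> 0
    System.out.println(Arrays.toString(run(new int[] {3, 6, 2}, ring)));
  }
}
```

In a real Hama job the framework drives this loop and delivers the messages; the application only supplies the body of compute().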

Vertex Reader and Writer

Hama Graph provides flexible input and output options and lets you extract vertices from your data without any pre-processing. You can create your own VertexReader for your data format by extending the org.apache.hama.graph.VertexInputReader class. For example, a sequence file that contains a linked list of vertices can be parsed as follows:

  public static class PagerankSeqReader
      extends
      VertexInputReader<Text, TextArrayWritable, Text, NullWritable, DoubleWritable> {
    @Override
    public boolean parseVertex(Text key, TextArrayWritable value,
        Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
      vertex.setVertexID(key);

      for (Writable v : value.get()) {
        vertex.addEdge(new Edge<Text, NullWritable>((Text) v, null));
      }

      return true;
    }
  }

You can also create your own writer by implementing org.apache.hama.graph.VertexOutputWriter. See the SemiClusterVertexOutputWriter example:

  @Override
  public void write(Vertex<V, E, M> vertex,
      BSPPeer<Writable, Writable, KEYOUT, VALUEOUT, GraphJobMessage> peer)
      throws IOException {
    SemiClusterMessage vertexValue = (SemiClusterMessage) vertex.getValue();
    peer.write((KEYOUT) vertex.getVertexID(), (VALUEOUT) new Text(vertexValue
        .getSemiClusterContainThis().toString()));
  }
  

Combiners

Sending a message to a vertex that resides on a different machine incurs some overhead. However, if the algorithm does not need each message individually but only a function of them (for example, their sum), a combiner can merge the messages bound for the same vertex into one and reduce that overhead.

Write your own Combiner

To write your own combiner, extend the Combiner class and implement its combine(Iterable<M> messages) method. For more details, see the implementation of MinIntCombiner in the org.apache.hama.examples.SSSP example.
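
As a rough illustration of what a combiner does, the following plain-Java sketch reduces several pending messages to their minimum. ToyCombiner and ToyMinCombiner are hypothetical stand-ins that only mirror the combine(Iterable<M>) shape mentioned above; a real implementation would extend Hama's Combiner and work with Writable message types.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in mirroring the combine(Iterable<M>) shape;
// the real class to extend is Hama's Combiner.
abstract class ToyCombiner<M> {
  abstract M combine(Iterable<M> messages);
}

// Collapses all messages bound for one vertex into their minimum.
final class ToyMinCombiner extends ToyCombiner<Integer> {
  @Override
  Integer combine(Iterable<Integer> messages) {
    // With no messages at all this returns Integer.MAX_VALUE.
    int min = Integer.MAX_VALUE;
    for (int m : messages) {
      min = Math.min(min, m);
    }
    return min;
  }

  public static void main(String[] args) {
    List<Integer> pending = Arrays.asList(7, 3, 9);
    // Three messages are replaced by the single value 3.
    System.out.println(new ToyMinCombiner().combine(pending));
  }
}
```

A minimum works here because a shortest-path vertex only cares about the smallest distance offered to it; the same pattern applies to any reduction whose result does not depend on message order.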

Counters

Counters measure progress or count events within a job. To use your own counter, define an enum type as follows:

  private static enum DYNAMIC_GRAPH_COUNTER {
    ADDED_VERTEX_COUNT,
    REMOVED_VERTEX_COUNT
  }

Then you can increment your counter by calling the increment method as follows:

  this.getCounter(DYNAMIC_GRAPH_COUNTER.ADDED_VERTEX_COUNT).increment(1);

Aggregators

Aggregators are a mechanism for global communication, monitoring, and data. Each vertex can provide a value to an aggregator in superstep S, the system combines those values using a reduction operator, and the resulting value is made available to all vertices in superstep S + 1.
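
The timing described above (values contributed in superstep S become readable in superstep S + 1) can be sketched in plain Java. ToyAggregatorTiming and visiblePerSuperstep are hypothetical names, the sum reduction is just one possible operator, and the rule that each vertex contributes its value plus the superstep number is made up so that the results differ between supersteps.

```java
// Hypothetical simulation of aggregator timing; NOT the Hama API.
final class ToyAggregatorTiming {

  // Runs the given number of supersteps with a sum aggregator and returns,
  // per superstep, the aggregated value a vertex could read in that
  // superstep (null when nothing has been aggregated yet).
  static Double[] visiblePerSuperstep(double[] vertexValues, int supersteps) {
    Double[] visible = new Double[supersteps];
    Double previousResult = null;

    for (int s = 0; s < supersteps; s++) {
      // Vertices only see the result reduced during superstep s - 1.
      visible[s] = previousResult;

      // Each vertex contributes a value; the system reduces them with a sum.
      double sum = 0;
      for (double v : vertexValues) {
        sum += v + s;
      }
      previousResult = sum;
    }
    return visible;
  }

  public static void main(String[] args) {
    Double[] seen = visiblePerSuperstep(new double[] {1, 2, 3}, 3);
    for (int s = 0; s < seen.length; s++) {
      System.out.println("superstep " + s + " sees " + seen[s]);
    }
  }
}
```

Note that nothing is visible in the first superstep, which is why vertex code usually has to handle a missing aggregated value.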

Registering aggregators

To start using aggregators, you must declare them in your GraphJob:

  HamaConfiguration conf = new HamaConfiguration(new Configuration());
  GraphJob graphJob = new GraphJob(conf, MyClass.class);

  // To add an average aggregator
  graphJob.setAggregatorClass(AverageAggregator.class);

  // To add a sum aggregator
  graphJob.setAggregatorClass(SumAggregator.class);

Several aggregators are available, and you can also write your own. The aggregators already implemented are in the org.apache.hama.graph package.

Start working with aggregators

In order to aggregate values from your vertices, use:

  this.aggregate(index, value);

This method is called from inside a vertex, although not every vertex is required to call it. The index parameter is the position of the aggregator in registration order: the first registered aggregator has index 0, the second has index 1, and so on.

Get results

Inside your vertex, you can get the results of each aggregator by using the method:

  this.getAggregatedValue(index);

Write your own aggregators

To write your own aggregator, extend the org.apache.hama.graph.AbstractAggregator class and implement its aggregate(M value) and getValue() methods. For more details, see the default aggregator implementations in the org.apache.hama.graph package.
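
The shape of such an aggregator can be sketched as follows. ToyAbstractAggregator is a hypothetical stand-in that only reproduces the two methods named above; the real AbstractAggregator works with Writable types and its exact signatures may differ between Hama versions, so treat this as an outline rather than code to copy.

```java
// Hypothetical stand-in with the two methods named in the text;
// the real base class is org.apache.hama.graph.AbstractAggregator.
abstract class ToyAbstractAggregator<M> {
  abstract void aggregate(M value);
  abstract M getValue();
}

// Keeps the largest value contributed by any vertex in a superstep.
final class ToyMaxAggregator extends ToyAbstractAggregator<Double> {
  private double max = Double.NEGATIVE_INFINITY;

  @Override
  void aggregate(Double value) {
    max = Math.max(max, value);
  }

  @Override
  Double getValue() {
    return max;
  }

  public static void main(String[] args) {
    ToyMaxAggregator aggregator = new ToyMaxAggregator();
    for (double contribution : new double[] {0.25, 0.75, 0.5}) {
      aggregator.aggregate(contribution);
    }
    System.out.println(aggregator.getValue());
  }
}
```

The aggregator holds only a running result, so its memory use does not grow with the number of vertices that contribute.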

Example: PageRankVertex

To solve the PageRank problem using Hama Graph, you can extend the Vertex class to create a PageRankVertex class. This example uses the algorithm described in Google's Pregel paper. The value of a vertex represents its tentative page rank. The graph is initialized with each vertex value equal to 1/numOfVertices. In each of the first 30 supersteps, each vertex sends its tentative page rank, divided by the number of its outgoing edges, along each outgoing edge.

From superstep 1 to 30, each vertex sums the values arriving in its messages and sets its tentative page rank to (1 - 0.85) / numOfVertices + (0.85 * sum).

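  public static class PageRankVertex extends
      Vertex<Text, NullWritable, DoubleWritable> {

    // DAMPING_FACTOR (0.85 in the formula above) and MAXIMUM_CONVERGENCE_ERROR
    // are constants assumed to be defined elsewhere; they are not shown here.
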
    @Override
    public void compute(Iterator<DoubleWritable> messages) throws IOException {
      // initialize this vertex to 1 / count of global vertices in this graph
      if (this.getSuperstepCount() == 0) {
        setValue(new DoubleWritable(1.0 / this.getNumVertices()));
      } else if (this.getSuperstepCount() >= 1) {
        double sum = 0;
        // messages is an Iterator, so it cannot be used in a for-each loop
        while (messages.hasNext()) {
          sum += messages.next().get();
        }
        double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
        setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
        aggregate(0, this.getValue());
      }

      // if we have not reached our global error yet, then proceed.
      DoubleWritable globalError = getAggregatedValue(0);
      
      if (globalError != null && this.getSuperstepCount() > 2
          && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
        voteToHalt();
      } else {
        // in each superstep we are going to send a new rank to our neighbours
        sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
            / this.getEdges().size()));
      }
    }
  }
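
To check the update rule numerically, the following plain-Java sketch applies the formula (1 - 0.85) / numOfVertices + (0.85 * sum) to a small graph outside of Hama. ToyPageRank, run, and the three-vertex graph are hypothetical, and the sketch assumes that every vertex has at least one outgoing edge. It performs a fixed number of updates instead of testing a convergence error.

```java
import java.util.Arrays;

// Hypothetical plain-Java check of the PageRank update rule; NOT the Hama API.
final class ToyPageRank {
  static final double DAMPING_FACTOR = 0.85;

  // edges[v] lists the out-neighbours of v. Every vertex is assumed to have
  // at least one outgoing edge. Returns the ranks after the given number of
  // updates.
  static double[] run(int[][] edges, int updates) {
    int n = edges.length;
    double[] rank = new double[n];
    Arrays.fill(rank, 1.0 / n); // superstep 0: 1 / numOfVertices

    for (int step = 0; step < updates; step++) {
      // Each vertex sends rank / outDegree along every outgoing edge.
      double[] sum = new double[n];
      for (int v = 0; v < n; v++) {
        for (int target : edges[v]) {
          sum[target] += rank[v] / edges[v].length;
        }
      }
      // Each vertex applies the update rule to the sum of its messages.
      for (int v = 0; v < n; v++) {
        rank[v] = (1.0 - DAMPING_FACTOR) / n + DAMPING_FACTOR * sum[v];
      }
    }
    return rank;
  }

  public static void main(String[] args) {
    int[][] edges = {{1, 2}, {2}, {0}}; // 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0
    System.out.println(Arrays.toString(run(edges, 1)));
    System.out.println(Arrays.toString(run(edges, 30)));
  }
}
```

After one update on this graph, vertex 2 receives 1/6 from vertex 0 and 1/3 from vertex 1, so its rank becomes 0.05 + 0.85 * 0.5 = 0.475. Because no vertex lacks outgoing edges, the ranks keep summing to 1.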