This document describes the Graph computing framework and serves as a tutorial.
Hama includes the Graph package for vertex-centric graph computations. The package lets you write Google Pregel-style applications against a simple programming interface.
Writing a Hama graph application involves subclassing the predefined Vertex class. Its type parameters define three value types, associated with vertices, edges, and messages.
public abstract class Vertex<V extends Writable, E extends Writable, M extends Writable>
    implements VertexInterface<V, E, M> {

  public void compute(Iterator<M> messages) throws IOException;
  ..
}
The user overrides the compute() method, which is executed at each active vertex in every superstep. Predefined Vertex methods allow compute() to query information about the current vertex and its edges, and to send messages to other vertices. compute() can inspect the value associated with its vertex via getValue().
Hama Graph provides flexible input and output options and lets you extract vertices from your data without any pre-processing. You can create your own vertex reader for your data format by extending the org.apache.hama.graph.VertexInputReader class. For example, a sequence file in which each record holds a vertex and the list of vertices it links to can be parsed as follows:
public static class PagerankSeqReader
    extends VertexInputReader<Text, TextArrayWritable, Text, NullWritable, DoubleWritable> {

  @Override
  public boolean parseVertex(Text key, TextArrayWritable value,
      Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
    // the record key is the vertex ID
    vertex.setVertexID(key);

    // every entry of the record value becomes an outgoing edge without a value
    for (Writable v : value.get()) {
      vertex.addEdge(new Edge<Text, NullWritable>((Text) v, null));
    }

    return true;
  }
}
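You can also create your own output writer by implementing org.apache.hama.graph.VertexOutputWriter. For example, a writer for semi-clustering results can implement the write method as follows:
@Override
public void write(Vertex<V, E, M> vertex,
    BSPPeer<Writable, Writable, KEYOUT, VALUEOUT, GraphJobMessage> peer)
    throws IOException {
  SemiClusterMessage vertexValue = (SemiClusterMessage) vertex.getValue();
  // emit the vertex ID together with the semi-clusters that contain this vertex
  peer.write((KEYOUT) vertex.getVertexID(),
      (VALUEOUT) new Text(vertexValue.getSemiClusterContainThis().toString()));
}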
Sending a message to a vertex that lives on a different machine incurs some overhead. However, if the algorithm does not need each message individually but only a function of them (for example, their sum), combiners can be used to merge the messages intended for a vertex into one, which reduces that overhead.
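To illustrate the idea, here is a minimal, framework-independent sketch in plain Java. It does not use Hama's combiner API: the class SumCombinerSketch, its nested Message type, and the combine method are hypothetical names chosen for this example. It only shows how several messages bound for the same vertex can be folded into a single sum.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of a sum combiner; not part of Hama. */
final class SumCombinerSketch {

  /** A message addressed to a destination vertex (illustrative type). */
  static final class Message {
    final String target;
    final double value;

    Message(String target, double value) {
      this.target = target;
      this.value = value;
    }
  }

  /**
   * Collapses all messages bound for the same target into one entry
   * carrying their sum, so only one value per target has to be sent.
   */
  static Map<String, Double> combine(List<Message> outgoing) {
    Map<String, Double> combined = new LinkedHashMap<>();
    for (Message m : outgoing) {
      combined.merge(m.target, m.value, Double::sum);
    }
    return combined;
  }

  public static void main(String[] args) {
    List<Message> outgoing = Arrays.asList(
        new Message("B", 0.25),
        new Message("C", 0.5),
        new Message("B", 0.25));
    // Three outgoing messages shrink to two, one per distinct target.
    System.out.println(combine(outgoing));
  }
}
```

This only works because addition is commutative and associative, so the receiving vertex cannot tell whether it got the individual values or their sum. An algorithm that needs to see each message separately cannot use a combiner of this kind.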
Counters are used for measuring the progress of a job or for counting events within it. To use your own counter, define an enum type as follows:
private static enum DYNAMIC_GRAPH_COUNTER {
  ADDED_VERTEX_COUNT,
  REMOVED_VERTEX_COUNT
}
You can then increment your counter by calling its increment method as follows:
this.getCounter(DYNAMIC_GRAPH_COUNTER.ADDED_VERTEX_COUNT).increment(1);
Aggregators are a mechanism for global communication, monitoring, and data. Each vertex can provide a value to an aggregator in superstep S, the system combines those values using a reduction operator, and the resulting value is made available to all vertices in superstep S + 1.
To start using aggregators, you must declare them in your GraphJob:
HamaConfiguration conf = new HamaConfiguration(new Configuration());
GraphJob graphJob = new GraphJob(conf, MyClass.class);

// To add an average aggregator
graphJob.setAggregatorClass(AverageAggregator.class);

// To add a sum aggregator
graphJob.setAggregatorClass(SumAggregator.class);
Several aggregators are available, and you can also write your own. The aggregators that are already implemented can be found in the org.apache.hama.graph package.
In order to aggregate values from your vertices, use:
this.aggregate(index,value);
This method is called from inside a vertex, although not every vertex is required to call it. The index parameter is the position of the aggregator in registration order: the first registered aggregator has index 0, the second has index 1, and so on.
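The aggregator semantics described in this section (values contributed in superstep S, reduced by an operator, visible in superstep S + 1, and addressed by registration order) can be sketched in plain Java. This is a single-machine illustration and not Hama's implementation: the class AggregatorSketch and its register and endSuperstep methods are hypothetical, and only the names aggregate and getAggregatedValue mirror the calls used in this document.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoubleBinaryOperator;

/** Hypothetical illustration of aggregator semantics; not part of Hama. */
final class AggregatorSketch {
  private final List<DoubleBinaryOperator> reducers = new ArrayList<>();
  // Values being combined during the current superstep S.
  private final List<Double> pending = new ArrayList<>();
  // Values combined in the previous superstep, readable in S + 1.
  private final List<Double> published = new ArrayList<>();

  /** Registers a reduction operator and returns its index (0, 1, ...). */
  int register(DoubleBinaryOperator reducer) {
    reducers.add(reducer);
    pending.add(null);
    published.add(null);
    return reducers.size() - 1;
  }

  /** Called by a vertex during a superstep to contribute a value. */
  void aggregate(int index, double value) {
    Double current = pending.get(index);
    pending.set(index, current == null
        ? value
        : reducers.get(index).applyAsDouble(current, value));
  }

  /** Returns the value combined in the previous superstep, or null if none. */
  Double getAggregatedValue(int index) {
    return published.get(index);
  }

  /** Superstep barrier: publish the combined values and start afresh. */
  void endSuperstep() {
    for (int i = 0; i < pending.size(); i++) {
      published.set(i, pending.get(i));
      pending.set(i, null);
    }
  }

  public static void main(String[] args) {
    AggregatorSketch aggregators = new AggregatorSketch();
    int sum = aggregators.register(Double::sum); // first registered: index 0
    int max = aggregators.register(Math::max);   // second registered: index 1

    // Superstep S: two vertices contribute to both aggregators.
    aggregators.aggregate(sum, 1.0);
    aggregators.aggregate(sum, 2.0);
    aggregators.aggregate(max, 1.0);
    aggregators.aggregate(max, 2.0);
    aggregators.endSuperstep();

    // Superstep S + 1: the reduced values are now visible.
    System.out.println(aggregators.getAggregatedValue(sum));
    System.out.println(aggregators.getAggregatedValue(max));
  }
}
```

Keeping separate pending and published lists is what makes a value contributed in one superstep invisible until the next one, which matches the null check on the aggregated value in the PageRank example.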
To solve the PageRank problem using Hama Graph, you can extend the Vertex class to create a PageRankVertex class.
This example uses the algorithm described in Google's Pregel paper. The value of a vertex represents its tentative page rank. The graph is initialized with each vertex value equal to 1/numOfVertices. In each of the first 30 supersteps, each vertex sends its tentative page rank, divided by its number of outgoing edges, along all of its outgoing edges.
From superstep 1 to 30, each vertex sums up the values arriving on all its messages and sets its tentative page rank to (1 - 0.85) / numOfVertices + (0.85 * sum).
public static class PageRankVertex extends Vertex<Text, NullWritable, DoubleWritable> {

  // DAMPING_FACTOR (0.85 in the formula above) and MAXIMUM_CONVERGENCE_ERROR
  // are constants assumed to be defined in the enclosing class.

  @Override
  public void compute(Iterator<DoubleWritable> messages) throws IOException {
    // initialize this vertex to 1 / count of global vertices in this graph
    if (this.getSuperstepCount() == 0) {
      setValue(new DoubleWritable(1.0 / this.getNumVertices()));
    } else if (this.getSuperstepCount() >= 1) {
      // sum the rank contributions received from the neighbours
      double sum = 0;
      while (messages.hasNext()) {
        sum += messages.next().get();
      }
      double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
      setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
      aggregate(0, this.getValue());
    }

    // if we have not reached our global error yet, then proceed.
    DoubleWritable globalError = getAggregatedValue(0);
    if (globalError != null && this.getSuperstepCount() > 2
        && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
      voteToHalt();
    } else {
      // in each superstep we are going to send a new rank to our neighbours
      sendMessageToNeighbors(
          new DoubleWritable(this.getValue().get() / this.getEdges().size()));
    }
  }
}
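The update rule can also be tried out without a cluster. The following is a minimal single-machine sketch in plain Java, not Hama code: the class PageRankSketch, its run method, and the adjacency-list representation are assumptions made for this illustration. It applies the same rule (uniform start, each vertex sends rank / outgoing edge count, new rank = (1 - 0.85) / numOfVertices + 0.85 * sum) for a fixed number of supersteps and omits the convergence check.

```java
import java.util.Arrays;

/** Hypothetical single-machine illustration of the update rule; not Hama code. */
final class PageRankSketch {
  static final double DAMPING_FACTOR = 0.85;

  /**
   * Runs the given number of supersteps on a graph stored as adjacency
   * lists, where edges[v] holds the targets of v's outgoing edges.
   * Assumes every vertex has at least one outgoing edge.
   */
  static double[] run(int[][] edges, int supersteps) {
    int n = edges.length;
    double[] rank = new double[n];
    Arrays.fill(rank, 1.0 / n); // superstep 0: uniform start

    for (int step = 1; step <= supersteps; step++) {
      // inbox[v] accumulates the messages sent to v in the previous superstep
      double[] inbox = new double[n];
      for (int v = 0; v < n; v++) {
        double share = rank[v] / edges[v].length;
        for (int target : edges[v]) {
          inbox[target] += share;
        }
      }
      double alpha = (1.0 - DAMPING_FACTOR) / n;
      for (int v = 0; v < n; v++) {
        rank[v] = alpha + DAMPING_FACTOR * inbox[v];
      }
    }
    return rank;
  }

  public static void main(String[] args) {
    // Edges: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0
    int[][] edges = { {1, 2}, {2}, {0} };
    System.out.println(Arrays.toString(run(edges, 30)));
  }
}
```

Because every vertex in this sketch has an outgoing edge, the ranks keep summing to 1 after each superstep, which is a convenient sanity check when experimenting with other small graphs.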