Hama BSP Tutorial

This document describes the Hama BSP framework and serves as a tutorial.

Overview

Hama provides a pure BSP (Bulk Synchronous Parallel) model for message passing and collective communication. A BSP program consists of a sequence of supersteps. Each superstep consists of three phases: local computation, process communication, and barrier synchronization.

BSP programming enables you to write high-performance parallel computing algorithms for a wide range of scientific problems.
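
To make the superstep structure concrete, here is a minimal plain-Java sketch that simulates peers with threads and the barrier with java.util.concurrent.CyclicBarrier. It does not use the Hama API: the class SuperstepSketch, its run() method, and the "send to the right-hand neighbour" pattern are hypothetical names and choices made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;

// Hypothetical plain-Java illustration; none of these names come from Hama.
final class SuperstepSketch {

  // Runs `supersteps` rounds on `numPeers` threads. Returns, for each peer,
  // the sum of all values it received from its left-hand neighbour.
  static long[] run(int numPeers, int supersteps) {
    List<Queue<Integer>> inboxes = new ArrayList<>();
    for (int p = 0; p < numPeers; p++) {
      inboxes.add(new ConcurrentLinkedQueue<>());
    }
    CyclicBarrier barrier = new CyclicBarrier(numPeers);
    long[] totals = new long[numPeers];
    Thread[] threads = new Thread[numPeers];

    for (int p = 0; p < numPeers; p++) {
      final int id = p;
      threads[p] = new Thread(() -> {
        try {
          for (int step = 0; step < supersteps; step++) {
            // Phase 1: local computation.
            int value = id + step;
            // Phase 2: communication (send to the right-hand neighbour).
            inboxes.get((id + 1) % numPeers).add(value);
            // Phase 3: barrier synchronization.
            barrier.await();
            // Everything sent before the barrier can now be read.
            Integer msg;
            while ((msg = inboxes.get(id).poll()) != null) {
              totals[id] += msg;
            }
            // Simplification of this sketch: a second barrier keeps the next
            // round's sends out of the inbox that is being drained.
            barrier.await();
          }
        } catch (Exception e) {
          throw new IllegalStateException(e);
        }
      });
      threads[p].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return totals;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(run(3, 2))); // prints [5, 1, 3]
  }
}
```

With three peers and two supersteps, peer 0 receives 2 and 3 from peer 2, peer 1 receives 0 and 1 from peer 0, and peer 2 receives 1 and 2 from peer 1.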

Create your own BSP by extending the BSP class

To create your own BSP program, write a class that extends the org.apache.hama.bsp.BSP class.
The extending class must override the bsp() method, which is declared like this:

  // K1/V1 are the input key/value types, K2/V2 are the output key/value types,
  // and M is the message type (bounded as "M extends Writable" on the BSP class).
  public abstract void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
    SyncException, InterruptedException;

You define the whole BSP program inside this bsp() method. Note that bsp() does not correspond to a single superstep. As described above, a BSP program consists of a sequence of supersteps, so bsp() is called only once, unlike the methods of a Mapper or Reducer, which are called over and over again.

NOTE: Optionally, there are also setup() and cleanup() methods, which are called at the beginning and at the end of your computation, respectively. cleanup() is guaranteed to run after the computation or in case of failure. Simply override the methods you need from the BSP class.

After creating your own BSP, you need to configure a BSPJob and submit it to the Hama cluster to execute the job. The BSP job configuration and submission interfaces are almost the same as those of a MapReduce job:

  HamaConfiguration conf = new HamaConfiguration();
  BSPJob job = new BSPJob(conf, MyBSP.class);
  job.setJobName("My BSP program");
  job.setBspClass(MyBSP.class);
  job.setInputFormat(NullInputFormat.class);
  job.setOutputKeyClass(Text.class);
  ...
  job.waitForCompletion(true);

See the section below for a more detailed description of the BSP user interfaces.

User Interfaces

Inputs and Outputs

When setting up a BSPJob, you can provide an InputFormat, an OutputFormat, and paths like this:

  job.setInputPath(new Path("/tmp/sequence.dat"));
  job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
  // or,
  SequenceFileInputFormat.addInputPath(job, new Path("/tmp/sequence.dat"));
  // or,
  SequenceFileInputFormat.addInputPaths(job, "/tmp/seq1.dat,/tmp/seq2.dat,/tmp/seq3.dat");
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setOutputFormat(TextOutputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path("/tmp/result"));
  

You can then read the input and write the output from the methods of the BSP class. Each of them receives a BSPPeer as a parameter, which provides the communication, counter, and I/O interfaces. In this case we read a normal text file:

  @Override
  public final void bsp(
      BSPPeer<LongWritable, Text, Text, LongWritable, Text> peer)
      throws IOException, InterruptedException, SyncException {
      
      // this method reads the next key value record from file
      KeyValuePair<LongWritable, Text> pair = peer.readNext();

      // the following lines do the same:
      LongWritable key = new LongWritable();
      Text value = new Text();
      peer.readNext(key, value);
      
      // write
      peer.write(value, key);
  }

Consult the API documentation for details on events such as end of file. There is also a method, reopenInput(), that lets you re-read the input from the beginning. This snippet reads the input five times:

  for(int i = 0; i < 5; i++){
    LongWritable key = new LongWritable();
    Text value = new Text();
    while (peer.readNext(key, value)) {
       // read everything
    }
    // reopens the input
    peer.reopenInput();
  }

Communication

Hama BSP provides a simple but powerful communication API. We tried to follow the conventions of standard BSP libraries as much as possible. The following table describes the methods you can use:

Method                                 Description
send(String peerName, BSPMessage msg)  Sends a message to another peer.
getCurrentMessage()                    Returns a received message.
getNumCurrentMessages()                Returns the number of received messages.
sync()                                 Barrier synchronization.
getPeerName()                          Returns a peer's hostname.
getAllPeerNames()                      Returns the hostnames of all peers.
getSuperstepCount()                    Returns the count of supersteps.

send() and the other methods are very flexible. Here is an example that sends a message to all peers:

  @Override
  public void bsp(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, Text> peer)
      throws IOException, SyncException, InterruptedException {
    for (String peerName : peer.getAllPeerNames()) {
      // Text has no (String, long) constructor, so the timestamp is appended to the string.
      peer.send(peerName,
        new Text("Hello from " + peer.getPeerName() + " at " + System.currentTimeMillis()));
    }

    peer.sync();
  }

Synchronization

When all the processes have entered the barrier via the sync() method, Hama proceeds to the next superstep. In the previous example, the BSP job finishes after a single synchronization, once each peer has sent its "Hello from ..." message to all peers.

Keep in mind, however, that calling sync() does not end the BSP job. As mentioned previously, all the communication functions are very flexible. For example, sync() can also be called in a for loop, which lets you program iterative methods sequentially:

  @Override
  public void bsp(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, Text> peer)
      throws IOException, SyncException, InterruptedException {
    for (int i = 0; i < 100; i++) {
      // send some messages
      peer.sync();
    }
  }

Combiners

Combiners perform message aggregation at the sender side to reduce communication overhead in cases where messages can be summarized arithmetically, e.g., by min, max, sum, or average. Suppose that every processor sends the integers from 0 to 999 to one specific processor, which then sums all the integer messages it received from all processors:

    // masterTask (String) and sum (int) are assumed to be fields of the enclosing BSP class.
    public void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer) throws IOException,
        SyncException, InterruptedException {

      for (int i = 0; i < 1000; i++) {
        peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i));
      }
      peer.sync();

      if (peer.getPeerName().equals(masterTask)) {
        IntegerMessage received;
        while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) {
          sum += received.getData();
        }
      }
    }

In the previous example, each BSP processor sends a bundle of a thousand integer messages to the masterTask. Instead, you can use a Combiner in your BSP program to sum the integer messages at the sender side, so that only the sum has to be sent. This reduces communication and keeps the program concise and maintainable:

  public static class SumCombiner extends Combiner {

    @Override
    public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
      BSPMessageBundle bundle = new BSPMessageBundle();
      int sum = 0;

      Iterator<BSPMessage> it = messages.iterator();
      while (it.hasNext()) {
        sum += ((IntegerMessage) it.next()).getData();
      }

      bundle.addMessage(new IntegerMessage("Sum", sum));
      return bundle;
    }

  }
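
The effect of sender-side combining can be sketched in plain Java without the Hama classes. The class CombinerSketch and its combine() method below are hypothetical stand-ins for illustration; they only mirror the idea of SumCombiner, collapsing all queued integers into a single message that carries their sum.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java illustration; not part of the Hama API.
final class CombinerSketch {

  // Collapses all outgoing integer messages into one message holding their sum.
  static List<Integer> combine(List<Integer> outgoing) {
    int sum = 0;
    for (int value : outgoing) {
      sum += value;
    }
    List<Integer> combined = new ArrayList<>();
    combined.add(sum);
    return combined;
  }

  public static void main(String[] args) {
    // Queue the same messages as the example above: 0, 1, ..., 999.
    List<Integer> outgoing = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      outgoing.add(i);
    }
    List<Integer> combined = combine(outgoing);
    // prints "1000 messages -> 1 message carrying 499500"
    System.out.println(outgoing.size() + " messages -> " + combined.size()
        + " message carrying " + combined.get(0));
  }
}
```

Each sender then transmits one message instead of a thousand, and the receiver still arrives at the same total because addition is associative.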

Counters

Counters are used for measuring progress or counting the number of events within a job. To use your own counter, define an enum type as follows:

  private static enum CUSTOM_COUNTER {
    CUSTOM_SEND_MESSAGE_COUNTER,
    ANOTHER_COUNTER
  }

Then you can increment your counter by calling the increment() method as follows:

  peer.getCounter(CUSTOM_COUNTER.CUSTOM_SEND_MESSAGE_COUNTER).increment(1);
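
As a rough mental model of enum-keyed counters, the following plain-Java sketch keeps one running total per enum constant. It is not how Hama implements counters: CounterSketch, CustomCounter, and the increment()/get() methods are hypothetical names used only to illustrate the idea.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical plain-Java illustration; not the Hama counter implementation.
final class CounterSketch {

  enum CustomCounter { SENT_MESSAGES, ANOTHER_COUNTER }

  // One running total per enum constant; absent keys count as zero.
  private final Map<CustomCounter, Long> counts = new EnumMap<>(CustomCounter.class);

  // Adds `amount` to the counter identified by `key`.
  void increment(CustomCounter key, long amount) {
    counts.merge(key, amount, Long::sum);
  }

  // Returns the current value of the counter, or 0 if it was never incremented.
  long get(CustomCounter key) {
    return counts.getOrDefault(key, 0L);
  }

  public static void main(String[] args) {
    CounterSketch counters = new CounterSketch();
    for (int i = 0; i < 3; i++) {
      counters.increment(CustomCounter.SENT_MESSAGES, 1); // e.g. once per sent message
    }
    System.out.println(counters.get(CustomCounter.SENT_MESSAGES)); // prints 3
  }
}
```

Using an enum as the key gives every counter a fixed, typo-proof name that can be shared between the code that increments it and the code that reports it.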

Shell Command Line Interfaces

Hama provides several commands for BSP job administration:

Command                                  Description
-submit <job-file>                       Submits the job.
-status <job-id>                         Prints the job status.
-kill <job-id>                           Kills the job.
-list [all]                              -list all displays all jobs. -list displays only jobs that are yet to be completed.
-list-active-grooms                      Displays the list of active groom servers in the cluster.
-list-attempt-ids <jobId> <task-state>   Displays the list of tasks for a given job currently in a particular state (running or completed).
-kill-task <task-id>                     Kills the task. Killed tasks are NOT counted against failed attempts.
-fail-task <task-id>                     Fails the task. Failed tasks are counted against failed attempts.

Example: Pi Calculation

Here is a BSP-based Pi calculation example, including the code that submits it to the Hama cluster:

  private static Path TMP_OUTPUT = new Path("/tmp/pi-" + System.currentTimeMillis());

  public static class MyEstimator extends
      BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {
    public static final Log LOG = LogFactory.getLog(MyEstimator.class);
    private String masterTask;
    private static final int iterations = 10000;

    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException, SyncException, InterruptedException {

      int in = 0;
      for (int i = 0; i < iterations; i++) {
        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
        if ((Math.sqrt(x * x + y * y) < 1.0)) {
          in++;
        }
      }

      double data = 4.0 * in / iterations;

      peer.send(masterTask, new DoubleWritable(data));
      peer.sync();
    }

    @Override
    public void setup(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException {
      // Choose one as a master
      this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
    }

    @Override
    public void cleanup(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException {
      if (peer.getPeerName().equals(masterTask)) {
        double pi = 0.0;
        int numPeers = peer.getNumCurrentMessages();
        DoubleWritable received;
        while ((received = peer.getCurrentMessage()) != null) {
          pi += received.get();
        }

        pi = pi / numPeers;
        peer.write(new Text("Estimated value of PI is"), new DoubleWritable(pi));
      }
    }
  }

  static void printOutput(HamaConfiguration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    FileStatus[] files = fs.listStatus(TMP_OUTPUT);
    for (int i = 0; i < files.length; i++) {
      if (files[i].getLen() > 0) {
        FSDataInputStream in = fs.open(files[i].getPath());
        IOUtils.copyBytes(in, System.out, conf, false);
        in.close();
        break;
      }
    }

    fs.delete(TMP_OUTPUT, true);
  }

  public static void main(String[] args) throws InterruptedException,
      IOException, ClassNotFoundException {
    // BSP job configuration
    HamaConfiguration conf = new HamaConfiguration();

    BSPJob bsp = new BSPJob(conf, PiEstimator.class);
    // Set the job name
    bsp.setJobName("Pi Estimation Example");
    bsp.setBspClass(MyEstimator.class);
    bsp.setInputFormat(NullInputFormat.class);
    bsp.setOutputKeyClass(Text.class);
    bsp.setOutputValueClass(DoubleWritable.class);
    bsp.setOutputFormat(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);

    BSPJobClient jobClient = new BSPJobClient(conf);
    ClusterStatus cluster = jobClient.getClusterStatus(true);

    if (args.length > 0) {
      bsp.setNumBspTask(Integer.parseInt(args[0]));
    } else {
      // Set to maximum
      bsp.setNumBspTask(cluster.getMaxTasks());
    }

    long startTime = System.currentTimeMillis();
    if (bsp.waitForCompletion(true)) {
      printOutput(conf);
      System.out.println("Job Finished in "
          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
    }
  }
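
The arithmetic behind the estimator can be checked without a cluster. The plain-Java sketch below reproduces what each peer computes in bsp() and what the master computes in cleanup(). The class PiSketch, its method names, and the fixed seeds are assumptions made for this illustration; the Hama example itself uses Math.random().

```java
import java.util.Random;

// Hypothetical plain-Java illustration of the estimator's arithmetic; no Hama API.
final class PiSketch {

  // One peer's work: draw points in the square [-1, 1) x [-1, 1) and return
  // 4 * (fraction of points that fall inside the unit circle).
  static double localEstimate(int iterations, long seed) {
    Random random = new Random(seed); // seeded so that runs are repeatable
    int in = 0;
    for (int i = 0; i < iterations; i++) {
      double x = 2.0 * random.nextDouble() - 1.0;
      double y = 2.0 * random.nextDouble() - 1.0;
      if (x * x + y * y < 1.0) {
        in++;
      }
    }
    return 4.0 * in / iterations;
  }

  // The master's work: average the estimates received from all peers.
  static double average(double[] estimates) {
    double sum = 0.0;
    for (double estimate : estimates) {
      sum += estimate;
    }
    return sum / estimates.length;
  }

  public static void main(String[] args) {
    int numPeers = 4;
    double[] estimates = new double[numPeers];
    for (int p = 0; p < numPeers; p++) {
      estimates[p] = localEstimate(10000, p); // each simulated peer gets its own seed
    }
    System.out.println("Estimated value of PI is " + average(estimates));
  }
}
```

The ratio works because the unit circle covers pi/4 of the enclosing square, so four times the fraction of points inside the circle approaches pi as the number of samples grows.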