1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.aggregators;
20
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24
25 import org.apache.giraph.BspCase;
26 import org.apache.giraph.comm.aggregators.AggregatorUtils;
27 import org.apache.giraph.conf.GiraphConfiguration;
28 import org.apache.giraph.conf.GiraphConstants;
29 import org.apache.giraph.examples.AggregatorsTestComputation;
30 import org.apache.giraph.examples.SimpleCheckpoint;
31 import org.apache.giraph.job.GiraphJob;
32 import org.apache.hadoop.fs.Path;
33 import org.junit.Test;
34
35
36 public class TestAggregatorsHandling extends BspCase {
37
38 public TestAggregatorsHandling() {
39 super(TestAggregatorsHandling.class.getName());
40 }
41
42
43 @Test
44 public void testAggregatorsHandling() throws IOException,
45 ClassNotFoundException, InterruptedException {
46 GiraphConfiguration conf = new GiraphConfiguration();
47 conf.setComputationClass(AggregatorsTestComputation.class);
48 conf.setVertexInputFormatClass(
49 AggregatorsTestComputation.SimpleVertexInputFormat.class);
50 conf.setEdgeInputFormatClass(
51 AggregatorsTestComputation.SimpleEdgeInputFormat.class);
52 GiraphJob job = prepareJob(getCallingMethodName(), conf);
53 job.getConfiguration().setMasterComputeClass(
54 AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
55
56 job.getConfiguration().setInt(
57 AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST, 50);
58 assertTrue(job.run(true));
59 }
60
61
62
63
64
65 @Test
66 public void testAggregatorsCheckpointing() throws ClassNotFoundException,
67 IOException, InterruptedException {
68 Path checkpointsDir = getTempPath("checkPointsForTesting");
69 Path outputPath = getTempPath(getCallingMethodName());
70 GiraphConfiguration conf = new GiraphConfiguration();
71 conf.setComputationClass(AggregatorsTestComputation.class);
72 conf.setMasterComputeClass(
73 AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
74 conf.setVertexInputFormatClass(
75 AggregatorsTestComputation.SimpleVertexInputFormat.class);
76 conf.setEdgeInputFormatClass(
77 AggregatorsTestComputation.SimpleEdgeInputFormat.class);
78 GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
79
80 GiraphConfiguration configuration = job.getConfiguration();
81 GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
82 GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration, false);
83 configuration.setCheckpointFrequency(4);
84
85 assertTrue(job.run(true));
86
87
88 System.out.println("testAggregatorsCheckpointing: Restarting from " +
89 "superstep 4 with checkpoint path = " + checkpointsDir);
90 outputPath = getTempPath(getCallingMethodName() + "Restarted");
91 conf = new GiraphConfiguration();
92 conf.setComputationClass(AggregatorsTestComputation.class);
93 conf.setMasterComputeClass(
94 AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
95 conf.setVertexInputFormatClass(
96 AggregatorsTestComputation.SimpleVertexInputFormat.class);
97 conf.setEdgeInputFormatClass(
98 AggregatorsTestComputation.SimpleEdgeInputFormat.class);
99 GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
100 conf, outputPath);
101 job.getConfiguration().setMasterComputeClass(
102 SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
103 GiraphConfiguration restartedJobConf = restartedJob.getConfiguration();
104 GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJobConf,
105 checkpointsDir.toString());
106 restartedJobConf.setLong(GiraphConstants.RESTART_SUPERSTEP, 4);
107
108 assertTrue(restartedJob.run(true));
109 }
110 }