1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.yarn;
20
21 import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
22
23 import java.io.File;
24 import java.io.IOException;
25 import java.util.Properties;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import junit.framework.Assert;
29 import org.apache.commons.io.FileUtils;
30 import org.apache.giraph.conf.GiraphConfiguration;
31 import org.apache.giraph.conf.GiraphConstants;
32 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33 import org.apache.giraph.graph.BasicComputation;
34 import org.apache.giraph.graph.Vertex;
35 import org.apache.giraph.io.formats.GiraphFileInputFormat;
36 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
37 import org.apache.giraph.io.formats.IntIntNullTextInputFormat;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.io.IntWritable;
40 import org.apache.hadoop.io.NullWritable;
41 import org.apache.hadoop.yarn.conf.YarnConfiguration;
42 import org.apache.hadoop.yarn.server.MiniYARNCluster;
43 import org.apache.log4j.Logger;
44 import org.apache.zookeeper.WatchedEvent;
45 import org.apache.zookeeper.Watcher;
46 import org.apache.zookeeper.server.ServerConfig;
47 import org.apache.zookeeper.server.ZooKeeperServerMain;
48 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
49
50 import org.junit.Test;
51
52
53
54
55
56
57
58
59
60 public class TestYarnJob implements Watcher {
61 private static final Logger LOG = Logger.getLogger(TestYarnJob.class);
62
63
64
65 private static class DummyYarnComputation extends BasicComputation<
66 IntWritable, IntWritable, NullWritable, IntWritable> {
67 @Override
68 public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
69 Iterable<IntWritable> messages) throws IOException {
70 vertex.voteToHalt();
71 }
72 }
73
74
75 private static final String JOB_NAME = "giraph-TestPureYarnJob";
76
77 private static final int LOCAL_ZOOKEEPER_PORT = 22183;
78
79 private static final String zkList = "localhost:" + LOCAL_ZOOKEEPER_PORT;
80
81 private static final String zkDirName = "_bspZooKeeperYarn";
82
83 private static final String zkMgrDirName = "_defaultZooKeeperManagerYarn";
84
85
86 private File testBaseDir = null;
87
88 private File inputDir = null;
89
90 private File outputDir = null;
91
92 private File zkDir = null;
93
94 private File zkMgrDir = null;
95
96 private InternalZooKeeper zookeeper;
97
98 private ExecutorService exec = Executors.newSingleThreadExecutor();
99
100 private GiraphConfiguration conf = null;
101
102 private int zkEventCount = 0;
103
104 private MiniYARNCluster cluster = null;
105
106 @Test
107 public void testPureYarnJob() {
108 try {
109 setupYarnConfiguration();
110 initLocalZookeeper();
111 initYarnCluster();
112 GiraphYarnClient testGyc = new GiraphYarnClient(conf, JOB_NAME);
113 Assert.assertTrue(testGyc.run(true));
114 } catch (Exception e) {
115 e.printStackTrace();
116 Assert.fail("Caught exception in TestYarnJob: " + e);
117 } finally {
118 zookeeper.end();
119 exec.shutdown();
120 cluster.stop();
121 deleteTempDirectories();
122 }
123 }
124
125
126
127
128
129 @Override
130 public void process(WatchedEvent zkEvent) {
131 String event = zkEvent == null ? "NULL" : zkEvent.toString();
132 LOG.info("TestYarnJob observed ZK event: " + event +
133 " for a total of " + (++zkEventCount) + " so far.");
134 }
135
136
137
138
139 private void deleteTempDirectories() {
140 try {
141 if (testBaseDir != null && testBaseDir.exists()) {
142 FileUtils.deleteDirectory(testBaseDir);
143 }
144 } catch (IOException ioe) {
145 LOG.error("TestYarnJob#deleteTempDirectories() FAIL at: " + testBaseDir);
146 }
147 }
148
149
150
151
152 private void initLocalZookeeper() throws IOException {
153 zookeeper = new InternalZooKeeper();
154 exec.execute(new Runnable() {
155 @Override
156 public void run() {
157 try {
158
159 Properties zkProperties = generateLocalZkProperties();
160 QuorumPeerConfig qpConfig = new QuorumPeerConfig();
161 qpConfig.parseProperties(zkProperties);
162
163 final ServerConfig zkConfig = new ServerConfig();
164 zkConfig.readFrom(qpConfig);
165 zookeeper.runFromConfig(zkConfig);
166 } catch (QuorumPeerConfig.ConfigException qpcce) {
167 throw new RuntimeException("parse of generated ZK config file " +
168 "has failed.", qpcce);
169 } catch (IOException e) {
170 e.printStackTrace();
171 throw new RuntimeException("initLocalZookeeper in TestYarnJob: ", e);
172 }
173 }
174
175
176
177
178
179 Properties generateLocalZkProperties() {
180 Properties zkProperties = new Properties();
181 zkProperties.setProperty("tickTime", "2000");
182 zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
183 zkProperties.setProperty("clientPort",
184 String.valueOf(LOCAL_ZOOKEEPER_PORT));
185 zkProperties.setProperty("maxClientCnxns", "10000");
186 zkProperties.setProperty("minSessionTimeout", "10000");
187 zkProperties.setProperty("maxSessionTimeout", "100000");
188 zkProperties.setProperty("initLimit", "10");
189 zkProperties.setProperty("syncLimit", "5");
190 zkProperties.setProperty("snapCount", "50000");
191 return zkProperties;
192 }
193 });
194 }
195
196
197
198
199
200
201 private void setupYarnConfiguration() throws IOException {
202 conf = new GiraphConfiguration();
203 conf.setWorkerConfiguration(1, 1, 100.0f);
204 conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
205 conf.setEventWaitMsecs(3 * 1000);
206 conf.setYarnLibJars("");
207 conf.setYarnTaskHeapMb(256);
208 conf.setComputationClass(DummyYarnComputation.class);
209 conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
210 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
211 conf.setNumComputeThreads(1);
212 conf.setMaxTaskAttempts(1);
213 conf.setNumInputSplitsThreads(1);
214
215 conf.setLocalTestMode(false);
216
217 setupTempDirectories();
218 conf.set(OUTDIR, new Path(outputDir.getAbsolutePath()).toString());
219 GiraphFileInputFormat.addVertexInputPath(conf, new Path(inputDir.getAbsolutePath()));
220
221 GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.set(conf, 500);
222 conf.setZooKeeperConfiguration(zkList);
223 conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.getAbsolutePath());
224 GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, zkMgrDir.getAbsolutePath());
225
226 conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
227 }
228
229
230
231
232 private void setupTempDirectories() throws IOException {
233 try {
234 testBaseDir =
235 new File(System.getProperty("user.dir"), JOB_NAME);
236 if (testBaseDir.exists()) {
237 testBaseDir.delete();
238 }
239 testBaseDir.mkdir();
240 inputDir = new File(testBaseDir, "yarninput");
241 if (inputDir.exists()) {
242 inputDir.delete();
243 }
244 inputDir.mkdir();
245 File inFile = new File(inputDir, "graph_data.txt");
246 inFile.createNewFile();
247 outputDir = new File(testBaseDir, "yarnoutput");
248 if (outputDir.exists()) {
249 outputDir.delete();
250 }
251 zkDir = new File(testBaseDir, zkDirName);
252 if (zkDir.exists()) {
253 zkDir.delete();
254 }
255 zkDir.mkdir();
256 zkMgrDir = new File(testBaseDir, zkMgrDirName);
257 if (zkMgrDir.exists()) {
258 zkMgrDir.delete();
259 }
260 zkMgrDir.mkdir();
261 } catch (IOException ioe) {
262 ioe.printStackTrace();
263 throw new IOException("from setupTempDirectories: ", ioe);
264 }
265 }
266
267
268
269
270 private void initYarnCluster() {
271 cluster = new MiniYARNCluster(TestYarnJob.class.getName(), 1, 1, 1);
272 cluster.init(new ImmutableClassesGiraphConfiguration(conf));
273 cluster.start();
274 }
275
276
277
278
279 class InternalZooKeeper extends ZooKeeperServerMain {
280
281
282
283 void end() {
284 shutdown();
285 }
286 }
287 }