1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats.multi;
20
21 import org.apache.giraph.io.GiraphInputFormat;
22 import org.apache.hadoop.mapreduce.InputSplit;
23 import org.apache.hadoop.mapreduce.JobContext;
24
25 import com.google.common.collect.Lists;
26
27 import java.io.DataInput;
28 import java.io.DataOutput;
29 import java.io.IOException;
30 import java.util.List;
31
32
33
34
35
36 public class MultiInputUtils {
37
38 private MultiInputUtils() {
39 }
40
41
42
43
44
45
46
47
48
49 public static List<InputSplit> getSplits(JobContext context,
50 int minSplitCountHint,
51 List<? extends GiraphInputFormat> inputFormats) throws IOException,
52 InterruptedException {
53 List<InputSplit> splits = Lists.newArrayList();
54 for (int index = 0; index < inputFormats.size(); index++) {
55 List<InputSplit> inputFormatSplits =
56 inputFormats.get(index).getSplits(context, minSplitCountHint);
57 for (InputSplit split : inputFormatSplits) {
58 splits.add(new InputSplitWithInputFormatIndex(split, index));
59 }
60 }
61 return splits;
62 }
63
64
65
66
67
68
69
70
71
72 public static void writeInputSplit(InputSplit inputSplit,
73 DataOutput dataOutput,
74 List<? extends GiraphInputFormat> inputFormats) throws IOException {
75 if (inputSplit instanceof InputSplitWithInputFormatIndex) {
76 InputSplitWithInputFormatIndex split =
77 (InputSplitWithInputFormatIndex) inputSplit;
78 int index = split.getInputFormatIndex();
79 dataOutput.writeInt(index);
80 inputFormats.get(index).writeInputSplit(split.getSplit(), dataOutput);
81 } else {
82 throw new IllegalStateException("writeInputSplit: Got InputSplit which " +
83 "was not created by multi input: " + inputSplit.getClass().getName());
84 }
85 }
86
87
88
89
90
91
92
93
94
95 public static InputSplit readInputSplit(
96 DataInput dataInput,
97 List<? extends GiraphInputFormat> inputFormats) throws IOException,
98 ClassNotFoundException {
99 int index = dataInput.readInt();
100 InputSplit split = inputFormats.get(index).readInputSplit(dataInput);
101 return new InputSplitWithInputFormatIndex(split, index);
102 }
103 }