/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hcatalog.mapreduce;

import org.apache.giraph.io.hcatalog.GiraphHCatInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.thrift.TException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Utility methods for reading table and partition metadata through HCatalog.
 */
public class HCatUtils {
  /**
   * Don't instantiate.
   */
  private HCatUtils() { }

  /**
   * Returns the given InputJobInfo after populating it with table and
   * partition data queried from the Hive metastore.
   *
   * @param conf Configuration
   * @param inputJobInfo Input job info to populate
   * @return Populated input job info
   * @throws IOException on metastore errors
   */
  public static InputJobInfo getInputJobInfo(
      Configuration conf, InputJobInfo inputJobInfo)
    throws IOException {
    HiveMetaStoreClient client = null;
    HiveConf hiveConf;
    try {
      if (conf != null) {
        hiveConf = HCatUtil.getHiveConf(conf);
      } else {
        hiveConf = new HiveConf(GiraphHCatInputFormat.class);
      }
      client = HCatUtil.getHiveClient(hiveConf);
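      // Fetch the table definition for the requested database/table pair
      // from the metastore.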
      Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(),
          inputJobInfo.getTableName());

      List<PartInfo> partInfoList = new ArrayList<PartInfo>();

      inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
      if (table.getPartitionKeys().size() != 0) {
        // Partitioned table: list the partitions that match the filter
        List<Partition> parts = client.listPartitionsByFilter(
            inputJobInfo.getDatabaseName(),
            inputJobInfo.getTableName(),
            inputJobInfo.getFilter(),
            (short) -1);

        if (parts != null) {
          // Refuse to read more partitions than the configured limit
          // (100,000 if hcat.metastore.maxpartitions is not set)
          int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
          if (parts.size() > maxPart) {
            throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART,
                "total number of partitions is " + parts.size());
          }

          // Build a PartInfo for every matching partition
          for (Partition ptn : parts) {
            HCatSchema schema = HCatUtil.extractSchema(
                new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
            PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
                ptn.getParameters(), conf, inputJobInfo);
            partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table,
                ptn));
            partInfoList.add(partInfo);
          }
        }
      } else {
        // Non-partitioned table: treat the whole table as a single partition
        HCatSchema schema = HCatUtil.extractSchema(table);
        PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
            table.getParameters(), conf, inputJobInfo);
        partInfo.setPartitionValues(new HashMap<String, String>());
        partInfoList.add(partInfo);
      }
      inputJobInfo.setPartitions(partInfoList);
    } catch (MetaException e) {
      throw new IOException("Got MetaException", e);
    } catch (NoSuchObjectException e) {
      throw new IOException("Got NoSuchObjectException", e);
    } catch (TException e) {
      throw new IOException("Got TException", e);
    } catch (HiveException e) {
      throw new IOException("Got HiveException", e);
    } finally {
      HCatUtil.closeHiveClientQuietly(client);
    }
    return inputJobInfo;
  }

  /**
   * Extract partition info for a single table or partition.
   *
   * @param schema Table schema
   * @param sd Storage descriptor
   * @param parameters Table or partition parameters
   * @param conf Configuration
   * @param inputJobInfo Input job info
   * @return Partition info
   * @throws IOException on errors resolving the storage handler
   */
  private static PartInfo extractPartInfo(
      HCatSchema schema, StorageDescriptor sd, Map<String, String> parameters,
      Configuration conf, InputJobInfo inputJobInfo) throws IOException {
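    // Determine the storage handler and SerDe recorded for this storage
    // descriptor.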
    StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd, parameters);

    Properties hcatProperties = new Properties();
    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf,
        storerInfo);

    // Collect the job properties the storage handler wants set for reading
    // this partition
    Map<String, String> jobProperties =
        HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);

    for (Map.Entry<String, String> param : parameters.entrySet()) {
      hcatProperties.put(param.getKey(), param.getValue());
    }

    return new PartInfo(schema, storageHandler, sd.getLocation(),
        hcatProperties, jobProperties, inputJobInfo.getTableInfo());
  }

  /**
   * Create a record reader for the given storage handler.
   *
   * @param storageHandler Storage handler
   * @param valuesNotInDataCols Partition key values not stored in the data
   * @return Record reader
   */
  public static RecordReader newHCatReader(
      HCatStorageHandler storageHandler,
      Map<String, String> valuesNotInDataCols) {
    return new HCatRecordReader(storageHandler, valuesNotInDataCols);
  }

  /**
   * Cast an {@link InputSplit} to an {@link HCatSplit}.
   *
   * @param split Input split
   * @return The split as an HCatSplit
   * @throws IOException if the split cannot be cast
   */
  public static HCatSplit castToHCatSplit(InputSplit split)
      throws IOException {
    return InternalUtil.castToHCatSplit(split);
  }
}