View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hcatalog.mapreduce;
20  
21  import org.apache.giraph.io.hcatalog.GiraphHCatInputFormat;
22  import org.apache.hadoop.conf.Configuration;
23  import org.apache.hadoop.hive.conf.HiveConf;
24  import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
25  import org.apache.hadoop.hive.metastore.api.MetaException;
26  import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
27  import org.apache.hadoop.hive.metastore.api.Partition;
28  import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
29  import org.apache.hadoop.hive.ql.metadata.HiveException;
30  import org.apache.hadoop.hive.ql.metadata.Table;
31  import org.apache.hadoop.mapreduce.InputSplit;
32  import org.apache.hadoop.mapreduce.RecordReader;
33  import org.apache.hcatalog.common.ErrorType;
34  import org.apache.hcatalog.common.HCatException;
35  import org.apache.hcatalog.common.HCatUtil;
36  import org.apache.hcatalog.data.schema.HCatSchema;
37  import org.apache.thrift.TException;
38  
39  import java.io.IOException;
40  import java.util.ArrayList;
41  import java.util.HashMap;
42  import java.util.List;
43  import java.util.Map;
44  import java.util.Properties;
45  
46  /**
47   * Utility methods copied from HCatalog because of visibility restrictions.
48   */
49  public class HCatUtils {
50    /**
51     * Don't instantiate.
52     */
53    private HCatUtils() { }
54  
55    /**
56     * Returns the given InputJobInfo after populating with data queried from the
57     * metadata service.
58     *
59     * @param conf Configuration
60     * @param inputJobInfo Input job info
61     * @return Populated input job info
62     * @throws IOException
63     */
64    public static InputJobInfo getInputJobInfo(
65        Configuration conf, InputJobInfo inputJobInfo)
66      throws IOException {
67      HiveMetaStoreClient client = null;
68      HiveConf hiveConf;
69      try {
70        if (conf != null) {
71          hiveConf = HCatUtil.getHiveConf(conf);
72        } else {
73          hiveConf = new HiveConf(GiraphHCatInputFormat.class);
74        }
75        client = HCatUtil.getHiveClient(hiveConf);
76        Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(),
77            inputJobInfo.getTableName());
78  
79        List<PartInfo> partInfoList = new ArrayList<PartInfo>();
80  
81        inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
82        if (table.getPartitionKeys().size() != 0) {
83          // Partitioned table
84          List<Partition> parts = client.listPartitionsByFilter(
85              inputJobInfo.getDatabaseName(),
86              inputJobInfo.getTableName(),
87              inputJobInfo.getFilter(),
88              (short) -1);
89  
90          if (parts != null) {
91            // Default to 100,000 partitions if hive.metastore.maxpartition is not
92            // defined
93            int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
94            if (parts.size() > maxPart) {
95              throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART,
96                  "total number of partitions is " + parts.size());
97            }
98  
99            // Populate partition info
100           for (Partition ptn : parts) {
101             HCatSchema schema = HCatUtil.extractSchema(
102                 new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
103             PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
104                 ptn.getParameters(), conf, inputJobInfo);
105             partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table,
106                 ptn));
107             partInfoList.add(partInfo);
108           }
109         }
110       } else {
111         // Non partitioned table
112         HCatSchema schema = HCatUtil.extractSchema(table);
113         PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
114             table.getParameters(), conf, inputJobInfo);
115         partInfo.setPartitionValues(new HashMap<String, String>());
116         partInfoList.add(partInfo);
117       }
118       inputJobInfo.setPartitions(partInfoList);
119     } catch (MetaException e) {
120       throw new IOException("Got MetaException", e);
121     } catch (NoSuchObjectException e) {
122       throw new IOException("Got NoSuchObjectException", e);
123     } catch (TException e) {
124       throw new IOException("Got TException", e);
125     } catch (HiveException e) {
126       throw new IOException("Got HiveException", e);
127     } finally {
128       HCatUtil.closeHiveClientQuietly(client);
129     }
130     return inputJobInfo;
131   }
132 
133   /**
134    * Extract partition info.
135    *
136    * @param schema Table schema
137    * @param sd Storage descriptor
138    * @param parameters Parameters
139    * @param conf Configuration
140    * @param inputJobInfo Input job info
141    * @return Partition info
142    * @throws IOException
143    */
144   private static PartInfo extractPartInfo(
145       HCatSchema schema, StorageDescriptor sd, Map<String, String> parameters,
146       Configuration conf, InputJobInfo inputJobInfo) throws IOException {
147     StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd, parameters);
148 
149     Properties hcatProperties = new Properties();
150     HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf,
151         storerInfo);
152 
153     // Copy the properties from storageHandler to jobProperties
154     Map<String, String> jobProperties =
155         HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);
156 
157     for (Map.Entry<String, String> param : parameters.entrySet()) {
158       hcatProperties.put(param.getKey(), param.getValue());
159     }
160 
161     return new PartInfo(schema, storageHandler, sd.getLocation(),
162         hcatProperties, jobProperties, inputJobInfo.getTableInfo());
163   }
164 
165   /**
166    * Create a new {@link HCatRecordReader}.
167    *
168    * @param storageHandler Storage handler
169    * @param valuesNotInDataCols Values not in data columns
170    * @return Record reader
171    */
172   public static RecordReader newHCatReader(
173       HCatStorageHandler storageHandler,
174       Map<String, String> valuesNotInDataCols) {
175     return new HCatRecordReader(storageHandler, valuesNotInDataCols);
176   }
177 
178   /**
179    * Cast an {@link InputSplit} to {@link HCatSplit}.
180    *
181    * @param split Input split
182    * @return {@link HCatSplit}
183    * @throws IOException
184    */
185   public static HCatSplit castToHCatSplit(InputSplit split)
186     throws IOException {
187     return InternalUtil.castToHCatSplit(split);
188   }
189 }