/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.pig.piggybank.storage.avro;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.Expression;
import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreResources;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.builtin.FuncUtils;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
/**
* AvroStorage is used to load/store Avro data.
* See the Pig documentation on AvroStorage for usage details.
*/
public class AvroStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadMetadata, StoreResources {
private static final Log LOG = LogFactory.getLog(AvroStorage.class);
/* storeFunc parameters */
private static final String NOTNULL = "NOTNULL";
private static final String AVRO_OUTPUT_SCHEMA_PROPERTY = "avro_output_schema";
private static final String AVRO_INPUT_SCHEMA_PROPERTY = "avro_input_schema";
private static final String AVRO_INPUT_PIG_SCHEMA_PROPERTY = "avro_input_pig_schema";
private static final String AVRO_MERGED_SCHEMA_PROPERTY = "avro_merged_schema_map";
private static final String SCHEMA_DELIM = "#";
private static final String SCHEMA_KEYVALUE_DELIM = "@";
private static final String NO_SCHEMA_CHECK = "no_schema_check";
private static final String IGNORE_BAD_FILES = "ignore_bad_files";
private static final String MULTIPLE_SCHEMAS = "multiple_schemas";
/* FIXME: we use this variable to distinguish schemas specified
* by different AvroStorage calls and will remove this once
* StoreFunc provides access to Pig output schema in backend.
* Default value is 0.
*/
private int storeFuncIndex = 0;
private PigAvroRecordWriter writer = null; /* avro record writer */
private Schema outputAvroSchema = null; /* output avro schema */
/* indicate whether data is nullable */
private boolean nullable = true;
/* loadFunc parameters */
private PigAvroRecordReader reader = null; /* avro record writer */
private Schema inputAvroSchema = null; /* input avro schema */
private Schema userSpecifiedAvroSchema = null; /* avro schema specified in constructor args */
/* if multiple avro record schemas are merged, this map associates each input
* record with a remapping of its fields relative to the merged schema. please
* see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
*/
private Map> schemaToMergedSchemaMap = null;
/* whether input avro files have the same schema or not. If input avro files
* do not have the same schema, we merge these schemas into a single schema
* and derive the pig schema from it.
*/
private boolean useMultipleSchemas = false;
private boolean checkSchema = true; /*whether check schema of input directories*/
private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
private String contextSignature = null;
/**
* Empty constructor. Output schema is derived from pig schema.
*/
public AvroStorage() {
    // No explicit output schema: it will be derived from the pig schema,
    // and output fields are made nullable by default.
    this.outputAvroSchema = null;
    this.nullable = true;
    this.checkSchema = true;
    AvroStorageLog.setDebugLevel(0);
}
/**
* Constructor of quoted string list
*
* @param parts quoted string list
* @throws IOException
* @throws ParseException
*/
public AvroStorage(String[] parts) throws IOException, ParseException {
outputAvroSchema = null;
nullable = true;
checkSchema = true;
if (parts.length == 1
&& !parts[0].equalsIgnoreCase(NO_SCHEMA_CHECK)
&& !parts[0].equalsIgnoreCase(IGNORE_BAD_FILES)
&& !parts[0].equalsIgnoreCase(MULTIPLE_SCHEMAS)) {
/* If one parameter is given, and that is not 'no_schema_check',
* 'ignore_bad_files', or 'multiple_schemas', then it must be a
* json string.
*/
init(parseJsonString(parts[0]));
} else {
/* parse parameters */
init(parseStringList(parts));
}
}
/**
* Set input location and obtain input schema.
*/
@SuppressWarnings("unchecked")
@Override
public void setLocation(String location, Job job) throws IOException {
if (inputAvroSchema != null) {
return;
}
if (!UDFContext.getUDFContext().isFrontend()) {
Properties udfProps = getUDFProperties();
String mergedSchema = udfProps.getProperty(AVRO_MERGED_SCHEMA_PROPERTY);
if (mergedSchema != null) {
HashMap> mergedSchemaMap =
(HashMap>) ObjectSerializer.deserialize(mergedSchema);
schemaToMergedSchemaMap = new HashMap>();
for (Entry> entry : mergedSchemaMap.entrySet()) {
schemaToMergedSchemaMap.put(new Path(entry.getKey()), entry.getValue());
}
}
String schema = udfProps.getProperty(AVRO_INPUT_SCHEMA_PROPERTY);
if (schema != null) {
try {
inputAvroSchema = new Schema.Parser().parse(schema);
return;
} catch (Exception e) {
// Cases like testMultipleSchemas2 cause exception while deserializing
// symbols. In that case, we get it again.
LOG.warn("Exception while trying to deserialize schema in backend. " +
"Will construct again. schema= " + schema, e);
}
}
}
if (inputAvroSchema == null || UDFContext.getUDFContext().isFrontend()) {
Configuration conf = job.getConfiguration();
Set paths = getGlobPaths(location, conf, true);
if (!paths.isEmpty()) {
// Set top level directories in input format. Adding all files will
// bloat configuration size
FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
// Scan all directories including sub directories for schema
if (inputAvroSchema == null) {
setInputAvroSchema(paths, conf);
}
} else {
throw new IOException("Input path \'" + location + "\' is not found");
}
}
}
@Override
public void setUDFContextSignature(String signature) {
    // Remember the signature locally so this instance can key its own
    // UDF-context properties, then let the superclass record it as well.
    this.contextSignature = signature;
    super.setUDFContextSignature(signature);
}
/**
* Set input avro schema. If the 'multiple_schemas' option is enabled, we merge multiple
* schemas and use that merged schema; otherwise, we simply use the schema from the first
* file in the paths set.
*
* @param paths set of input files
* @param conf configuration
* @return avro schema
* @throws IOException
*/
// Resolve and store the input avro schema. A schema supplied via constructor
// arguments always wins; otherwise either merge all input schemas
// ('multiple_schemas' option) or take the schema of the first readable input.
protected void setInputAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
    if (userSpecifiedAvroSchema != null) {
        inputAvroSchema = userSpecifiedAvroSchema;
    }
    else {
        inputAvroSchema = useMultipleSchemas ? getMergedSchema(paths, conf)
                                             : getAvroSchema(paths, conf);
    }
}
/**
* Get avro schema of first input file that matches the location pattern.
*
* @param paths set of input files
* @param conf configuration
* @return avro schema
* @throws IOException
*/
// Returns the avro schema of the first path in the set that yields one,
// or null if the set is null/empty or no path produces a schema.
protected Schema getAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
    if (paths == null || paths.isEmpty()) {
        return null;
    }
    Schema schema = null;
    for (Path path : paths) {
        FileSystem fs = FileSystem.get(path.toUri(), conf);
        schema = getAvroSchema(path, fs);
        if (schema != null) {
            // Stop at the first path that provides a schema.
            break;
        }
    }
    return schema;
}
/**
* Get avro schema of input path. There are three cases:
* 1. if path is a file, then return its avro schema;
* 2. if path is a first-level directory (no sub-directories), then
* return the avro schema of one underlying file;
* 3. if path contains sub-directories, then recursively check
* whether all of them share the same schema and return it
* if so or throw an exception if not.
*
* @param path input path
* @param fs file system
* @return avro schema of data
* @throws IOException if underlying sub-directories do not share the same schema; or if input path is empty or does not exist
*/
@SuppressWarnings("deprecation")
protected Schema getAvroSchema(Path path, FileSystem fs) throws IOException {
    // Ignore missing paths and hidden files/directories.
    if (!fs.exists(path) || !Utils.VISIBLE_FILES.accept(path))
        return null;
    /* if path is first level directory or is a file */
    if (!fs.isDirectory(path)) {
        return getSchema(path, fs);
    }
    FileStatus[] ss = fs.listStatus(path, Utils.VISIBLE_FILES);
    Schema schema = null;
    if (ss.length > 0) {
        if (AvroStorageUtils.noDir(ss))
            return getSchema(path, fs);
        /*otherwise, check whether schemas of underlying directories are the same */
        for (FileStatus s : ss) {
            Schema newSchema = getAvroSchema(s.getPath(), fs);
            if (schema == null) {
                schema = newSchema;
                if (!checkSchema) {
                    // use class logger instead of System.out for consistency
                    LOG.info("Do not check schema; use schema of " + s.getPath());
                    return schema;
                }
            } else if (newSchema != null && !schema.equals(newSchema)) {
                throw new IOException("Input path is " + path + ". Sub-directory " + s.getPath()
                        + " contains different schema " + newSchema + " than " + schema);
            }
        }
    }
    if (schema == null)
        LOG.warn("Cannot get avro schema! Input path " + path + " might be empty.");
    return schema;
}
/**
* Merge multiple input avro schemas into one. Note that we can't merge arbitrary schemas.
* Please see AvroStorageUtils.mergeSchema() for what's allowed and what's not allowed.
*
* @param basePaths set of input dir or files
* @param conf configuration
* @return avro schema
* @throws IOException
*/
// Merge the schemas of every file found under basePaths into one schema.
// Not all schemas are mergeable; see AvroStorageUtils.mergeSchema() for the rules.
protected Schema getMergedSchema(Set<Path> basePaths, Configuration conf) throws IOException {
    Schema result = null;
    Map<Path, Schema> mergedFiles = new HashMap<Path, Schema>();
    Set<Path> paths = AvroStorageUtils.getAllFilesRecursively(basePaths, conf);
    for (Path path : paths) {
        FileSystem fs = FileSystem.get(path.toUri(), conf);
        Schema schema = getSchema(path, fs);
        if (schema != null) {
            result = AvroStorageUtils.mergeSchema(result, schema);
            mergedFiles.put(path, schema);
        }
    }
    // schemaToMergedSchemaMap is only needed when merging multiple records.
    // (mergedFiles.size() > 1 guarantees result is non-null here.)
    if ((schemaToMergedSchemaMap == null || schemaToMergedSchemaMap.isEmpty()) &&
            mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
        schemaToMergedSchemaMap = AvroStorageUtils.getSchemaToMergedSchemaMap(result, mergedFiles);
    }
    return result;
}
/**
* This method is called by {@link #getAvroSchema}. The default implementation
* returns the schema of an avro file; or the schema of the last file in a first-level
* directory (it does not contain sub-directories).
*
* @param path path of a file or first level directory
* @param fs file system
* @return avro schema
* @throws IOException
*/
protected Schema getSchema(Path path, FileSystem fs) throws IOException {
    // Delegates to the shared utility; subclasses may override to customize
    // how a schema is extracted from a file or first-level directory.
    return AvroStorageUtils.getSchema(path, fs);
}
/**
* This method is called to return the schema of an avro schema file. This
* method is different than {@link #getSchema}, which returns the schema
* from a data file.
*
* @param path path of a file or first level directory
* @param fs file system
* @return avro schema
* @throws IOException
*/
protected Schema getSchemaFromFile(Path path, FileSystem fs) throws IOException {
/* get path of the last file */
Path lastFile = AvroStorageUtils.getLast(path, fs);
if (lastFile == null) {
return null;
}
/* read in file and obtain schema */
GenericDatumReader