/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.builtin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pig.Expression;
import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
import org.apache.pig.OverwritableStoreFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreMetadata;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.CastUtils;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.StorageUtil;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
/**
* A load function that parses a line of input into fields using a character delimiter.
 * The default delimiter is a tab. You can specify any character as a literal ("a"),
 * a known escape sequence ("\\t"), or a decimal or hexadecimal value ("\\u001", "\\x0A").
*
 * An optional second constructor argument allows one to customize
 * advanced behaviors. The available options are listed below; a usage
 * sketch follows the list:
 * <ul>
 * <li><code>-schema</code> Reads/Stores the schema of the relation using a
 *  hidden JSON file.
 * <li><code>-noschema</code> Ignores a stored schema during loading.
 * <li><code>-tagFile</code> Appends the input source file name to the beginning of each tuple.
 * <li><code>-tagPath</code> Appends the input source file path to the beginning of each tuple.
 * </ul>
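 * A minimal usage sketch of the constructor arguments ('data' and 'out' are hypothetical paths):
 * <pre>
 * A = LOAD 'data' USING PigStorage();       -- default tab delimiter
 * B = LOAD 'data' USING PigStorage(',');    -- comma delimiter
 * STORE B INTO 'out' USING PigStorage(',', '-schema');
 * </pre>
 * <p>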
 * <h3>Schemas</h3>
 * If <code>-schema</code> is specified, a hidden ".pig_schema" file is created in the output directory
 * when <b>storing</b> data. It is used by PigStorage (with or without -schema) during loading to determine the
 * field names and types of the data, so the user does not need to provide the schema explicitly in an
 * <code>as</code> clause, unless <code>-noschema</code> is specified. No attempt is made to merge conflicting
 * schemas during loading; the first schema encountered during a file system scan is used.
 * If the schema file is not present when the '-schema' option is used during loading,
 * loading fails with an error.
*
 * In addition, using <code>-schema</code> drops a ".pig_headers" file in the output directory.
 * This file simply lists the delimited aliases. It is intended to ease export to tools that read
 * files with header lines (just cat the header onto your data).
*
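 * A sketch of the schema round trip (paths are hypothetical):
 * <pre>
 * STORE A INTO 'out' USING PigStorage(',', '-schema');  -- also writes .pig_schema and .pig_headers
 * B = LOAD 'out' USING PigStorage(',');                 -- schema is read back; no AS clause needed
 * DESCRIBE B;
 * </pre>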
*
 * <h3>Source tagging</h3>
 * If <code>-tagFile</code> is specified, PigStorage will prepend the input split name to each Tuple/row.
 * Usage: A = LOAD 'input' using PigStorage(',','-tagFile'); B = foreach A generate $0;
 * The first field (0th index) in each Tuple will contain the input file name.
 * If <code>-tagPath</code> is specified, PigStorage will prepend the input split path to each Tuple/row.
 * Usage: A = LOAD 'input' using PigStorage(',','-tagPath'); B = foreach A generate $0;
 * The first field (0th index) in each Tuple will contain the input file path.
 * <p>
 * Note that regardless of whether or not you store the schema, you <b>always</b> need to specify
 * the correct delimiter to read your data. If you store using delimiter "#" and then load using
 * the default delimiter, your data will not be parsed correctly.
*
*
 * <h3>Compression</h3>
 * Storing to a directory whose name ends in ".bz2" or ".gz" or ".lzo" (if you have installed support
 * for LZO compression in Hadoop) will automatically use the corresponding compression codec.<br>
 * The <code>output.compression.enabled</code> and <code>output.compression.codec</code> job properties
 * also work.
*
* Loading from directories ending in .bz2 or .bz works automatically; other compression formats are not
* auto-detected on loading.
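 * <p>
 * A sketch of enabling compression through the job properties rather than the directory suffix
 * (GzipCodec is one standard Hadoop codec):
 * <pre>
 * SET output.compression.enabled true;
 * SET output.compression.codec 'org.apache.hadoop.io.compress.GzipCodec';
 * STORE A INTO 'out' USING PigStorage();
 * </pre>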
*
*/
@SuppressWarnings("unchecked")
public class PigStorage extends FileInputLoadFunc implements StoreFuncInterface,
LoadPushDown, LoadMetadata, StoreMetadata, OverwritableStoreFunc {
    // Reader and writer for the current split, supplied via prepareToRead/prepareToWrite.
    protected RecordReader in = null;
    protected RecordWriter writer = null;
    protected final Log mLog = LogFactory.getLog(getClass());
    // Signature identifying this invocation's entry in the UDFContext.
    protected String signature;
    // Field delimiter, defaulting to tab.
    private byte fieldDel = '\t';
private ArrayList