#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Command line utility for reading and writing Avro files."""

from avro.io import DatumReader, DatumWriter
from avro.datafile import DataFileReader, DataFileWriter
import avro.schema

try:
    import json
except ImportError:
    import simplejson as json
import csv
from sys import stdout, stdin
from itertools import ifilter, imap
from functools import partial
from os.path import splitext


class AvroError(Exception):
    """User-facing error; main() reports it through the option parser."""
    pass


def print_json(row):
    """Print one record as a single-line JSON document."""
    print(json.dumps(row))


def print_json_pretty(row):
    """Print one record as JSON indented by four spaces."""
    print(json.dumps(row, indent=4))


_write_row = csv.writer(stdout).writerow
_encoding = stdout.encoding or "UTF-8"


def _encode(v, encoding=_encoding):
    """Return `v` encoded with `encoding` if it is unicode text, else `v`.

    Only `unicode` values are encoded: byte strings are already bytes, and
    calling .encode() on them triggers an implicit ASCII decode that fails
    on non-ASCII data.
    """
    if not isinstance(v, unicode):
        return v
    return v.encode(encoding)


def print_csv(row):
    """Write one record to stdout as a CSV row, columns ordered by key."""
    # We sort the keys so the fields will be in the same place
    # FIXME: Do we want to do it in schema order?
    _write_row([_encode(row[key]) for key in sorted(row)])


def select_printer(format):
    """Return the print function for `format`.

    Raises KeyError if `format` is not "json", "json-pretty" or "csv".
    """
    return {
        "json": print_json,
        "json-pretty": print_json_pretty,
        "csv": print_csv,
    }[format]


def record_match(expr, record):
    """Evaluate `expr` with the record bound to the name `r`.

    Returns whatever the expression evaluates to.
    """
    # NOTE(security): `expr` is evaluated with eval() and can therefore run
    # arbitrary Python. print_avro() passes the --filter option here, so only
    # use filters you trust.
    return eval(expr, None, {"r": record})


def parse_fields(fields):
    """Split a comma separated field list into stripped, non-empty names.

    Returns None when `fields` is None, empty or only whitespace.
    """
    fields = fields or ''
    if not fields.strip():
        return None

    return [field.strip() for field in fields.split(',') if field.strip()]


def field_selector(fields):
    """Return a function that keeps only the keys of a dict listed in `fields`."""
    fields = set(fields)

    def keys_filter(obj):
        return dict((k, obj[k]) for k in (set(obj) & fields))
    return keys_filter


def print_avro(avro, opts):
    """Print the records of `avro` according to the cat options in `opts`.

    Reads opts.header, opts.format, opts.filter, opts.skip, opts.fields and
    opts.count. Filtering is applied first, then skipping, then field
    selection.

    Raises AvroError if --header is combined with a non-CSV format.
    """
    if opts.header and (opts.format != "csv"):
        raise AvroError("--header applies only to CSV format")

    # Apply filter first
    if opts.filter:
        avro = ifilter(partial(record_match, opts.filter), avro)

    for i in xrange(opts.skip):
        try:
            next(avro)
        except StopIteration:
            # Fewer records than we were asked to skip: nothing to print
            return

    fields = parse_fields(opts.fields)
    if fields:
        avro = imap(field_selector(fields), avro)

    printer = select_printer(opts.format)
    for i, record in enumerate(avro):
        if i == 0 and opts.header:
            # Same ordering and encoding as the data rows in print_csv()
            _write_row([_encode(key) for key in sorted(record)])
        if i >= opts.count:
            break
        printer(record)


def print_schema(avro):
    """Pretty print the schema stored under the "avro.schema" metadata key."""
    schema = avro.meta["avro.schema"]
    # Pretty print
    print(json.dumps(json.loads(schema), indent=4))


def cat(opts, args):
    """Print the records (or the schema) of every file named in `args`.

    Raises AvroError if `args` is empty or a file can't be opened.
    """
    if not args:
        raise AvroError("No files to show")

    for filename in args:
        try:
            fo = open(filename, "rb")
        except (OSError, IOError) as e:
            raise AvroError("Can't open %s - %s" % (filename, e))

        # Close the file even when reading or printing fails
        try:
            reader = DataFileReader(fo, DatumReader())
            if opts.print_schema:
                print_schema(reader)
            else:
                print_avro(reader, opts)
        finally:
            fo.close()


def _open(filename, mode):
    """Open `filename` with `mode`; "-" maps to stdin ("rb") or stdout ("wb")."""
    if filename == "-":
        return {
            "rb": stdin,
            "wb": stdout,
        }[mode]

    return open(filename, mode)


def iter_json(info, _):
    """Iterate over records in `info`, one JSON document per line.

    Blank lines are skipped. The second argument (the schema) is unused; it
    exists so iter_json and iter_csv share a signature.
    """
    return imap(json.loads, ifilter(lambda line: line.strip(), info))


def _to_bool(value):
    """Convert text to bool; "", "0" and "false" (any case) are False."""
    if isinstance(value, basestring):
        return value.strip().lower() not in ("", "0", "false")
    return bool(value)


# Converters from raw input values to Python values, keyed by Avro type name
_CONVERTERS = {
    "int": int,
    "long": long,
    "float": float,
    "double": float,
    "string": str,
    "bytes": str,
    "boolean": _to_bool,
    "null": lambda _: None,
}


def _convert_as(value, type_name):
    """Convert `value` using the converter registered for `type_name`.

    Raises KeyError for a type without a converter and ValueError when the
    value can't be converted.
    """
    return _CONVERTERS[type_name](value)


def convert(value, field):
    """Convert `value` to the Python type matching the type of `field`.

    Raises KeyError if the field type has no converter and ValueError if the
    value can't be converted.
    """
    type_name = field.type.type
    if type_name == "union":
        return convert_union(value, field)

    return _convert_as(value, type_name)


def convert_union(value, field):
    """Convert `value` using the first branch of a union field that accepts it.

    Branches are tried in the order they appear in field.type.schemas.

    Raises AvroError if no branch can convert the value.
    """
    for schema in field.type.schemas:
        # Read the branch type the same way convert() reads field.type.type
        type_name = schema.type
        if type_name == "null" and value != "":
            # The null converter never fails, so let it match only an empty
            # value; otherwise a leading "null" branch would swallow
            # every value.
            continue
        try:
            return _convert_as(value, type_name)
        except (KeyError, ValueError):
            # No converter for this branch, or the value doesn't fit it
            continue

    raise AvroError("Can't convert %r to any type of union field %s" %
                    (value, field.name))


def iter_csv(info, schema):
    """Iterate over the rows of the CSV input `info` as record dicts.

    Columns are matched to schema.fields by position and converted to the
    field types.
    """
    header = [field.name for field in schema.fields]
    for row in csv.reader(info):
        values = [convert(v, f) for v, f in zip(row, schema.fields)]
        yield dict(zip(header, values))


def guess_input_type(files):
    """Guess "json" or "csv" from the extension of the first file.

    Returns None if `files` is empty or the extension is not recognized.
    """
    if not files:
        return None

    ext = splitext(files[0])[1].lower()
    if ext in (".json", ".js"):
        return "json"
    elif ext in (".csv",):
        return "csv"

    return None


def write(opts, files):
    """Write the records read from `files` to an Avro file.

    Reads opts.schema (schema file name), opts.input_type and opts.output.
    With no input files, records are read from stdin.

    Raises AvroError if no schema is given, the input type can't be
    determined, or a file can't be opened.
    """
    if not opts.schema:
        raise AvroError("No schema specified")

    input_type = opts.input_type or guess_input_type(files)
    if not input_type:
        raise AvroError("Can't guess input file type (not .json or .csv)")

    try:
        with open(opts.schema, "rb") as schema_file:
            schema = avro.schema.parse(schema_file.read())
        out = _open(opts.output, "wb")
    except (IOError, OSError) as e:
        raise AvroError("Can't open file - %s" % e)

    writer = DataFileWriter(out, DatumWriter(), schema)

    iter_records = {"json": iter_json, "csv": iter_csv}[input_type]
    for filename in (files or ["-"]):
        try:
            info = _open(filename, "rb")
        except (IOError, OSError) as e:
            raise AvroError("Can't open %s - %s" % (filename, e))

        try:
            for record in iter_records(info, schema):
                writer.append(record)
        finally:
            # Leave stdin open, close only the files we opened ourselves
            if info is not stdin:
                info.close()

    writer.close()


def main(argv=None):
    """Parse the command line and run the `cat` or `write` command.

    `argv` defaults to sys.argv. AvroError is reported through
    parser.error(); any other exception ends the program with a
    "panic" message.
    """
    import sys
    from optparse import OptionParser, OptionGroup

    argv = argv or sys.argv

    parser = OptionParser(description="Display/write for Avro files",
                          version="@AVRO_VERSION@",
                          usage="usage: %prog cat|write [options] FILE [FILE...]")

    # cat options
    cat_options = OptionGroup(parser, "cat options")
    cat_options.add_option("-n", "--count", default=float("Infinity"),
                           help="number of records to print", type=int)
    cat_options.add_option("-s", "--skip", help="number of records to skip",
                           type=int, default=0)
    cat_options.add_option("-f", "--format", help="record format",
                           default="json",
                           choices=["json", "csv", "json-pretty"])
    cat_options.add_option("--header", help="print CSV header", default=False,
                           action="store_true")
    cat_options.add_option("--filter", help="filter records (e.g. r['age']>1)",
                           default=None)
    cat_options.add_option("--print-schema", help="print schema",
                           action="store_true", default=False)
    cat_options.add_option('--fields', default=None,
                           help='fields to show, comma separated (show all by default)')
    parser.add_option_group(cat_options)

    # write options
    write_options = OptionGroup(parser, "write options")
    write_options.add_option("--schema", help="schema file (required)")
    write_options.add_option("--input-type",
                             help="input file(s) type (json or csv)",
                             choices=["json", "csv"], default=None)
    write_options.add_option("-o", "--output", help="output file", default="-")
    parser.add_option_group(write_options)

    opts, args = parser.parse_args(argv[1:])
    if len(args) < 1:
        parser.error("You must specify `cat` or `write`")  # Will exit

    command = args.pop(0)
    try:
        if command == "cat":
            cat(opts, args)
        elif command == "write":
            write(opts, args)
        else:
            raise AvroError("Unknown command - %s" % command)
    except AvroError as e:
        parser.error("%s" % e)  # Will exit
    except Exception as e:
        raise SystemExit("panic: %s" % e)


if __name__ == "__main__":
    main()