/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Collections.Generic;
using Avro.IO;
using System.IO;

namespace Avro.Generic
{
    /// <summary>
    /// A parameterless function that reads and returns a single value, e.g. one of the
    /// decoder's ReadInt / ReadString methods.
    /// </summary>
    /// <typeparam name="T">The type of value produced.</typeparam>
    public delegate T Reader<T>();

    /// <summary>
    /// A general purpose reader of data from avro streams. This can optionally resolve if the reader's and writer's
    /// schemas are different. This class is a wrapper around DefaultReader and offers a little more type safety. The default reader
    /// has the flexibility to return any type of object for each read call because the Read() method is generic. This
    /// class on the other hand can only return a single type because the type is a parameter to the class. Any
    /// user defined extension should, however, be done to DefaultReader. This class is sealed.
    /// </summary>
    /// <typeparam name="T">The type of object returned by <see cref="Read"/>.</typeparam>
    public sealed class GenericReader<T> : DatumReader<T>
    {
        private readonly DefaultReader reader;

        /// <summary>
        /// Constructs a generic reader for the given schemas using the DefaultReader. If the
        /// reader's and writer's schemas are different this class performs the resolution.
        /// </summary>
        /// <param name="writerSchema">The schema used while generating the data</param>
        /// <param name="readerSchema">The schema desired by the reader</param>
        public GenericReader(Schema writerSchema, Schema readerSchema)
            : this(new DefaultReader(writerSchema, readerSchema))
        {
        }

        /// <summary>
        /// Constructs a generic reader by directly using the given DefaultReader
        /// </summary>
        /// <param name="reader">The actual reader to use</param>
        public GenericReader(DefaultReader reader)
        {
            this.reader = reader;
        }

        /// <summary>The writer's schema of the wrapped DefaultReader.</summary>
        public Schema WriterSchema { get { return reader.WriterSchema; } }

        /// <summary>The reader's schema of the wrapped DefaultReader.</summary>
        public Schema ReaderSchema { get { return reader.ReaderSchema; } }

        /// <summary>
        /// Reads one datum from the decoder by delegating to the wrapped DefaultReader.
        /// </summary>
        /// <param name="reuse">If not null, an object the reader may try to reuse for the result</param>
        /// <param name="d">The decoder for deserialization</param>
        /// <returns>The datum just read</returns>
        public T Read(T reuse, Decoder d)
        {
            return reader.Read<T>(reuse, d);
        }
    }

    /// <summary>
    /// The default implementation for the generic reader. It constructs new .NET objects for avro objects on the
    /// stream and returns the .NET object. Users can directly use this class or, if they want to customize the
    /// object types for different Avro schema types, can derive from this class. There are enough hooks in this
    /// class to allow customization.
    /// </summary>
    /// <remarks>
    /// The default hooks produce the following .NET types:
    /// <list type="table">
    /// <listheader><term>Avro Type</term><description>.NET Type</description></listheader>
    /// <item><term>null</term><description>null reference</description></item>
    /// <item><term>boolean</term><description>bool</description></item>
    /// <item><term>int</term><description>int (promoted to long/float/double when the reader's schema asks for it)</description></item>
    /// <item><term>long</term><description>long (promoted to float/double when the reader's schema asks for it)</description></item>
    /// <item><term>float</term><description>float (promoted to double when the reader's schema asks for it)</description></item>
    /// <item><term>double</term><description>double</description></item>
    /// <item><term>string</term><description>string</description></item>
    /// <item><term>bytes</term><description>byte[]</description></item>
    /// <item><term>record / error</term><description>GenericRecord</description></item>
    /// <item><term>enum</term><description>GenericEnum</description></item>
    /// <item><term>fixed</term><description>GenericFixed</description></item>
    /// <item><term>array</term><description>object[]</description></item>
    /// <item><term>map</term><description>IDictionary&lt;string, object&gt;</description></item>
    /// <item><term>union</term><description>the type of the selected branch</description></item>
    /// </list>
    /// </remarks>
    public class DefaultReader
    {
        /// <summary>The schema desired by the reader.</summary>
        public Schema ReaderSchema { get; private set; }

        /// <summary>The schema used while generating the data.</summary>
        public Schema WriterSchema { get; private set; }

        /// <summary>
        /// Constructs the default reader for the given schemas. If the reader's and writer's schemas
        /// are different this class performs the resolution. The class remarks list the .NET types
        /// the default implementation maps Avro types to.
        /// </summary>
        /// <param name="writerSchema">The schema used while generating the data</param>
        /// <param name="readerSchema">The schema desired by the reader</param>
        public DefaultReader(Schema writerSchema, Schema readerSchema)
        {
            this.ReaderSchema = readerSchema;
            this.WriterSchema = writerSchema;
        }

        /// <summary>
        /// Reads an object off the stream.
        /// </summary>
        /// <typeparam name="T">The type of object to read. A single schema typically returns an object of a single .NET class.
        /// The only exception is UnionSchema, which can return an object of different types based on the branch selected.
        /// </typeparam>
        /// <param name="reuse">If not null, the implementation will try to use this to return the object</param>
        /// <param name="decoder">The decoder for deserialization</param>
        /// <returns>The object just read, cast to <typeparamref name="T"/></returns>
        /// <exception cref="AvroException">If the reader's schema cannot read the writer's schema.</exception>
        public T Read<T>(T reuse, Decoder decoder)
        {
            if (!ReaderSchema.CanRead(WriterSchema))
                throw new AvroException("Schema mismatch. Reader: " + ReaderSchema + ", writer: " + WriterSchema);

            return (T)Read(reuse, WriterSchema, ReaderSchema, decoder);
        }

        /// <summary>
        /// Reads one value written with <paramref name="writerSchema"/>, resolving it against
        /// <paramref name="readerSchema"/>, and dispatches on the writer's schema type.
        /// </summary>
        /// <param name="reuse">If not null, an object that may be reused for the result</param>
        /// <param name="writerSchema">The schema the value was written with</param>
        /// <param name="readerSchema">The schema the reader wants the value in</param>
        /// <param name="d">The decoder for deserialization</param>
        /// <returns>The deserialized value</returns>
        public object Read(object reuse, Schema writerSchema, Schema readerSchema, Decoder d)
        {
            // A non-union writer value read into a union reader schema is resolved against the
            // reader branch that findBranch selects for the writer's schema.
            if (readerSchema.Tag == Schema.Type.Union && writerSchema.Tag != Schema.Type.Union)
            {
                readerSchema = findBranch(readerSchema as UnionSchema, writerSchema);
            }

            switch (writerSchema.Tag)
            {
                case Schema.Type.Null:
                    return ReadNull(readerSchema, d);
                case Schema.Type.Boolean:
                    return Read<bool>(writerSchema.Tag, readerSchema, d.ReadBoolean);
                case Schema.Type.Int:
                    {
                        int i = Read<int>(writerSchema.Tag, readerSchema, d.ReadInt);
                        // Numeric promotion requested by the reader's schema.
                        switch (readerSchema.Tag)
                        {
                            case Schema.Type.Long:
                                return (long)i;
                            case Schema.Type.Float:
                                return (float)i;
                            case Schema.Type.Double:
                                return (double)i;
                            default:
                                return i;
                        }
                    }
                case Schema.Type.Long:
                    {
                        long l = Read<long>(writerSchema.Tag, readerSchema, d.ReadLong);
                        switch (readerSchema.Tag)
                        {
                            case Schema.Type.Float:
                                return (float)l;
                            case Schema.Type.Double:
                                return (double)l;
                            default:
                                return l;
                        }
                    }
                case Schema.Type.Float:
                    {
                        float f = Read<float>(writerSchema.Tag, readerSchema, d.ReadFloat);
                        switch (readerSchema.Tag)
                        {
                            case Schema.Type.Double:
                                return (double)f;
                            default:
                                return f;
                        }
                    }
                case Schema.Type.Double:
                    return Read<double>(writerSchema.Tag, readerSchema, d.ReadDouble);
                case Schema.Type.String:
                    return Read<string>(writerSchema.Tag, readerSchema, d.ReadString);
                case Schema.Type.Bytes:
                    return Read<byte[]>(writerSchema.Tag, readerSchema, d.ReadBytes);
                case Schema.Type.Error:
                case Schema.Type.Record:
                    return ReadRecord(reuse, (RecordSchema)writerSchema, readerSchema, d);
                case Schema.Type.Enumeration:
                    return ReadEnum(reuse, (EnumSchema)writerSchema, readerSchema, d);
                case Schema.Type.Fixed:
                    return ReadFixed(reuse, (FixedSchema)writerSchema, readerSchema, d);
                case Schema.Type.Array:
                    return ReadArray(reuse, (ArraySchema)writerSchema, readerSchema, d);
                case Schema.Type.Map:
                    return ReadMap(reuse, (MapSchema)writerSchema, readerSchema, d);
                case Schema.Type.Union:
                    return ReadUnion(reuse, (UnionSchema)writerSchema, readerSchema, d);
                default:
                    throw new AvroException("Unknown schema type: " + writerSchema);
            }
        }

        /// <summary>
        /// Deserializes a null from the stream.
        /// </summary>
        /// <param name="readerSchema">Reader's schema, which should be a NullSchema</param>
        /// <param name="d">The decoder for deserialization</param>
        /// <returns>Always null</returns>
        protected virtual object ReadNull(Schema readerSchema, Decoder d)
        {
            d.ReadNull();
            return null;
        }

        /// <summary>
        /// A generic function to read primitive types. This implementation ignores
        /// <paramref name="tag"/> and <paramref name="readerSchema"/> and simply invokes <paramref name="reader"/>.
        /// </summary>
        /// <typeparam name="S">The .NET type to read</typeparam>
        /// <param name="tag">The Avro type tag for the object on the stream</param>
        /// <param name="readerSchema">A schema compatible to the Avro type</param>
        /// <param name="reader">A function that can read the avro type from the stream</param>
        /// <returns>The primitive type just read</returns>
        protected S Read<S>(Schema.Type tag, Schema readerSchema, Reader<S> reader)
        {
            return reader();
        }

        /// <summary>
        /// Deserializes a record from the stream.
        /// </summary>
        /// <param name="reuse">If not null, a record object that could be reused for returning the result</param>
        /// <param name="writerSchema">The writer's RecordSchema</param>
        /// <param name="readerSchema">The reader's schema, must be RecordSchema too.</param>
        /// <param name="dec">The decoder for deserialization</param>
        /// <returns>The record object just read</returns>
        /// <exception cref="AvroException">Wraps any failure while reading a field, naming the field.</exception>
        protected virtual object ReadRecord(object reuse, RecordSchema writerSchema, Schema readerSchema, Decoder dec)
        {
            RecordSchema rs = (RecordSchema)readerSchema;

            object rec = CreateRecord(reuse, rs);

            // Names of the reader fields that received a value from the stream, whether they were
            // matched by name or through one of the reader field's aliases.
            HashSet<string> readFields = new HashSet<string>();

            foreach (Field wf in writerSchema)
            {
                try
                {
                    Field rf;
                    if (rs.TryGetFieldAlias(wf.Name, out rf))
                    {
                        // Use the reader's field name: on an alias match wf.Name is not a field
                        // of the reader's record.
                        object obj = null;
                        TryGetField(rec, rf.Name, rf.Pos, out obj);
                        AddField(rec, rf.Name, rf.Pos, Read(obj, wf.Schema, rf.Schema, dec));
                        readFields.Add(rf.Name);
                    }
                    else
                    {
                        // The reader has no such field; consume its bytes to stay in sync with the stream.
                        Skip(wf.Schema, dec);
                    }
                }
                catch (Exception ex)
                {
                    throw new AvroException(ex.Message + " in field " + wf.Name, ex);
                }
            }

            // Reader fields the writer did not supply are filled from their default values. Each default
            // is encoded into a scratch stream and decoded back so it goes through the normal Read() path.
            using (MemoryStream defaultStream = new MemoryStream())
            {
                BinaryEncoder defaultEncoder = new BinaryEncoder(defaultStream);
                BinaryDecoder defaultDecoder = new BinaryDecoder(defaultStream);
                foreach (Field rf in rs)
                {
                    if (readFields.Contains(rf.Name)) continue;

                    defaultStream.Position = 0; // reset for writing
                    Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
                    defaultStream.Flush();
                    defaultStream.Position = 0; // reset for reading

                    object obj = null;
                    TryGetField(rec, rf.Name, rf.Pos, out obj);
                    AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
                }
            }

            return rec;
        }

        /// <summary>
        /// Creates a new record object. Derived classes can override this to return an object of their choice.
        /// </summary>
        /// <param name="reuse">If it is a GenericRecord with an equal schema, it is returned instead of a new record</param>
        /// <param name="readerSchema">The schema the reader is using</param>
        /// <returns>A GenericRecord for <paramref name="readerSchema"/></returns>
        protected virtual object CreateRecord(object reuse, RecordSchema readerSchema)
        {
            GenericRecord ru = reuse as GenericRecord;
            if (ru == null || !ru.Schema.Equals(readerSchema))
            {
                ru = new GenericRecord(readerSchema);
            }
            return ru;
        }

        /// <summary>
        /// Used by the default implementation of ReadRecord() to get the existing field of a record object. The derived
        /// classes can override this to make their own interpretation of the record object.
        /// </summary>
        /// <param name="record">The record object to be probed into. This is guaranteed to be one that was returned
        /// by a previous call to CreateRecord.</param>
        /// <param name="fieldName">The name of the field to probe.</param>
        /// <param name="fieldPos">The position of the field in the reader's schema (unused by the default implementation).</param>
        /// <param name="value">The value of the field, if found. Null otherwise.</param>
        /// <returns>True if and only if a field with the given name is found.</returns>
        protected virtual bool TryGetField(object record, string fieldName, int fieldPos, out object value)
        {
            return (record as GenericRecord).TryGetValue(fieldName, out value);
        }

        /// <summary>
        /// Used by the default implementation of ReadRecord() to add a field to a record object. The derived
        /// classes can override this to suit their own implementation of the record object.
        /// </summary>
        /// <param name="record">The record object to be probed into. This is guaranteed to be one that was returned
        /// by a previous call to CreateRecord.</param>
        /// <param name="fieldName">The name of the field to set.</param>
        /// <param name="fieldPos">The position of the field in the reader's schema (unused by the default implementation).</param>
        /// <param name="fieldValue">The value to be added for the field</param>
        protected virtual void AddField(object record, string fieldName, int fieldPos, object fieldValue)
        {
            (record as GenericRecord).Add(fieldName, fieldValue);
        }

        /// <summary>
        /// Deserializes an enum. Uses CreateEnum to construct the new enum object.
        /// </summary>
        /// <param name="reuse">If appropriate, uses this instead of creating a new enum object.</param>
        /// <param name="writerSchema">The schema the writer used while writing the enum</param>
        /// <param name="readerSchema">The schema the reader is using</param>
        /// <param name="d">The decoder for deserialization.</param>
        /// <returns>An enum object.</returns>
        protected virtual object ReadEnum(object reuse, EnumSchema writerSchema, Schema readerSchema, Decoder d)
        {
            EnumSchema es = readerSchema as EnumSchema;
            // The stream holds an index into the writer's symbol list; translate it to the symbol.
            return CreateEnum(reuse, es, writerSchema[d.ReadEnum()]);
        }

        /// <summary>
        /// Used by the default implementation of ReadEnum to construct a new enum object.
        /// </summary>
        /// <param name="reuse">If it is a GenericEnum with an equal schema, its value is updated and it is returned.</param>
        /// <param name="es">The enum schema used by the reader.</param>
        /// <param name="symbol">The symbol that needs to be used.</param>
        /// <returns>The default implementation returns a GenericEnum.</returns>
        protected virtual object CreateEnum(object reuse, EnumSchema es, string symbol)
        {
            GenericEnum ge = reuse as GenericEnum;
            if (ge != null && ge.Schema.Equals(es))
            {
                ge.Value = symbol;
                return ge;
            }
            return new GenericEnum(es, symbol);
        }

        /// <summary>
        /// Deserializes an array and returns an array object. It uses CreateArray() and works on it before returning it.
        /// It also uses GetArraySize(), ResizeArray(), SetArrayElement() and GetArrayElement() methods. Derived classes can
        /// override these methods to customize their behavior.
        /// </summary>
        /// <param name="reuse">If appropriate, uses this instead of creating a new array object.</param>
        /// <param name="writerSchema">The schema used by the writer.</param>
        /// <param name="readerSchema">The schema that the reader uses.</param>
        /// <param name="d">The decoder for deserialization.</param>
        /// <returns>The deserialized array object.</returns>
        protected virtual object ReadArray(object reuse, ArraySchema writerSchema, Schema readerSchema, Decoder d)
        {
            ArraySchema rs = (ArraySchema)readerSchema;
            object result = CreateArray(reuse, rs);
            int i = 0;
            // Avro arrays are written as a sequence of blocks; a block count of 0 terminates the array.
            for (int n = (int)d.ReadArrayStart(); n != 0; n = (int)d.ReadArrayNext())
            {
                if (GetArraySize(result) < (i + n)) ResizeArray(ref result, i + n);
                for (int j = 0; j < n; j++, i++)
                {
                    SetArrayElement(result, i, Read(GetArrayElement(result, i), writerSchema.ItemSchema, rs.ItemSchema, d));
                }
            }
            // Trim a reused array that was longer than the data just read.
            if (GetArraySize(result) != i) ResizeArray(ref result, i);
            return result;
        }

        /// <summary>
        /// Creates a new array object. The initial size of the object could be anything. The users
        /// should use GetArraySize() to determine the size. The default implementation creates an object[].
        /// </summary>
        /// <param name="reuse">If it is an object[], it is returned as-is.</param>
        /// <param name="rs">The reader's array schema.</param>
        /// <returns>An object suitable to deserialize an avro array</returns>
        protected virtual object CreateArray(object reuse, ArraySchema rs)
        {
            object[] existing = reuse as object[];
            return existing ?? new object[0];
        }

        /// <summary>
        /// Returns the size of the given array object.
        /// </summary>
        /// <param name="array">Array object whose size is required. This is guaranteed to be something returned by
        /// a previous call to CreateArray().</param>
        /// <returns>The size of the array</returns>
        protected virtual int GetArraySize(object array)
        {
            return (array as object[]).Length;
        }

        /// <summary>
        /// Resizes the array to the new value.
        /// </summary>
        /// <param name="array">Array object to resize. This is guaranteed to be something returned by
        /// a previous call to CreateArray().</param>
        /// <param name="n">The new size.</param>
        protected virtual void ResizeArray(ref object array, int n)
        {
            object[] o = array as object[];
            Array.Resize(ref o, n);
            array = o;
        }

        /// <summary>
        /// Assigns a new value to the object at the given index
        /// </summary>
        /// <param name="array">Array object to modify. This is guaranteed to be something returned by
        /// a previous call to CreateArray().</param>
        /// <param name="index">The index to reassign to.</param>
        /// <param name="value">The value to assign.</param>
        protected virtual void SetArrayElement(object array, int index, object value)
        {
            object[] a = array as object[];
            a[index] = value;
        }

        /// <summary>
        /// Returns the element at the given index.
        /// </summary>
        /// <param name="array">Array object to read from. This is guaranteed to be something returned by
        /// a previous call to CreateArray().</param>
        /// <param name="index">The index to look into.</param>
        /// <returns>The object at the given index. Null if no object has been assigned to that index.</returns>
        protected virtual object GetArrayElement(object array, int index)
        {
            return (array as object[])[index];
        }

        /// <summary>
        /// Deserializes an avro map. The default implementation creates a new map using CreateMap() and then
        /// adds elements to the map using AddMapEntry().
        /// </summary>
        /// <param name="reuse">If appropriate, use this instead of creating a new map object.</param>
        /// <param name="writerSchema">The schema the writer used to write the map.</param>
        /// <param name="readerSchema">The schema the reader is using.</param>
        /// <param name="d">The decoder for deserialization.</param>
        /// <returns>The deserialized map object.</returns>
        protected virtual object ReadMap(object reuse, MapSchema writerSchema, Schema readerSchema, Decoder d)
        {
            MapSchema rs = (MapSchema)readerSchema;
            object result = CreateMap(reuse, rs);
            for (int n = (int)d.ReadMapStart(); n != 0; n = (int)d.ReadMapNext())
            {
                for (int j = 0; j < n; j++)
                {
                    string k = d.ReadString();
                    AddMapEntry(result, k, Read(null, writerSchema.ValueSchema, rs.ValueSchema, d));
                }
            }
            return result;
        }

        /// <summary>
        /// Used by the default implementation of ReadMap() to create a fresh map object. The default
        /// implementation of this method returns an IDictionary&lt;string, object&gt;.
        /// </summary>
        /// <param name="reuse">If it is an IDictionary&lt;string, object&gt;, it is cleared and returned.</param>
        /// <param name="ms">The reader's map schema.</param>
        /// <returns>An empty map object.</returns>
        protected virtual object CreateMap(object reuse, MapSchema ms)
        {
            IDictionary<string, object> existing = reuse as IDictionary<string, object>;
            if (existing != null)
            {
                existing.Clear();
                return existing;
            }
            return new Dictionary<string, object>();
        }

        /// <summary>
        /// Adds an entry to the map.
        /// </summary>
        /// <param name="map">A map object, which is guaranteed to be one returned by a previous call to CreateMap().</param>
        /// <param name="key">The key to add.</param>
        /// <param name="value">The value to add.</param>
        protected virtual void AddMapEntry(object map, string key, object value)
        {
            (map as IDictionary<string, object>).Add(key, value);
        }

        /// <summary>
        /// Deserializes an object based on the writer's union schema.
        /// </summary>
        /// <param name="reuse">If appropriate, uses this object instead of creating a new one.</param>
        /// <param name="writerSchema">The UnionSchema that the writer used.</param>
        /// <param name="readerSchema">The schema the reader uses.</param>
        /// <param name="d">The decoder for deserialization.</param>
        /// <returns>The deserialized object.</returns>
        /// <exception cref="AvroException">If the selected writer branch cannot be read with the reader's schema.</exception>
        protected virtual object ReadUnion(object reuse, UnionSchema writerSchema, Schema readerSchema, Decoder d)
        {
            int index = d.ReadUnionIndex();
            Schema ws = writerSchema[index];

            if (readerSchema is UnionSchema)
                readerSchema = findBranch(readerSchema as UnionSchema, ws);
            else if (!readerSchema.CanRead(ws))
                // Report the schemas actually being compared, not the top-level ones.
                throw new AvroException("Schema mismatch. Reader: " + readerSchema + ", writer: " + ws);

            return Read(reuse, ws, readerSchema, d);
        }

        /// <summary>
        /// Deserializes a fixed object and returns the object. The default implementation uses CreateFixed()
        /// and GetFixedBuffer() and returns what CreateFixed() returned.
        /// </summary>
        /// <param name="reuse">If appropriate, uses this object instead of creating a new one.</param>
        /// <param name="writerSchema">The FixedSchema the writer used during serialization.</param>
        /// <param name="readerSchema">The schema that the reader uses. Must be a FixedSchema with the same
        /// size as the writerSchema.</param>
        /// <param name="d">The decoder for deserialization.</param>
        /// <returns>The deserialized object.</returns>
        /// <exception cref="AvroException">If the reader's and writer's fixed sizes differ.</exception>
        protected virtual object ReadFixed(object reuse, FixedSchema writerSchema, Schema readerSchema, Decoder d)
        {
            FixedSchema rs = (FixedSchema)readerSchema;
            if (rs.Size != writerSchema.Size)
            {
                throw new AvroException("Size mismatch between reader and writer fixed schemas. Writer: " + writerSchema +
                    ", reader: " + readerSchema);
            }

            object ru = CreateFixed(reuse, rs);
            byte[] bb = GetFixedBuffer(ru);
            d.ReadFixed(bb);
            return ru;
        }

        /// <summary>
        /// Returns a fixed object.
        /// </summary>
        /// <param name="reuse">If it is a GenericFixed with an equal schema, it is returned instead of a new one.</param>
        /// <param name="rs">The reader's FixedSchema.</param>
        /// <returns>A fixed object with an appropriate buffer.</returns>
        protected virtual object CreateFixed(object reuse, FixedSchema rs)
        {
            GenericFixed existing = reuse as GenericFixed;
            return (existing != null && existing.Schema.Equals(rs)) ? existing : new GenericFixed(rs);
        }

        /// <summary>
        /// Returns a buffer of appropriate size to read data into.
        /// </summary>
        /// <param name="f">The fixed object. It is guaranteed that this is something that has been previously
        /// returned by CreateFixed</param>
        /// <returns>A byte buffer of fixed's size.</returns>
        protected virtual byte[] GetFixedBuffer(object f)
        {
            return (f as GenericFixed).Value;
        }

        /// <summary>
        /// Consumes and discards one value written with <paramref name="writerSchema"/>.
        /// </summary>
        /// <param name="writerSchema">The schema the value was written with.</param>
        /// <param name="d">The decoder to advance.</param>
        /// <exception cref="AvroException">If the schema type is not recognized.</exception>
        protected virtual void Skip(Schema writerSchema, Decoder d)
        {
            switch (writerSchema.Tag)
            {
                case Schema.Type.Null:
                    d.SkipNull();
                    break;
                case Schema.Type.Boolean:
                    d.SkipBoolean();
                    break;
                case Schema.Type.Int:
                    d.SkipInt();
                    break;
                case Schema.Type.Long:
                    d.SkipLong();
                    break;
                case Schema.Type.Float:
                    d.SkipFloat();
                    break;
                case Schema.Type.Double:
                    d.SkipDouble();
                    break;
                case Schema.Type.String:
                    d.SkipString();
                    break;
                case Schema.Type.Bytes:
                    d.SkipBytes();
                    break;
                // Error schemas are records (Read() treats them identically), so skip them the same way.
                case Schema.Type.Error:
                case Schema.Type.Record:
                    foreach (Field f in writerSchema as RecordSchema) Skip(f.Schema, d);
                    break;
                case Schema.Type.Enumeration:
                    d.SkipEnum();
                    break;
                case Schema.Type.Fixed:
                    d.SkipFixed((writerSchema as FixedSchema).Size);
                    break;
                case Schema.Type.Array:
                    {
                        Schema s = (writerSchema as ArraySchema).ItemSchema;
                        for (long n = d.ReadArrayStart(); n != 0; n = d.ReadArrayNext())
                        {
                            for (long i = 0; i < n; i++) Skip(s, d);
                        }
                    }
                    break;
                case Schema.Type.Map:
                    {
                        Schema s = (writerSchema as MapSchema).ValueSchema;
                        for (long n = d.ReadMapStart(); n != 0; n = d.ReadMapNext())
                        {
                            for (long i = 0; i < n; i++) { d.SkipString(); Skip(s, d); }
                        }
                    }
                    break;
                case Schema.Type.Union:
                    Skip((writerSchema as UnionSchema)[d.ReadUnionIndex()], d);
                    break;
                default:
                    throw new AvroException("Unknown schema type: " + writerSchema);
            }
        }

        /// <summary>
        /// Returns the branch of <paramref name="us"/> that UnionSchema.MatchingBranch selects for <paramref name="s"/>.
        /// </summary>
        /// <param name="us">The union to search.</param>
        /// <param name="s">The schema to match.</param>
        /// <returns>The matching branch schema.</returns>
        /// <exception cref="AvroException">If no branch matches.</exception>
        protected static Schema findBranch(UnionSchema us, Schema s)
        {
            int index = us.MatchingBranch(s);
            if (index >= 0) return us[index];
            throw new AvroException("No matching schema for " + s + " in " + us);
        }
    }
}