/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System.Collections.Generic;
using System.IO;
using Avro.IO;

namespace Avro.Generic
{
    /// <summary>
    /// A general purpose reader of data from avro streams. This reader analyzes and resolves the reader and
    /// writer schemas when constructed so that reads can be more efficient. Once constructed, a reader can be
    /// reused or shared among threads to avoid incurring more resolution costs.
    /// </summary>
    /// <typeparam name="T">The type of object returned by <c>Read</c>.</typeparam>
    public abstract class PreresolvingDatumReader<T> : DatumReader<T>
    {
        /// <summary>The schema the decoded data is resolved to.</summary>
        public Schema ReaderSchema { get; private set; }

        /// <summary>The schema the data was written with.</summary>
        public Schema WriterSchema { get; private set; }

        /// <summary>
        /// Reads one value from <paramref name="dec"/>, optionally recycling <paramref name="reuse"/>.
        /// </summary>
        protected delegate object ReadItem(object reuse, Decoder dec);

        // read a specific field from a decoder
        private delegate object DecoderRead(Decoder dec);

        // skip specific field(s) from a decoder
        private delegate void DecoderSkip(Decoder dec);

        // read & set fields on a record
        private delegate void FieldReader(object record, Decoder decoder);

        private readonly ReadItem _reader;

        // Resolved record readers, keyed by (writer, reader) schema pair. Populated only while the
        // constructor runs; lets recursive record schemas resolve to a single shared reader.
        private readonly Dictionary<SchemaPair, ReadItem> _recordReaders = new Dictionary<SchemaPair, ReadItem>();

        // Resolved record skippers, keyed by writer record schema. Populated only while the constructor
        // runs; lets recursive record schemas be skipped without recursing forever during resolution.
        private readonly Dictionary<RecordSchema, DecoderSkip> _recordSkips = new Dictionary<RecordSchema, DecoderSkip>();

        /// <summary>
        /// Resolves <paramref name="writerSchema"/> against <paramref name="readerSchema"/> and builds the
        /// delegate tree used by every subsequent read.
        /// </summary>
        /// <param name="writerSchema">Schema the data was written with.</param>
        /// <param name="readerSchema">Schema the caller wants the data presented as.</param>
        /// <exception cref="AvroException">The reader schema cannot read the writer schema.</exception>
        protected PreresolvingDatumReader(Schema writerSchema, Schema readerSchema)
        {
            ReaderSchema = readerSchema;
            WriterSchema = writerSchema;
            if (!ReaderSchema.CanRead(WriterSchema))
            {
                throw new AvroException("Schema mismatch. Reader: " + ReaderSchema + ", writer: " + WriterSchema);
            }

            _reader = ResolveReader(writerSchema, readerSchema);
        }

        /// <summary>
        /// Reads one datum from <paramref name="decoder"/> using the pre-resolved reader.
        /// </summary>
        /// <param name="reuse">An existing object that may be recycled for the result.</param>
        /// <param name="decoder">The decoder positioned at the start of the datum.</param>
        /// <returns>The decoded datum, cast to <typeparamref name="T"/>.</returns>
        public T Read(T reuse, Decoder decoder)
        {
            return (T)_reader(reuse, decoder);
        }

        /// <summary>Supplies the strategy used to build arrays for <paramref name="readerSchema"/>.</summary>
        protected abstract ArrayAccess GetArrayAccess(ArraySchema readerSchema);

        /// <summary>Supplies the strategy used to build enum values for <paramref name="readerSchema"/>.</summary>
        protected abstract EnumAccess GetEnumAccess(EnumSchema readerSchema);

        /// <summary>Supplies the strategy used to build maps for <paramref name="readerSchema"/>.</summary>
        protected abstract MapAccess GetMapAccess(MapSchema readerSchema);

        /// <summary>Supplies the strategy used to build records for <paramref name="readerSchema"/>.</summary>
        protected abstract RecordAccess GetRecordAccess(RecordSchema readerSchema);

        /// <summary>Supplies the strategy used to build fixed values for <paramref name="readerSchema"/>.</summary>
        protected abstract FixedAccess GetFixedAccess(FixedSchema readerSchema);

        /// <summary>
        /// Build a reader that accounts for schema differences between the reader and writer schemas.
        /// </summary>
        private ReadItem ResolveReader(Schema writerSchema, Schema readerSchema)
        {
            // A non-union writer read through a union reader: resolve against the matching reader branch.
            if (readerSchema.Tag == Schema.Type.Union && writerSchema.Tag != Schema.Type.Union)
            {
                readerSchema = FindBranch((UnionSchema)readerSchema, writerSchema);
            }

            switch (writerSchema.Tag)
            {
                case Schema.Type.Null:
                    return ReadNull;
                case Schema.Type.Boolean:
                    return ReadBoolean;
                case Schema.Type.Int:
                    // numeric promotion: int -> long, float, double
                    switch (readerSchema.Tag)
                    {
                        case Schema.Type.Long:
                            return Read(d => (long)d.ReadInt());
                        case Schema.Type.Float:
                            return Read(d => (float)d.ReadInt());
                        case Schema.Type.Double:
                            return Read(d => (double)d.ReadInt());
                        default:
                            return Read(d => d.ReadInt());
                    }
                case Schema.Type.Long:
                    // numeric promotion: long -> float, double
                    switch (readerSchema.Tag)
                    {
                        case Schema.Type.Float:
                            return Read(d => (float)d.ReadLong());
                        case Schema.Type.Double:
                            return Read(d => (double)d.ReadLong());
                        default:
                            return Read(d => d.ReadLong());
                    }
                case Schema.Type.Float:
                    // numeric promotion: float -> double
                    switch (readerSchema.Tag)
                    {
                        case Schema.Type.Double:
                            return Read(d => (double)d.ReadFloat());
                        default:
                            return Read(d => d.ReadFloat());
                    }
                case Schema.Type.Double:
                    return Read(d => d.ReadDouble());
                case Schema.Type.String:
                    return Read(d => d.ReadString());
                case Schema.Type.Bytes:
                    return Read(d => d.ReadBytes());
                case Schema.Type.Error:
                case Schema.Type.Record:
                    return ResolveRecord((RecordSchema)writerSchema, (RecordSchema)readerSchema);
                case Schema.Type.Enumeration:
                    return ResolveEnum((EnumSchema)writerSchema, (EnumSchema)readerSchema);
                case Schema.Type.Fixed:
                    return ResolveFixed((FixedSchema)writerSchema, (FixedSchema)readerSchema);
                case Schema.Type.Array:
                    return ResolveArray((ArraySchema)writerSchema, (ArraySchema)readerSchema);
                case Schema.Type.Map:
                    return ResolveMap((MapSchema)writerSchema, (MapSchema)readerSchema);
                case Schema.Type.Union:
                    return ResolveUnion((UnionSchema)writerSchema, readerSchema);
                default:
                    throw new AvroException("Unknown schema type: " + writerSchema);
            }
        }

        /// <summary>
        /// Builds an enum reader. When the schemas differ, writer ordinals are translated to reader ordinals
        /// through a table computed once here.
        /// </summary>
        private ReadItem ResolveEnum(EnumSchema writerSchema, EnumSchema readerSchema)
        {
            var enumAccess = GetEnumAccess(readerSchema);

            if (readerSchema.Equals(writerSchema))
            {
                return (r, d) => enumAccess.CreateEnum(r, d.ReadEnum());
            }

            // translator[writer ordinal] == reader ordinal, or -1 when the reader lacks the symbol
            var translator = new int[writerSchema.Symbols.Count];
            foreach (var symbol in writerSchema.Symbols)
            {
                var ordinal = writerSchema.Ordinal(symbol);
                translator[ordinal] = readerSchema.Contains(symbol) ? readerSchema.Ordinal(symbol) : -1;
            }

            return (r, d) =>
            {
                var writerOrdinal = d.ReadEnum();

                // Guard against corrupt input instead of surfacing a raw IndexOutOfRangeException.
                if (writerOrdinal < 0 || writerOrdinal >= translator.Length)
                {
                    throw new AvroException("Enum ordinal " + writerOrdinal + " is out of range for writer schema " + writerSchema);
                }

                var readerOrdinal = translator[writerOrdinal];
                if (readerOrdinal == -1)
                {
                    throw new AvroException("No such symbol: " + writerSchema[writerOrdinal]);
                }

                return enumAccess.CreateEnum(r, readerOrdinal);
            };
        }

        /// <summary>
        /// Builds (or returns the cached) record reader for a writer/reader record pair. Writer fields the
        /// reader knows are decoded into the record, unknown writer fields are skipped, and reader fields
        /// the writer did not supply are populated from their defaults.
        /// </summary>
        private ReadItem ResolveRecord(RecordSchema writerSchema, RecordSchema readerSchema)
        {
            var schemaPair = new SchemaPair(writerSchema, readerSchema);
            ReadItem recordReader;

            if (_recordReaders.TryGetValue(schemaPair, out recordReader))
            {
                return recordReader;
            }

            // Register the reader BEFORE resolving the fields so a recursive reference to this same record
            // finds the cache entry. The closure reads fieldReaderArray when invoked, by which time it has
            // been assigned below.
            FieldReader[] fieldReaderArray = null;
            var recordAccess = GetRecordAccess(readerSchema);
            recordReader = (r, d) => ReadRecord(r, d, recordAccess, fieldReaderArray);
            _recordReaders.Add(schemaPair, recordReader);

            var readSteps = new List<FieldReader>();

            // Positions of reader fields that some writer field feeds (by name or by alias).
            var matchedReaderFields = new HashSet<int>();

            foreach (Field wf in writerSchema)
            {
                Field rf;
                if (!readerSchema.TryGetFieldAlias(wf.Name, out rf))
                {
                    // the reader does not want this field: consume and discard it
                    var skip = GetSkip(wf.Schema);
                    readSteps.Add((rec, d) => skip(d));
                    continue;
                }

                matchedReaderFields.Add(rf.Pos);
                readSteps.Add(CreateFieldReader(recordAccess, rf, ResolveReader(wf.Schema, rf.Schema)));
            }

            // fill in defaults for any reader fields not in the writer schema
            foreach (Field rf in readerSchema)
            {
                // A reader field matched through an alias has a different name than the writer field, so a
                // name check alone would wrongly treat it as missing and overwrite the decoded value.
                if (matchedReaderFields.Contains(rf.Pos) || writerSchema.Contains(rf.Name))
                {
                    continue;
                }

                var defaultBytes = EncodeDefault(rf);
                var readItem = ResolveReader(rf.Schema, rf.Schema);
                readSteps.Add(CreateDefaultFieldReader(recordAccess, rf, readItem, defaultBytes));
            }

            fieldReaderArray = readSteps.ToArray();
            return recordReader;
        }

        // Builds the step that decodes one writer-supplied field into the reader record.
        private FieldReader CreateFieldReader(RecordAccess recordAccess, Field readerField, ReadItem readItem)
        {
            if (IsReusable(readerField.Schema.Tag))
            {
                return (rec, d) => recordAccess.AddField(rec, readerField.Name, readerField.Pos,
                    readItem(recordAccess.GetField(rec, readerField.Name, readerField.Pos), d));
            }

            return (rec, d) => recordAccess.AddField(rec, readerField.Name, readerField.Pos, readItem(null, d));
        }

        // Builds the step that populates a reader-only field by decoding its pre-encoded default value.
        // The record's own decoder is ignored; a fresh decoder over the default bytes is used per read.
        private FieldReader CreateDefaultFieldReader(RecordAccess recordAccess, Field readerField, ReadItem readItem, byte[] defaultBytes)
        {
            if (IsReusable(readerField.Schema.Tag))
            {
                return (rec, d) => recordAccess.AddField(rec, readerField.Name, readerField.Pos,
                    readItem(recordAccess.GetField(rec, readerField.Name, readerField.Pos),
                        new BinaryDecoder(new MemoryStream(defaultBytes))));
            }

            return (rec, d) => recordAccess.AddField(rec, readerField.Name, readerField.Pos,
                readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes))));
        }

        // Serializes a field's default value to avro binary so it can be replayed through a normal reader.
        private static byte[] EncodeDefault(Field readerField)
        {
            using (var defaultStream = new MemoryStream())
            {
                var defaultEncoder = new BinaryEncoder(defaultStream);
                Resolver.EncodeDefaultValue(defaultEncoder, readerField.Schema, readerField.DefaultValue);
                defaultStream.Flush();
                return defaultStream.ToArray();
            }
        }

        /// <summary>
        /// Creates (or recycles) a record and runs every field step against it, in order.
        /// </summary>
        private object ReadRecord(object reuse, Decoder decoder, RecordAccess recordAccess, IEnumerable<FieldReader> readSteps)
        {
            var rec = recordAccess.CreateRecord(reuse);
            foreach (FieldReader fr in readSteps)
            {
                fr(rec, decoder);
                // TODO: on exception, report offending field
            }

            return rec;
        }

        /// <summary>
        /// Builds a reader for a writer union: one pre-resolved reader per writer branch, selected at read
        /// time by the branch index in the data. Branches the reader cannot accept throw only if they are
        /// actually encountered.
        /// </summary>
        private ReadItem ResolveUnion(UnionSchema writerSchema, Schema readerSchema)
        {
            var lookup = new ReadItem[writerSchema.Count];
            var unionReader = readerSchema as UnionSchema; // null when the reader is not a union

            for (int i = 0; i < writerSchema.Count; i++)
            {
                var writerBranch = writerSchema[i];

                if (unionReader != null)
                {
                    var readerBranch = unionReader.MatchingBranch(writerBranch);
                    if (readerBranch == -1)
                    {
                        lookup[i] = (r, d) =>
                        {
                            throw new AvroException("No matching schema for " + writerBranch + " in " + unionReader);
                        };
                    }
                    else
                    {
                        lookup[i] = ResolveReader(writerBranch, unionReader[readerBranch]);
                    }
                }
                else if (!readerSchema.CanRead(writerBranch))
                {
                    lookup[i] = (r, d) =>
                    {
                        throw new AvroException("Schema mismatch Reader: " + ReaderSchema + ", writer: " + WriterSchema);
                    };
                }
                else
                {
                    lookup[i] = ResolveReader(writerBranch, readerSchema);
                }
            }

            return (r, d) => ReadUnion(r, d, lookup);
        }

        /// <summary>
        /// Reads the union branch index and dispatches to that branch's reader.
        /// </summary>
        /// <exception cref="AvroException">The branch index in the data is outside the writer union.</exception>
        private object ReadUnion(object reuse, Decoder d, ReadItem[] branchLookup)
        {
            var branch = d.ReadUnionIndex();

            // Guard against corrupt input instead of surfacing a raw IndexOutOfRangeException.
            if (branch < 0 || branch >= branchLookup.Length)
            {
                throw new AvroException("Union branch index " + branch + " is out of range; writer union has " + branchLookup.Length + " branches");
            }

            return branchLookup[branch](reuse, d);
        }

        /// <summary>Builds a map reader whose values are resolved between the two value schemas.</summary>
        private ReadItem ResolveMap(MapSchema writerSchema, MapSchema readerSchema)
        {
            var valueReader = ResolveReader(writerSchema.ValueSchema, readerSchema.ValueSchema);
            var mapAccess = GetMapAccess(readerSchema);

            return (r, d) => ReadMap(r, d, mapAccess, valueReader);
        }

        /// <summary>Reads a map block by block, delegating element insertion to <paramref name="mapAccess"/>.</summary>
        private object ReadMap(object reuse, Decoder decoder, MapAccess mapAccess, ReadItem valueReader)
        {
            object map = mapAccess.Create(reuse);

            for (int n = (int)decoder.ReadMapStart(); n != 0; n = (int)decoder.ReadMapNext())
            {
                mapAccess.AddElements(map, n, valueReader, decoder, false);
            }

            return map;
        }

        /// <summary>Builds an array reader whose items are resolved between the two item schemas.</summary>
        private ReadItem ResolveArray(ArraySchema writerSchema, ArraySchema readerSchema)
        {
            var itemReader = ResolveReader(writerSchema.ItemSchema, readerSchema.ItemSchema);
            var arrayAccess = GetArrayAccess(readerSchema);

            // Decided once at resolution time rather than on every read.
            var itemReusable = IsReusable(readerSchema.ItemSchema.Tag);

            return (r, d) => ReadArray(r, d, arrayAccess, itemReader, itemReusable);
        }

        /// <summary>
        /// Reads an array block by block, growing the target before each block and trimming it to the
        /// number of items actually read at the end.
        /// </summary>
        private object ReadArray(object reuse, Decoder decoder, ArrayAccess arrayAccess, ReadItem itemReader, bool itemReusable)
        {
            object array = arrayAccess.Create(reuse);
            int count = 0;

            for (int n = (int)decoder.ReadArrayStart(); n != 0; n = (int)decoder.ReadArrayNext())
            {
                arrayAccess.EnsureSize(ref array, count + n);
                arrayAccess.AddElements(array, n, count, itemReader, decoder, itemReusable);
                count += n;
            }

            arrayAccess.Resize(ref array, count);
            return array;
        }

        /// <summary>Builds a fixed reader after verifying both schemas declare the same size.</summary>
        /// <exception cref="AvroException">The reader and writer sizes differ.</exception>
        private ReadItem ResolveFixed(FixedSchema writerSchema, FixedSchema readerSchema)
        {
            if (readerSchema.Size != writerSchema.Size)
            {
                throw new AvroException("Size mismatch between reader and writer fixed schemas. Writer: " + writerSchema +
                    ", reader: " + readerSchema);
            }

            var fixedAccess = GetFixedAccess(readerSchema);
            return (r, d) => ReadFixed(r, d, fixedAccess);
        }

        /// <summary>Reads a fixed value directly into the buffer exposed by <paramref name="fixedAccess"/>.</summary>
        private object ReadFixed(object reuse, Decoder decoder, FixedAccess fixedAccess)
        {
            var fixedrec = fixedAccess.CreateFixed(reuse);
            decoder.ReadFixed(fixedAccess.GetFixedBuffer(fixedrec));
            return fixedrec;
        }

        /// <summary>
        /// Returns the branch of <paramref name="us"/> that matches <paramref name="s"/>.
        /// </summary>
        /// <exception cref="AvroException">No branch of the union matches.</exception>
        protected static Schema FindBranch(UnionSchema us, Schema s)
        {
            int index = us.MatchingBranch(s);
            if (index >= 0)
            {
                return us[index];
            }

            throw new AvroException("No matching schema for " + s + " in " + us);
        }

        private object ReadNull(object reuse, Decoder decoder)
        {
            decoder.ReadNull();
            return null;
        }

        private object ReadBoolean(object reuse, Decoder decoder)
        {
            return decoder.ReadBoolean();
        }

        // Adapts a decoder-only read into a ReadItem that ignores the reuse argument.
        private ReadItem Read(DecoderRead decoderRead)
        {
            return (r, d) => decoderRead(d);
        }

        /// <summary>
        /// Builds a delegate that consumes and discards one value of <paramref name="writerSchema"/>.
        /// </summary>
        private DecoderSkip GetSkip(Schema writerSchema)
        {
            switch (writerSchema.Tag)
            {
                case Schema.Type.Null:
                    return d => d.SkipNull();
                case Schema.Type.Boolean:
                    return d => d.SkipBoolean();
                case Schema.Type.Int:
                    return d => d.SkipInt();
                case Schema.Type.Long:
                    return d => d.SkipLong();
                case Schema.Type.Float:
                    return d => d.SkipFloat();
                case Schema.Type.Double:
                    return d => d.SkipDouble();
                case Schema.Type.String:
                    return d => d.SkipString();
                case Schema.Type.Bytes:
                    return d => d.SkipBytes();
                case Schema.Type.Error:
                case Schema.Type.Record:
                    return GetRecordSkip((RecordSchema)writerSchema);
                case Schema.Type.Enumeration:
                    return d => d.SkipEnum();
                case Schema.Type.Fixed:
                    var size = ((FixedSchema)writerSchema).Size;
                    return d => d.SkipFixed(size);
                case Schema.Type.Array:
                    var itemSkip = GetSkip(((ArraySchema)writerSchema).ItemSchema);
                    return d =>
                    {
                        for (long n = d.ReadArrayStart(); n != 0; n = d.ReadArrayNext())
                        {
                            for (long i = 0; i < n; i++)
                            {
                                itemSkip(d);
                            }
                        }
                    };
                case Schema.Type.Map:
                    var valueSkip = GetSkip(((MapSchema)writerSchema).ValueSchema);
                    return d =>
                    {
                        for (long n = d.ReadMapStart(); n != 0; n = d.ReadMapNext())
                        {
                            for (long i = 0; i < n; i++)
                            {
                                d.SkipString(); // map key
                                valueSkip(d);
                            }
                        }
                    };
                case Schema.Type.Union:
                    var unionSchema = (UnionSchema)writerSchema;
                    var lookup = new DecoderSkip[unionSchema.Count];
                    for (int i = 0; i < unionSchema.Count; i++)
                    {
                        lookup[i] = GetSkip(unionSchema[i]);
                    }

                    return d =>
                    {
                        var branch = d.ReadUnionIndex();

                        // Guard against corrupt input instead of surfacing a raw IndexOutOfRangeException.
                        if (branch < 0 || branch >= lookup.Length)
                        {
                            throw new AvroException("Union branch index " + branch + " is out of range; writer union has " + lookup.Length + " branches");
                        }

                        lookup[branch](d);
                    };
                default:
                    throw new AvroException("Unknown schema type: " + writerSchema);
            }
        }

        // Builds (or returns the cached) skipper for a record. The skipper is registered BEFORE its fields
        // are visited so that a record which refers to itself (directly or through arrays, maps or unions)
        // resolves to the cached delegate instead of recursing without end.
        private DecoderSkip GetRecordSkip(RecordSchema recordSchema)
        {
            DecoderSkip recordSkip;
            if (_recordSkips.TryGetValue(recordSchema, out recordSkip))
            {
                return recordSkip;
            }

            // Assigned after the field skippers are built; the closure reads it only when invoked.
            DecoderSkip[] fieldSkips = null;
            recordSkip = d =>
            {
                foreach (DecoderSkip fieldSkip in fieldSkips)
                {
                    fieldSkip(d);
                }
            };
            _recordSkips.Add(recordSchema, recordSkip);

            var steps = new List<DecoderSkip>();
            foreach (Field field in recordSchema.Fields)
            {
                steps.Add(GetSkip(field.Schema));
            }

            fieldSkips = steps.ToArray();
            return recordSkip;
        }

        /// <summary>
        /// Indicates if it's possible to reuse an object of the specified type. Generally
        /// false for immutable objects like int, long, string, etc but may differ between
        /// the Specific and Generic implementations. Used to avoid retrieving the existing
        /// value if it's not reusable.
        /// </summary>
        /// <param name="tag">The schema type of the value in question.</param>
        /// <returns>This base implementation always returns true.</returns>
        protected virtual bool IsReusable(Schema.Type tag)
        {
            return true;
        }

        // interfaces to handle details of working with Specific vs Generic objects

        /// <summary>Strategy for creating record objects and getting/setting their fields.</summary>
        protected interface RecordAccess
        {
            /// <summary>
            /// Creates a new record object. Derived classes can override this to return an object of their choice.
            /// </summary>
            /// <param name="reuse">If appropriate, will reuse this object instead of constructing a new one</param>
            /// <returns>The record object to populate.</returns>
            object CreateRecord(object reuse);

            /// <summary>
            /// Used by the default implementation of ReadRecord() to get the existing field of a record object. The derived
            /// classes can override this to make their own interpretation of the record object.
            /// </summary>
            /// <param name="record">The record object to be probed into. This is guaranteed to be one that was returned
            /// by a previous call to CreateRecord.</param>
            /// <param name="fieldName">The name of the field to probe.</param>
            /// <param name="fieldPos">field number</param>
            /// <returns>The value of the field, if found. Null otherwise.</returns>
            object GetField(object record, string fieldName, int fieldPos);

            /// <summary>
            /// Used by the default implementation of ReadRecord() to add a field to a record object. The derived
            /// classes can override this to suit their own implementation of the record object.
            /// </summary>
            /// <param name="record">The record object to be probed into. This is guaranteed to be one that was returned
            /// by a previous call to CreateRecord.</param>
            /// <param name="fieldName">The name of the field to probe.</param>
            /// <param name="fieldPos">field number</param>
            /// <param name="fieldValue">The value to be added for the field</param>
            void AddField(object record, string fieldName, int fieldPos, object fieldValue);
        }

        /// <summary>Strategy for creating enum values.</summary>
        protected interface EnumAccess
        {
            /// <summary>Creates the enum value for the given reader-schema ordinal.</summary>
            /// <param name="reuse">If appropriate, uses this object instead of creating a new one.</param>
            /// <param name="ordinal">Ordinal of the symbol in the reader schema.</param>
            object CreateEnum(object reuse, int ordinal);
        }

        /// <summary>Strategy for creating fixed values and exposing their buffers.</summary>
        protected interface FixedAccess
        {
            /// <summary>
            /// Returns a fixed object.
            /// </summary>
            /// <param name="reuse">If appropriate, uses this object instead of creating a new one.</param>
            /// <returns>A fixed object with an appropriate buffer.</returns>
            object CreateFixed(object reuse);

            /// <summary>
            /// Returns a buffer of appropriate size to read data into.
            /// </summary>
            /// <param name="f">The fixed object. It is guaranteed that this is something that has been previously
            /// returned by CreateFixed</param>
            /// <returns>A byte buffer of fixed's size.</returns>
            byte[] GetFixedBuffer(object f);
        }

        /// <summary>Strategy for creating, sizing and filling arrays.</summary>
        protected interface ArrayAccess
        {
            /// <summary>
            /// Creates a new array object. The initial size of the object could be anything.
            /// </summary>
            /// <param name="reuse">If appropriate use this instead of creating a new one.</param>
            /// <returns>An object suitable to deserialize an avro array</returns>
            object Create(object reuse);

            /// <summary>
            /// Hint that the array should be able to handle at least targetSize elements. The array
            /// is not required to be resized
            /// </summary>
            /// <param name="array">Array object who needs to support targetSize elements. This is guaranteed to be something returned by
            /// a previous call to Create().</param>
            /// <param name="targetSize">The new size.</param>
            void EnsureSize(ref object array, int targetSize);

            /// <summary>
            /// Resizes the array to the new value.
            /// </summary>
            /// <param name="array">Array object whose size is required. This is guaranteed to be something returned by
            /// a previous call to Create().</param>
            /// <param name="targetSize">The new size.</param>
            void Resize(ref object array, int targetSize);

            /// <summary>
            /// Reads <paramref name="elements"/> items from <paramref name="decoder"/> into
            /// <paramref name="array"/>, starting at <paramref name="index"/>.
            /// </summary>
            /// <param name="array">The array to fill.</param>
            /// <param name="elements">Number of items in the current block.</param>
            /// <param name="index">Position in the array of the first item of the block.</param>
            /// <param name="itemReader">Reader for a single item.</param>
            /// <param name="decoder">The decoder to read from.</param>
            /// <param name="reuse">Whether existing items may be offered to the item reader for reuse.</param>
            void AddElements(object array, int elements, int index, ReadItem itemReader, Decoder decoder, bool reuse);
        }

        /// <summary>Strategy for creating and filling maps.</summary>
        protected interface MapAccess
        {
            /// <summary>
            /// Creates a new map object.
            /// </summary>
            /// <param name="reuse">If appropriate, use this map object instead of creating a new one.</param>
            /// <returns>An empty map object.</returns>
            object Create(object reuse);

            /// <summary>
            /// Reads <paramref name="elements"/> key/value entries from <paramref name="decoder"/> into
            /// <paramref name="map"/>.
            /// </summary>
            /// <param name="map">The map to fill.</param>
            /// <param name="elements">Number of entries in the current block.</param>
            /// <param name="itemReader">Reader for a single value.</param>
            /// <param name="decoder">The decoder to read from.</param>
            /// <param name="reuse">Whether existing values may be offered to the value reader for reuse.</param>
            void AddElements(object map, int elements, ReadItem itemReader, Decoder decoder, bool reuse);
        }

        // Dictionary key pairing a writer schema with a reader schema; equality is delegated to the
        // schemas' own Equals/GetHashCode.
        private sealed class SchemaPair
        {
            private readonly Schema _writerSchema;
            private readonly Schema _readerSchema;

            public SchemaPair(Schema writerSchema, Schema readerSchema)
            {
                _writerSchema = writerSchema;
                _readerSchema = readerSchema;
            }

            private bool Equals(SchemaPair other)
            {
                return Equals(_writerSchema, other._writerSchema) && Equals(_readerSchema, other._readerSchema);
            }

            public override bool Equals(object obj)
            {
                if (ReferenceEquals(null, obj))
                {
                    return false;
                }

                if (ReferenceEquals(this, obj))
                {
                    return true;
                }

                if (obj.GetType() != this.GetType())
                {
                    return false;
                }

                return Equals((SchemaPair)obj);
            }

            public override int GetHashCode()
            {
                unchecked
                {
                    return ((_writerSchema != null ? _writerSchema.GetHashCode() : 0) * 397) ^
                           (_readerSchema != null ? _readerSchema.GetHashCode() : 0);
                }
            }
        }
    }
}