/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Collections.Generic;
using System.IO;
using Avro.IO;
namespace Avro.Generic
{
/// <summary>
/// A general purpose reader of data from Avro streams. This reader analyzes and resolves the reader and writer schemas
/// when constructed so that reads can be more efficient. Once constructed, a reader can be reused or shared among threads
/// to avoid incurring more resolution costs.
/// </summary>
public abstract class PreresolvingDatumReader : DatumReader
{
/// <summary>Schema the decoded data is resolved into; assigned once by the constructor.</summary>
public Schema ReaderSchema { get; private set; }

/// <summary>Schema the encoded data was written with; assigned once by the constructor.</summary>
public Schema WriterSchema { get; private set; }

/// <summary>
/// Reads one datum from <paramref name="dec"/>. <paramref name="reuse"/> is an existing
/// object the implementation may populate instead of allocating a new one (may be null).
/// </summary>
protected delegate object ReadItem(object reuse, Decoder dec);

// read a specific field from a decoder
private delegate object DecoderRead(Decoder dec);

// skip specific field(s) from a decoder
private delegate void DecoderSkip(Decoder dec);

// read & set fields on a record
private delegate void FieldReader(object record, Decoder decoder);

// Fully resolved reader for the top-level schema pair; built by the constructor.
private readonly ReadItem _reader;

// Cache of record readers keyed by (writer, reader) schema pair. ResolveRecord registers a
// reader here before resolving its fields, which is what lets recursive schemas terminate.
private readonly Dictionary<SchemaPair, ReadItem> _recordReaders = new Dictionary<SchemaPair, ReadItem>();
/// <summary>
/// Records both schemas, verifies that the reader schema is able to read the writer schema,
/// and builds the resolved reader delegate that <c>Read</c> invokes.
/// </summary>
/// <param name="writerSchema">Schema the data was written with.</param>
/// <param name="readerSchema">Schema the data should be resolved into.</param>
/// <exception cref="AvroException">The reader schema reports that it cannot read the writer schema.</exception>
protected PreresolvingDatumReader(Schema writerSchema, Schema readerSchema)
{
    WriterSchema = writerSchema;
    ReaderSchema = readerSchema;

    bool compatible = readerSchema.CanRead(writerSchema);
    if (!compatible)
    {
        throw new AvroException("Schema mismatch. Reader: " + readerSchema + ", writer: " + writerSchema);
    }

    // All schema resolution happens here, once, so that reads only run the prebuilt delegate.
    _reader = ResolveReader(writerSchema, readerSchema);
}
/// <summary>
/// Decodes one datum from <paramref name="decoder"/> using the reader resolved at construction.
/// </summary>
/// <param name="reuse">Existing object handed to the resolved reader for possible reuse.</param>
/// <param name="decoder">Decoder positioned at the start of the datum.</param>
/// <returns>The decoded datum, cast to <c>T</c>.</returns>
public T Read(T reuse, Decoder decoder)
{
    object datum = _reader(reuse, decoder);
    return (T)datum;
}
// Factory hooks implemented by derived readers. Each one is called while the reader is being
// resolved (i.e. from the constructor via ResolveReader) and receives the READER-side schema.

/// <summary>Returns the accessor used to create and fill arrays for the given reader array schema.</summary>
protected abstract ArrayAccess GetArrayAccess(ArraySchema readerSchema);
/// <summary>Returns the accessor used to create enum values for the given reader enum schema.</summary>
protected abstract EnumAccess GetEnumAccess(EnumSchema readerSchema);
/// <summary>Returns the accessor used to create and fill maps for the given reader map schema.</summary>
protected abstract MapAccess GetMapAccess(MapSchema readerSchema);
/// <summary>Returns the accessor used to create records and get/set their fields for the given reader record schema.</summary>
protected abstract RecordAccess GetRecordAccess(RecordSchema readerSchema);
/// <summary>Returns the accessor used to create fixed values and expose their buffers for the given reader fixed schema.</summary>
protected abstract FixedAccess GetFixedAccess(FixedSchema readerSchema);
/// <summary>
/// Build a reader that accounts for schema differences between the reader and writer schemas.
/// </summary>
/// <param name="writerSchema">Schema the data was written with; its tag selects the decoding routine.</param>
/// <param name="readerSchema">Schema to resolve into; selects numeric promotions and the target types.</param>
/// <returns>A delegate that decodes one datum written with <paramref name="writerSchema"/>.</returns>
/// <exception cref="AvroException">The writer schema tag is not one of the handled types.</exception>
private ReadItem ResolveReader(Schema writerSchema, Schema readerSchema)
{
    // When only the reader is a union, resolve against the single branch matching the writer.
    bool onlyReaderIsUnion = readerSchema.Tag == Schema.Type.Union && writerSchema.Tag != Schema.Type.Union;
    if (onlyReaderIsUnion)
    {
        readerSchema = FindBranch(readerSchema as UnionSchema, writerSchema);
    }

    switch (writerSchema.Tag)
    {
        case Schema.Type.Null:
            return ReadNull;

        case Schema.Type.Boolean:
            return ReadBoolean;

        case Schema.Type.Int:
        {
            // int may be promoted to long, float or double on the reader side
            var target = readerSchema.Tag;
            if (target == Schema.Type.Long)
            {
                return Read(dec => (long) dec.ReadInt());
            }
            if (target == Schema.Type.Float)
            {
                return Read(dec => (float) dec.ReadInt());
            }
            if (target == Schema.Type.Double)
            {
                return Read(dec => (double) dec.ReadInt());
            }
            return Read(dec => dec.ReadInt());
        }

        case Schema.Type.Long:
        {
            // long may be promoted to float or double on the reader side
            var target = readerSchema.Tag;
            if (target == Schema.Type.Float)
            {
                return Read(dec => (float) dec.ReadLong());
            }
            if (target == Schema.Type.Double)
            {
                return Read(dec => (double) dec.ReadLong());
            }
            return Read(dec => dec.ReadLong());
        }

        case Schema.Type.Float:
        {
            // float may be promoted to double on the reader side
            var target = readerSchema.Tag;
            if (target == Schema.Type.Double)
            {
                return Read(dec => (double) dec.ReadFloat());
            }
            return Read(dec => dec.ReadFloat());
        }

        case Schema.Type.Double:
            return Read(dec => dec.ReadDouble());

        case Schema.Type.String:
            return Read(dec => dec.ReadString());

        case Schema.Type.Bytes:
            return Read(dec => dec.ReadBytes());

        case Schema.Type.Error:
        case Schema.Type.Record:
            return ResolveRecord((RecordSchema)writerSchema, (RecordSchema)readerSchema);

        case Schema.Type.Enumeration:
            return ResolveEnum((EnumSchema)writerSchema, (EnumSchema)readerSchema);

        case Schema.Type.Fixed:
            return ResolveFixed((FixedSchema)writerSchema, (FixedSchema)readerSchema);

        case Schema.Type.Array:
            return ResolveArray((ArraySchema)writerSchema, (ArraySchema)readerSchema);

        case Schema.Type.Map:
            return ResolveMap((MapSchema)writerSchema, (MapSchema)readerSchema);

        case Schema.Type.Union:
            return ResolveUnion((UnionSchema)writerSchema, readerSchema);

        default:
            throw new AvroException("Unknown schema type: " + writerSchema);
    }
}
/// <summary>
/// Builds a reader for an enum. When the two schemas are equal the writer ordinal is used
/// directly; otherwise writer ordinals are translated to reader ordinals by symbol name.
/// </summary>
/// <param name="writerSchema">Enum schema the data was written with.</param>
/// <param name="readerSchema">Enum schema to resolve into.</param>
/// <returns>A delegate that decodes one enum value; it throws <see cref="AvroException"/> at
/// read time if the written symbol does not exist in the reader schema.</returns>
private ReadItem ResolveEnum(EnumSchema writerSchema, EnumSchema readerSchema)
{
    var access = GetEnumAccess(readerSchema);

    if (readerSchema.Equals(writerSchema))
    {
        return (reuse, dec) => access.CreateEnum(reuse, dec.ReadEnum());
    }

    // Indexed by writer ordinal; holds the reader ordinal, or -1 when the reader lacks the symbol.
    var ordinalMap = new int[writerSchema.Symbols.Count];
    foreach (var name in writerSchema.Symbols)
    {
        ordinalMap[writerSchema.Ordinal(name)] = readerSchema.Contains(name)
            ? readerSchema.Ordinal(name)
            : -1;
    }

    return (reuse, dec) =>
    {
        var written = dec.ReadEnum();
        var mapped = ordinalMap[written];
        if (mapped != -1)
        {
            return access.CreateEnum(reuse, mapped);
        }

        throw new AvroException("No such symbol: " + writerSchema[written]);
    };
}
/// <summary>
/// Builds (or returns the cached) reader for a writer/reader record schema pair.
/// </summary>
/// <remarks>
/// The resulting reader runs one step per writer field, in writer order: either decode the
/// value into the matching reader field, or skip it when the reader has no such field. It then
/// runs one step per reader field that no writer field supplied, decoding that field's
/// pre-encoded default value.
/// </remarks>
/// <param name="writerSchema">Record schema the data was written with.</param>
/// <param name="readerSchema">Record schema to resolve into.</param>
/// <returns>A delegate that decodes one record.</returns>
private ReadItem ResolveRecord(RecordSchema writerSchema, RecordSchema readerSchema)
{
    var schemaPair = new SchemaPair(writerSchema, readerSchema);
    ReadItem recordReader;
    if (_recordReaders.TryGetValue(schemaPair, out recordReader))
    {
        return recordReader;
    }

    // Register the reader BEFORE resolving the fields so that a recursive schema finds it in
    // the cache instead of recursing forever. The lambda captures fieldReaderArray, which is
    // only assigned at the end of this method.
    FieldReader[] fieldReaderArray = null;
    var recordAccess = GetRecordAccess(readerSchema);
    recordReader = (r, d) => ReadRecord(r, d, recordAccess, fieldReaderArray);
    _recordReaders.Add(schemaPair, recordReader);

    var readSteps = new List<FieldReader>();

    // Names of reader fields that were matched to a writer field (by name or, presumably,
    // by alias - TryGetFieldAlias is defined elsewhere). They must not receive a default below.
    var resolvedReaderFields = new HashSet<string>();

    foreach (Field wf in writerSchema)
    {
        Field rf;
        if (readerSchema.TryGetFieldAlias(wf.Name, out rf))
        {
            var readerField = rf;
            resolvedReaderFields.Add(readerField.Name);

            var readItem = ResolveReader(wf.Schema, readerField.Schema);
            if (IsReusable(readerField.Schema.Tag))
            {
                // hand the field's current value to the reader so it can be reused
                readSteps.Add((rec, d) => recordAccess.AddField(rec, readerField.Name, readerField.Pos,
                    readItem(recordAccess.GetField(rec, readerField.Name, readerField.Pos), d)));
            }
            else
            {
                readSteps.Add((rec, d) => recordAccess.AddField(rec, readerField.Name, readerField.Pos,
                    readItem(null, d)));
            }
        }
        else
        {
            // writer field unknown to the reader: consume its bytes and discard them
            var skip = GetSkip(wf.Schema);
            readSteps.Add((rec, d) => skip(d));
        }
    }

    // fill in defaults for any reader fields not in the writer schema
    foreach (Field rf in readerSchema)
    {
        // A reader field matched through an alias has a different name than the writer field,
        // so the name check alone would wrongly overwrite the decoded value with the default.
        if (writerSchema.Contains(rf.Name) || resolvedReaderFields.Contains(rf.Name))
        {
            continue;
        }

        // Encode the default once; every read decodes it from a fresh stream over these bytes.
        var defaultStream = new MemoryStream();
        var defaultEncoder = new BinaryEncoder(defaultStream);
        defaultStream.Position = 0; // reset for writing
        Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
        defaultStream.Flush();
        var defaultBytes = defaultStream.ToArray();

        var readItem = ResolveReader(rf.Schema, rf.Schema);

        // copy the loop variable so each closure sees its own field
        var rfInstance = rf;
        if (IsReusable(rf.Schema.Tag))
        {
            readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
                readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
                    new BinaryDecoder(new MemoryStream(defaultBytes)))));
        }
        else
        {
            readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
                readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes)))));
        }
    }

    fieldReaderArray = readSteps.ToArray();
    return recordReader;
}
/// <summary>
/// Decodes one record by obtaining a record object from <paramref name="recordAccess"/> and
/// running every field step against it in order.
/// </summary>
/// <param name="reuse">Existing object offered to <c>CreateRecord</c> for reuse.</param>
/// <param name="decoder">Decoder positioned at the start of the record.</param>
/// <param name="recordAccess">Accessor that creates the record object.</param>
/// <param name="readSteps">Field steps (read, skip or default) built by <c>ResolveRecord</c>.</param>
/// <returns>The populated record object.</returns>
private object ReadRecord(object reuse, Decoder decoder, RecordAccess recordAccess, IEnumerable<FieldReader> readSteps)
{
    var rec = recordAccess.CreateRecord(reuse);
    foreach (FieldReader fr in readSteps)
    {
        // TODO: on exception, report offending field
        fr(rec, decoder);
    }
    return rec;
}
/// <summary>
/// Builds a reader for data written as a union: one reader per writer branch, selected at
/// read time by the union index found in the data.
/// </summary>
/// <remarks>
/// A writer branch the reader cannot accept does not fail resolution; its slot gets a reader
/// that throws <see cref="AvroException"/> only if that branch is actually encountered.
/// </remarks>
/// <param name="writerSchema">Union schema the data was written with.</param>
/// <param name="readerSchema">Schema to resolve into; may or may not be a union.</param>
/// <returns>A delegate that decodes one union value.</returns>
private ReadItem ResolveUnion(UnionSchema writerSchema, Schema readerSchema)
{
    var readerUnion = readerSchema as UnionSchema;
    int branchCount = writerSchema.Count;
    var branchReaders = new ReadItem[branchCount];

    for (int index = 0; index < branchCount; index++)
    {
        var branch = writerSchema[index];

        if (readerUnion != null)
        {
            // union -> union: pair the writer branch with the matching reader branch
            int match = readerUnion.MatchingBranch(branch);
            if (match != -1)
            {
                branchReaders[index] = ResolveReader(branch, readerUnion[match]);
            }
            else
            {
                branchReaders[index] = (r, d) => { throw new AvroException( "No matching schema for " + branch + " in " + readerUnion ); };
            }
        }
        else if (readerSchema.CanRead(branch))
        {
            // union -> non-union: the reader schema itself must accept the branch
            branchReaders[index] = ResolveReader(branch, readerSchema);
        }
        else
        {
            branchReaders[index] = (r, d) => { throw new AvroException( "Schema mismatch Reader: " + ReaderSchema + ", writer: " + WriterSchema ); };
        }
    }

    return (reuse, dec) => ReadUnion(reuse, dec, branchReaders);
}
/// <summary>
/// Reads the union index from the decoder and delegates to the reader stored for that branch.
/// </summary>
/// <param name="reuse">Existing object passed through to the branch reader.</param>
/// <param name="d">Decoder positioned at the union index.</param>
/// <param name="branchLookup">Branch readers indexed by writer union index.</param>
/// <returns>Whatever the selected branch reader returns.</returns>
private object ReadUnion(object reuse, Decoder d, ReadItem[] branchLookup)
{
    var branchIndex = d.ReadUnionIndex();
    ReadItem branchReader = branchLookup[branchIndex];
    return branchReader(reuse, d);
}
/// <summary>
/// Builds a reader for a map by resolving the value schemas and obtaining the map accessor
/// for the reader schema.
/// </summary>
/// <param name="writerSchema">Map schema the data was written with.</param>
/// <param name="readerSchema">Map schema to resolve into.</param>
/// <returns>A delegate that decodes one map.</returns>
private ReadItem ResolveMap(MapSchema writerSchema, MapSchema readerSchema)
{
    var readerValues = readerSchema.ValueSchema;
    var writerValues = writerSchema.ValueSchema;

    var valueReader = ResolveReader(writerValues, readerValues);
    var access = GetMapAccess(readerSchema);

    return (reuse, dec) =>
    {
        return ReadMap(reuse, dec, access, valueReader);
    };
}
/// <summary>
/// Decodes one map block by block, handing each block to the accessor until the decoder
/// reports a block count of zero.
/// </summary>
/// <param name="reuse">Existing object offered to <c>MapAccess.Create</c> for reuse.</param>
/// <param name="decoder">Decoder positioned at the start of the map.</param>
/// <param name="mapAccess">Accessor that creates the map and adds entries to it.</param>
/// <param name="valueReader">Resolved reader for the map values.</param>
/// <returns>The populated map object.</returns>
private object ReadMap(object reuse, Decoder decoder, MapAccess mapAccess, ReadItem valueReader)
{
    object map = mapAccess.Create(reuse);

    int blockCount = (int)decoder.ReadMapStart();
    while (blockCount != 0)
    {
        // values are never read into existing objects here (reuse flag is always false)
        mapAccess.AddElements(map, blockCount, valueReader, decoder, false);
        blockCount = (int)decoder.ReadMapNext();
    }

    return map;
}
/// <summary>
/// Builds a reader for an array by resolving the item schemas and obtaining the array
/// accessor for the reader schema.
/// </summary>
/// <param name="writerSchema">Array schema the data was written with.</param>
/// <param name="readerSchema">Array schema to resolve into.</param>
/// <returns>A delegate that decodes one array.</returns>
private ReadItem ResolveArray(ArraySchema writerSchema, ArraySchema readerSchema)
{
    var elementReader = ResolveReader(writerSchema.ItemSchema, readerSchema.ItemSchema);
    var access = GetArrayAccess(readerSchema);

    return (reuse, dec) =>
    {
        // IsReusable is queried on every read, not once at resolution time
        bool elementsReusable = IsReusable(readerSchema.ItemSchema.Tag);
        return ReadArray(reuse, dec, access, elementReader, elementsReusable);
    };
}
/// <summary>
/// Decodes one array block by block. Before each block the accessor is asked to make room
/// for the items read so far plus the block; after the last block the array is resized to
/// the total number of items read.
/// </summary>
/// <param name="reuse">Existing object offered to <c>ArrayAccess.Create</c> for reuse.</param>
/// <param name="decoder">Decoder positioned at the start of the array.</param>
/// <param name="arrayAccess">Accessor that creates, sizes and fills the array.</param>
/// <param name="itemReader">Resolved reader for the array items.</param>
/// <param name="itemReusable">Forwarded to <c>AddElements</c> as its reuse flag.</param>
/// <returns>The populated array object.</returns>
private object ReadArray(object reuse, Decoder decoder, ArrayAccess arrayAccess, ReadItem itemReader, bool itemReusable)
{
    object array = arrayAccess.Create(reuse);
    int itemsRead = 0;

    int blockCount = (int)decoder.ReadArrayStart();
    while (blockCount != 0)
    {
        arrayAccess.EnsureSize(ref array, itemsRead + blockCount);
        arrayAccess.AddElements(array, blockCount, itemsRead, itemReader, decoder, itemReusable);
        itemsRead += blockCount;

        blockCount = (int)decoder.ReadArrayNext();
    }

    arrayAccess.Resize(ref array, itemsRead);
    return array;
}
/// <summary>
/// Builds a reader for a fixed value after checking that both schemas declare the same size.
/// </summary>
/// <param name="writerSchema">Fixed schema the data was written with.</param>
/// <param name="readerSchema">Fixed schema to resolve into.</param>
/// <returns>A delegate that decodes one fixed value.</returns>
/// <exception cref="AvroException">The reader and writer sizes differ.</exception>
private ReadItem ResolveFixed(FixedSchema writerSchema, FixedSchema readerSchema)
{
    if (readerSchema.Size == writerSchema.Size)
    {
        var access = GetFixedAccess(readerSchema);
        return (reuse, dec) => ReadFixed(reuse, dec, access);
    }

    throw new AvroException("Size mismatch between reader and writer fixed schemas. Writer: " + writerSchema +
        ", reader: " + readerSchema);
}
/// <summary>
/// Decodes one fixed value directly into the buffer exposed by the accessor.
/// </summary>
/// <param name="reuse">Existing object offered to <c>CreateFixed</c> for reuse.</param>
/// <param name="decoder">Decoder positioned at the start of the fixed bytes.</param>
/// <param name="fixedAccess">Accessor that creates the fixed object and exposes its buffer.</param>
/// <returns>The fixed object whose buffer was filled.</returns>
private object ReadFixed(object reuse, Decoder decoder, FixedAccess fixedAccess)
{
    object target = fixedAccess.CreateFixed(reuse);
    byte[] buffer = fixedAccess.GetFixedBuffer(target);
    decoder.ReadFixed(buffer);
    return target;
}
/// <summary>
/// Returns the branch of union <paramref name="us"/> that matches schema <paramref name="s"/>.
/// </summary>
/// <param name="us">Union to search.</param>
/// <param name="s">Schema to find a matching branch for.</param>
/// <returns>The matching branch schema.</returns>
/// <exception cref="AvroException">The union reports no matching branch (negative index).</exception>
protected static Schema FindBranch(UnionSchema us, Schema s)
{
    int index = us.MatchingBranch(s);
    if (index < 0)
    {
        throw new AvroException("No matching schema for " + s + " in " + us);
    }

    return us[index];
}
/// <summary>
/// <c>ReadItem</c>-shaped reader for the null type: lets the decoder consume the null and
/// yields a null reference. <paramref name="reuse"/> is ignored.
/// </summary>
private object ReadNull(object reuse, Decoder decoder)
{
    decoder.ReadNull();

    object nothing = null;
    return nothing;
}
/// <summary>
/// <c>ReadItem</c>-shaped reader for the boolean type; returns the decoded value boxed as
/// an object. <paramref name="reuse"/> is ignored.
/// </summary>
private object ReadBoolean(object reuse, Decoder decoder)
{
    var flag = decoder.ReadBoolean();
    return flag;
}
/// <summary>
/// Adapts a decoder-only read function to the <c>ReadItem</c> shape by ignoring the reuse argument.
/// </summary>
/// <param name="decoderRead">Function that reads a value from a decoder.</param>
/// <returns>A <c>ReadItem</c> that forwards its decoder to <paramref name="decoderRead"/>.</returns>
private ReadItem Read(DecoderRead decoderRead)
{
    ReadItem adapter = delegate(object reuse, Decoder decoder)
    {
        return decoderRead(decoder);
    };
    return adapter;
}
/// <summary>
/// Builds a delegate that consumes, without materialising, one value written with
/// <paramref name="writerSchema"/>. Used for writer fields the reader schema does not have.
/// </summary>
/// <param name="writerSchema">Schema of the value to skip.</param>
/// <returns>A delegate that advances a decoder past one such value.</returns>
/// <exception cref="AvroException">The schema tag is not one of the handled types.</exception>
private DecoderSkip GetSkip(Schema writerSchema)
{
    return BuildSkip(writerSchema, new Dictionary<Schema, DecoderSkip>());
}

/// <summary>
/// Recursive worker for <c>GetSkip</c>. <paramref name="recordSkippers"/> remembers the
/// skipper of every record already visited during this build so that a self-referencing record
/// schema (record -> union -> same record) reuses it instead of recursing without end.
/// </summary>
private DecoderSkip BuildSkip(Schema writerSchema, Dictionary<Schema, DecoderSkip> recordSkippers)
{
    switch (writerSchema.Tag)
    {
        case Schema.Type.Null:
            return d => d.SkipNull();
        case Schema.Type.Boolean:
            return d => d.SkipBoolean();
        case Schema.Type.Int:
            return d => d.SkipInt();
        case Schema.Type.Long:
            return d => d.SkipLong();
        case Schema.Type.Float:
            return d => d.SkipFloat();
        case Schema.Type.Double:
            return d => d.SkipDouble();
        case Schema.Type.String:
            return d => d.SkipString();
        case Schema.Type.Bytes:
            return d => d.SkipBytes();
        case Schema.Type.Error:
        case Schema.Type.Record:
        {
            DecoderSkip known;
            if (recordSkippers.TryGetValue(writerSchema, out known))
            {
                return known;
            }

            // Register the skipper before descending into the fields; the list it captures
            // is filled in below and is complete before any data is skipped.
            var fieldSkips = new List<DecoderSkip>();
            DecoderSkip skipRecord = d => fieldSkips.ForEach(s => s(d));
            recordSkippers.Add(writerSchema, skipRecord);

            var recSchema = (RecordSchema)writerSchema;
            recSchema.Fields.ForEach(f => fieldSkips.Add(BuildSkip(f.Schema, recordSkippers)));
            return skipRecord;
        }
        case Schema.Type.Enumeration:
            return d => d.SkipEnum();
        case Schema.Type.Fixed:
        {
            var size = ((FixedSchema)writerSchema).Size;
            return d => d.SkipFixed(size);
        }
        case Schema.Type.Array:
        {
            var itemSkip = BuildSkip(((ArraySchema)writerSchema).ItemSchema, recordSkippers);
            return d =>
            {
                // arrays are written in blocks; a block count of 0 ends the array
                for (long n = d.ReadArrayStart(); n != 0; n = d.ReadArrayNext())
                {
                    for (long i = 0; i < n; i++) itemSkip(d);
                }
            };
        }
        case Schema.Type.Map:
        {
            var valueSkip = BuildSkip(((MapSchema)writerSchema).ValueSchema, recordSkippers);
            return d =>
            {
                // each map entry is a string key followed by a value
                for (long n = d.ReadMapStart(); n != 0; n = d.ReadMapNext())
                {
                    for (long i = 0; i < n; i++) { d.SkipString(); valueSkip(d); }
                }
            };
        }
        case Schema.Type.Union:
        {
            var unionSchema = (UnionSchema)writerSchema;
            var lookup = new DecoderSkip[unionSchema.Count];
            for (int i = 0; i < unionSchema.Count; i++)
            {
                lookup[i] = BuildSkip(unionSchema[i], recordSkippers);
            }
            // the union index in the data selects which branch skipper to run
            return d => lookup[d.ReadUnionIndex()](d);
        }
        default:
            throw new AvroException("Unknown schema type: " + writerSchema);
    }
}
/// <summary>
/// Indicates if it's possible to reuse an object of the specified type. Generally
/// false for immutable objects like int, long, string, etc but may differ between
/// the Specific and Generic implementations. Used to avoid retrieving the existing
/// value if it's not reusable.
/// </summary>
/// <param name="tag">Schema type tag of the value in question.</param>
/// <returns>This base implementation always returns true; derived classes may override.</returns>
protected virtual bool IsReusable(Schema.Type tag)
{
return true;
}
// interfaces to handle details of working with Specific vs Generic objects

/// <summary>
/// Creates record objects and reads/writes their fields on behalf of the reader.
/// </summary>
protected interface RecordAccess
{
/// <summary>
/// Creates a new record object. Derived classes can override this to return an object of their choice.
/// </summary>
/// <param name="reuse">If appropriate, will reuse this object instead of constructing a new one</param>
/// <returns>The record object to populate.</returns>
object CreateRecord(object reuse);
/// <summary>
/// Used by the default implementation of ReadRecord() to get the existing field of a record object. The derived
/// classes can override this to make their own interpretation of the record object.
/// </summary>
/// <param name="record">The record object to be probed into. This is guaranteed to be one that was returned
/// by a previous call to CreateRecord.</param>
/// <param name="fieldName">The name of the field to probe.</param>
/// <param name="fieldPos">field number</param>
/// <returns>The value of the field, if found. Null otherwise.</returns>
object GetField(object record, string fieldName, int fieldPos);
/// <summary>
/// Used by the default implementation of ReadRecord() to add a field to a record object. The derived
/// classes can override this to suit their own implementation of the record object.
/// </summary>
/// <param name="record">The record object to be probed into. This is guaranteed to be one that was returned
/// by a previous call to CreateRecord.</param>
/// <param name="fieldName">The name of the field to probe.</param>
/// <param name="fieldPos">field number</param>
/// <param name="fieldValue">The value to be added for the field</param>
void AddField(object record, string fieldName, int fieldPos, object fieldValue);
}
/// <summary>
/// Creates enum values on behalf of the reader.
/// </summary>
protected interface EnumAccess
{
/// <summary>
/// Returns the enum value for an ordinal. The reader passes the ordinal in terms of the
/// reader schema (writer ordinals are translated before this is called).
/// </summary>
/// <param name="reuse">Existing object offered for reuse.</param>
/// <param name="ordinal">Ordinal of the symbol that was read.</param>
/// <returns>The enum value.</returns>
object CreateEnum(object reuse, int ordinal);
}
/// <summary>
/// Creates fixed objects and exposes the buffer the reader decodes their bytes into.
/// </summary>
protected interface FixedAccess
{
/// <summary>
/// Returns a fixed object.
/// </summary>
/// <param name="reuse">If appropriate, uses this object instead of creating a new one.</param>
/// <returns>A fixed object with an appropriate buffer.</returns>
object CreateFixed(object reuse);
/// <summary>
/// Returns a buffer of appropriate size to read data into.
/// </summary>
/// <param name="f">The fixed object. It is guaranteed that this is something that has been previously
/// returned by CreateFixed</param>
/// <returns>A byte buffer of fixed's size.</returns>
byte[] GetFixedBuffer(object f);
}
/// <summary>
/// Creates, sizes and fills array objects on behalf of the reader.
/// </summary>
protected interface ArrayAccess
{
/// <summary>
/// Creates a new array object. The initial size of the object could be anything.
/// </summary>
/// <param name="reuse">If appropriate use this instead of creating a new one.</param>
/// <returns>An object suitable to deserialize an avro array</returns>
object Create(object reuse);
/// <summary>
/// Hint that the array should be able to handle at least targetSize elements. The array
/// is not required to be resized
/// </summary>
/// <param name="array">Array object who needs to support targetSize elements. This is guaranteed to be something returned by
/// a previous call to Create().</param>
/// <param name="targetSize">The new size.</param>
void EnsureSize(ref object array, int targetSize);
/// <summary>
/// Resizes the array to the new value.
/// </summary>
/// <param name="array">Array object whose size is required. This is guaranteed to be something returned by
/// a previous call to Create().</param>
/// <param name="targetSize">The new size.</param>
void Resize(ref object array, int targetSize);
/// <summary>
/// Called by the reader once per array block. Implementations are expected to read
/// <paramref name="elements"/> items with <paramref name="itemReader"/> and store them
/// starting at <paramref name="index"/> (the contract is defined by implementers - confirm there).
/// </summary>
/// <param name="array">Array object returned by Create().</param>
/// <param name="elements">Number of items in the block.</param>
/// <param name="index">Number of items already read before this block.</param>
/// <param name="itemReader">Resolved reader for one item.</param>
/// <param name="decoder">Decoder to read the items from.</param>
/// <param name="reuse">Whether existing items may be reused; the reader passes IsReusable() for the item type.</param>
void AddElements( object array, int elements, int index, ReadItem itemReader, Decoder decoder, bool reuse );
}
/// <summary>
/// Creates and fills map objects on behalf of the reader.
/// </summary>
protected interface MapAccess
{
/// <summary>
/// Creates a new map object.
/// </summary>
/// <param name="reuse">If appropriate, use this map object instead of creating a new one.</param>
/// <returns>An empty map object.</returns>
object Create(object reuse);
/// <summary>
/// Called by the reader once per map block. The reader itself does not read the keys, so
/// implementations presumably read each key as well as each value (confirm in implementers).
/// </summary>
/// <param name="map">Map object returned by Create().</param>
/// <param name="elements">Number of entries in the block.</param>
/// <param name="itemReader">Resolved reader for one map value.</param>
/// <param name="decoder">Decoder to read the entries from.</param>
/// <param name="reuse">Reuse flag; the reader currently always passes false.</param>
void AddElements(object map, int elements, ReadItem itemReader, Decoder decoder, bool reuse);
}
/// <summary>
/// Key type for the record reader cache: a (writer schema, reader schema) pair. Two pairs are
/// equal when they have the same runtime type and both schemas compare equal.
/// </summary>
private class SchemaPair
{
    private Schema _writerSchema;
    private Schema _readerSchema;

    public SchemaPair( Schema writerSchema, Schema readerSchema )
    {
        _readerSchema = readerSchema;
        _writerSchema = writerSchema;
    }

    // Field-wise comparison; callers guarantee other is non-null.
    protected bool Equals( SchemaPair other )
    {
        if( !Equals( _writerSchema, other._writerSchema ) )
        {
            return false;
        }
        return Equals( _readerSchema, other._readerSchema );
    }

    public override bool Equals( object obj )
    {
        if( ReferenceEquals( this, obj ) )
        {
            return true;
        }
        if( ReferenceEquals( null, obj ) || obj.GetType() != this.GetType() )
        {
            return false;
        }
        return Equals( (SchemaPair) obj );
    }

    public override int GetHashCode()
    {
        // overflow in the multiplication is intentional and must wrap
        unchecked
        {
            int writerHash = _writerSchema != null ? _writerSchema.GetHashCode() : 0;
            int readerHash = _readerSchema != null ? _readerSchema.GetHashCode() : 0;
            return ( writerHash * 397 ) ^ readerHash;
        }
    }
}
}
}