/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.IO;
using Avro;
using Avro.IO;
using Avro.Generic;
namespace Avro.Specific
{
/// <summary>
/// Reader wrapper class for reading data and storing into specific classes.
/// Every member forwards to a wrapped <see cref="SpecificDefaultReader"/>.
/// </summary>
/// <typeparam name="T">Specific class type</typeparam>
public class SpecificReader<T> : DatumReader<T>
{
    /// <summary>
    /// Reader class for reading data and storing into specific classes
    /// </summary>
    private readonly SpecificDefaultReader reader;

    /// <summary>
    /// Schema for the writer class, as reported by the wrapped reader
    /// </summary>
    public Schema WriterSchema { get { return reader.WriterSchema; } }

    /// <summary>
    /// Schema for the reader class, as reported by the wrapped reader
    /// </summary>
    public Schema ReaderSchema { get { return reader.ReaderSchema; } }

    /// <summary>
    /// Constructs a reader for the given schemas, backed by a new
    /// <see cref="SpecificDefaultReader"/>. Any resolution between differing reader
    /// and writer schemas is left to that wrapped reader.
    /// </summary>
    /// <param name="writerSchema">The schema used while generating the data</param>
    /// <param name="readerSchema">The schema desired by the reader</param>
    public SpecificReader(Schema writerSchema, Schema readerSchema)
    {
        reader = new SpecificDefaultReader(writerSchema, readerSchema);
    }

    /// <summary>
    /// Constructs a reader that wraps an existing <see cref="SpecificDefaultReader"/>.
    /// </summary>
    /// <param name="reader">The reader every call is forwarded to</param>
    public SpecificReader(SpecificDefaultReader reader)
    {
        this.reader = reader;
    }

    /// <summary>
    /// Generic read function
    /// </summary>
    /// <param name="reuse">object to store data read</param>
    /// <param name="dec">decoder to use for reading data</param>
    /// <returns>the value returned by the wrapped reader's Read</returns>
    public T Read(T reuse, Decoder dec)
    {
        return reader.Read(reuse, dec);
    }
}
/// <summary>
/// Reader class for reading data and storing into specific classes
/// </summary>
public class SpecificDefaultReader : DefaultReader
{
/// <summary>
/// Static dictionary of type names and its corresponding assembly type.
/// Intended to prevent multiple reflection for the same type name.
/// NOTE(review): no member of this class reads or writes this field; confirm it is
/// still needed before relying on it or removing it.
/// </summary>
private static readonly IDictionary<string, Type> TypeName = new Dictionary<string, Type>();
/// <summary>
/// Creates a reader for a writer/reader schema pair. Both schemas are handed
/// unchanged to the base reader's constructor.
/// </summary>
/// <param name="writerSchema">schema of the object that wrote the data</param>
/// <param name="readerSchema">schema of the object that will store the data</param>
public SpecificDefaultReader(Schema writerSchema, Schema readerSchema)
    : base(writerSchema, readerSchema)
{
}
/// <summary>
/// Deserializes a record from the stream into an <see cref="ISpecificRecord"/>.
/// Fields present in the writer's schema are read (or skipped when the reader's schema
/// has no field of that name); reader fields the writer did not write are then filled
/// from their default values.
/// </summary>
/// <param name="reuse">If not null, a record object that could be reused for returning the result</param>
/// <param name="writerSchema">The writer's RecordSchema</param>
/// <param name="readerSchema">The reader's schema, must be RecordSchema too.</param>
/// <param name="dec">The decoder for deserialization</param>
/// <returns>The record object just read</returns>
/// <exception cref="AvroException">
/// The record object is not an ISpecificRecord, or reading a writer field failed
/// (the message names the field).
/// </exception>
protected override object ReadRecord(object reuse, RecordSchema writerSchema, Schema readerSchema, Decoder dec)
{
    RecordSchema rs = (RecordSchema)readerSchema;
    if (rs.Name == null)
    {
        // A reader schema without a name cannot be mapped to a class; defer to the base reader.
        return base.ReadRecord(reuse, writerSchema, readerSchema, dec);
    }

    object target = reuse != null ? reuse : ObjectCreator.Instance.New(rs.Fullname, Schema.Type.Record);
    ISpecificRecord rec = target as ISpecificRecord;
    if (rec == null)
    {
        // Fail with a descriptive error instead of a NullReferenceException on first use.
        throw new AvroException("record object for " + rs.Fullname + " does not implement ISpecificRecord");
    }

    foreach (Field wf in writerSchema)
    {
        try
        {
            Field rf;
            if (rs.TryGetField(wf.Name, out rf))
            {
                // Offer the field's current value for reuse by the nested read.
                object current = rec.Get(rf.Pos);
                rec.Put(rf.Pos, Read(current, wf.Schema, rf.Schema, dec));
            }
            else
            {
                // Writer-only field: consume its bytes so the decoder stays aligned.
                Skip(wf.Schema, dec);
            }
        }
        catch (Exception ex)
        {
            throw new AvroException(ex.Message + " in field " + wf.Name);
        }
    }

    // Reader-only fields: encode each default into a scratch stream and read it back
    // through the normal Read path so defaults and wire data are materialized the same way.
    using (var defaultStream = new MemoryStream())
    {
        var defaultEncoder = new BinaryEncoder(defaultStream);
        var defaultDecoder = new BinaryDecoder(defaultStream);
        foreach (Field rf in rs)
        {
            if (writerSchema.Contains(rf.Name))
            {
                continue;
            }

            defaultStream.Position = 0; // reset for writing
            Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
            defaultStream.Flush();
            defaultStream.Position = 0; // reset for reading

            object current = rec.Get(rf.Pos);
            rec.Put(rf.Pos, Read(current, rf.Schema, rf.Schema, defaultDecoder));
        }
    }

    return rec;
}
/// <summary>
/// Deserializes a fixed object and returns it. The target is either <paramref name="reuse"/>
/// or a new object obtained from ObjectCreator for the reader schema's full name; its
/// Value buffer is handed to the decoder's ReadFixed.
/// </summary>
/// <param name="reuse">If appropriate, uses this object instead of creating a new one.</param>
/// <param name="writerSchema">The FixedSchema the writer used during serialization.</param>
/// <param name="readerSchema">The schema that the reader uses. Must be a FixedSchema with the same
/// size as the writerSchema.</param>
/// <param name="d">The decoder for deserialization.</param>
/// <returns>The deserialized object.</returns>
/// <exception cref="AvroException">
/// The reader and writer sizes differ, or the target object is not a SpecificFixed.
/// </exception>
protected override object ReadFixed(object reuse, FixedSchema writerSchema, Schema readerSchema, Decoder d)
{
    FixedSchema rs = readerSchema as FixedSchema;
    if (rs.Size != writerSchema.Size)
    {
        throw new AvroException("Size mismatch between reader and writer fixed schemas. Writer: " + writerSchema +
            ", reader: " + readerSchema);
    }

    object target = reuse != null ? reuse : ObjectCreator.Instance.New(rs.Fullname, Schema.Type.Fixed);
    SpecificFixed fixedrec = target as SpecificFixed;
    if (fixedrec == null)
    {
        // Fail with a descriptive error instead of a NullReferenceException on fixedrec.Value.
        throw new AvroException("fixed object for " + rs.Fullname + " is not a SpecificFixed");
    }

    d.ReadFixed(fixedrec.Value);
    return fixedrec;
}
/// <summary>
/// Reads an enum from the given decoder and translates the writer's symbol into the
/// reader schema's ordinal for that symbol.
/// </summary>
/// <param name="reuse">object to store data read (not used by this implementation)</param>
/// <param name="writerSchema">schema of the object that wrote the data</param>
/// <param name="readerSchema">schema of the object that will store the data</param>
/// <param name="dec">decoder object that contains the data to be read</param>
/// <returns>enum value</returns>
protected override object ReadEnum(object reuse, EnumSchema writerSchema, Schema readerSchema, Decoder dec)
{
    // Position of the symbol as the writer encoded it.
    var writerIndex = dec.ReadEnum();

    // Look the symbol up in the writer's schema, then map it through the reader's schema.
    var symbol = writerSchema[writerIndex];
    EnumSchema readerEnum = readerSchema as EnumSchema;
    return readerEnum.Ordinal(symbol);
}
/// <summary>
/// Reads an array from the given decoder into a non-generic IList. A supplied
/// <paramref name="reuse"/> list is cleared and refilled; otherwise a list is obtained
/// from ObjectCreator using the type name from getTargetType.
/// </summary>
/// <param name="reuse">object to store data read</param>
/// <param name="writerSchema">schema of the object that wrote the data</param>
/// <param name="readerSchema">schema of the object that will store the data</param>
/// <param name="dec">decoder object that contains the data to be read</param>
/// <returns>array</returns>
/// <exception cref="AvroException">The target object does not implement non-generic IList.</exception>
protected override object ReadArray(object reuse, ArraySchema writerSchema, Schema readerSchema, Decoder dec)
{
    ArraySchema rs = readerSchema as ArraySchema;

    object target = reuse != null
        ? reuse
        : ObjectCreator.Instance.New(getTargetType(readerSchema), Schema.Type.Array);
    System.Collections.IList array = target as System.Collections.IList;
    if (array == null)
    {
        // Applies to both a reused and a freshly created object.
        throw new AvroException("array object does not implement non-generic IList");
    }
    if (reuse != null)
    {
        array.Clear();
    }

    // Items arrive in blocks; a block count of zero ends the array. Counts are kept in
    // their native width rather than being narrowed to int.
    for (long n = dec.ReadArrayStart(); n != 0; n = dec.ReadArrayNext())
    {
        for (long j = 0; j < n; j++)
        {
            // Each item is read without a reuse object.
            array.Add(Read(null, writerSchema.ItemSchema, rs.ItemSchema, dec));
        }
    }
    return array;
}
/// <summary>
/// Deserializes an avro map into a non-generic IDictionary. A supplied
/// <paramref name="reuse"/> map is cleared and refilled; otherwise a map is obtained
/// from ObjectCreator using the type name from getTargetType.
/// </summary>
/// <param name="reuse">If appropriate, use this instead of creating a new map object.</param>
/// <param name="writerSchema">The schema the writer used to write the map.</param>
/// <param name="readerSchema">The schema the reader is using.</param>
/// <param name="d">The decoder for deserialization.</param>
/// <returns>The deserialized map object.</returns>
/// <exception cref="AvroException">The target object does not implement non-generic IDictionary.</exception>
protected override object ReadMap(object reuse, MapSchema writerSchema, Schema readerSchema, Decoder d)
{
    MapSchema rs = readerSchema as MapSchema;

    object target = reuse != null
        ? reuse
        : ObjectCreator.Instance.New(getTargetType(readerSchema), Schema.Type.Map);
    System.Collections.IDictionary map = target as System.Collections.IDictionary;
    if (map == null)
    {
        // Applies to both a reused and a freshly created object.
        throw new AvroException("map object does not implement non-generic IDictionary");
    }
    if (reuse != null)
    {
        map.Clear();
    }

    // Entries arrive in blocks; a block count of zero ends the map. Counts are kept in
    // their native width rather than being narrowed to int.
    for (long n = d.ReadMapStart(); n != 0; n = d.ReadMapNext())
    {
        for (long j = 0; j < n; j++)
        {
            string k = d.ReadString();
            map[k] = Read(null, writerSchema.ValueSchema, rs.ValueSchema, d); // always create new map item
        }
    }
    return map;
}
/// <summary>
/// Gets the target type name in the given schema. For array and map schemas the
/// collection wrapper is stripped from the name produced by CodeGen.getType, leaving
/// only the inner type argument; for every other schema the name is returned as is.
/// </summary>
/// <param name="schema">schema containing the type to be determined</param>
/// <returns>the type name, without the collection wrapper for arrays and maps</returns>
protected virtual string getTargetType(Schema schema)
{
    // Wrapper prefixes stripped below. Assumes CodeGen.getType renders arrays as
    // "IList<T>" and maps as "IDictionary<string,V>" (inferred from the original
    // 6- and 19-character strips) - TODO confirm against CodeGen.
    const string listPrefix = "IList<";
    const string mapPrefix = "IDictionary<string,";

    bool nEnum = false; // required by-ref argument of CodeGen.getType; its value is not used here
    string type = Avro.CodeGen.getType(schema, false, ref nEnum);

    if (schema.Tag == Schema.Type.Array)
    {
        type = type.Remove(0, listPrefix.Length); // remove IList<
        type = type.Remove(type.Length - 1);      // remove >
    }
    else if (schema.Tag == Schema.Type.Map)
    {
        type = type.Remove(0, mapPrefix.Length);  // remove IDictionary<string,
        type = type.Remove(type.Length - 1);      // remove > (was left in place before)
    }
    return type;
}
}
}