/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.IO;
using Avro;
using Avro.IO;
using Avro.Generic;

namespace Avro.Specific
{
    /// <summary>
    /// Reader wrapper class for reading data and storing it into specific classes.
    /// All work is delegated to an underlying <see cref="SpecificDefaultReader"/>.
    /// </summary>
    /// <typeparam name="T">Specific class type</typeparam>
    public class SpecificReader<T> : DatumReader<T>
    {
        /// <summary>
        /// Reader class for reading data and storing into specific classes
        /// </summary>
        private readonly SpecificDefaultReader reader;

        /// <summary>
        /// Schema for the writer class
        /// </summary>
        public Schema WriterSchema { get { return reader.WriterSchema; } }

        /// <summary>
        /// Schema for the reader class
        /// </summary>
        public Schema ReaderSchema { get { return reader.ReaderSchema; } }

        /// <summary>
        /// Constructs a specific reader for the given schemas using a new
        /// <see cref="SpecificDefaultReader"/>. If the reader's and writer's schemas
        /// are different, the underlying reader performs the resolution.
        /// </summary>
        /// <param name="writerSchema">The schema used while generating the data</param>
        /// <param name="readerSchema">The schema desired by the reader</param>
        public SpecificReader(Schema writerSchema, Schema readerSchema)
        {
            reader = new SpecificDefaultReader(writerSchema, readerSchema);
        }

        /// <summary>
        /// Constructs a specific reader that wraps an existing reader instance.
        /// </summary>
        /// <param name="reader">The reader that performs the actual deserialization</param>
        public SpecificReader(SpecificDefaultReader reader)
        {
            this.reader = reader;
        }

        /// <summary>
        /// Generic read function
        /// </summary>
        /// <param name="reuse">object to store data read</param>
        /// <param name="dec">decoder to use for reading data</param>
        /// <returns>the object that was read</returns>
        public T Read(T reuse, Decoder dec)
        {
            return reader.Read(reuse, dec);
        }
    }

    /// <summary>
    /// Reader class for reading data and storing into specific classes
    /// </summary>
    public class SpecificDefaultReader : DefaultReader
    {
        /// <summary>
        /// Static dictionary of type names and its corresponding assembly type.
        /// This is used to prevent multiple reflection for the same type name.
        /// </summary>
        private static IDictionary<string, Type> TypeName = new Dictionary<string, Type>();

        // Generic wrappers that getTargetType strips from the code-generated type name
        // so that only the element / value type name remains.
        private const string ArrayTypePrefix = "IList<";
        private const string MapTypePrefix = "IDictionary<string,";

        /// <summary>
        /// Constructor
        /// </summary>
        /// <param name="writerSchema">schema of the object that wrote the data</param>
        /// <param name="readerSchema">schema of the object that will store the data</param>
        public SpecificDefaultReader(Schema writerSchema, Schema readerSchema)
            : base(writerSchema, readerSchema)
        {
        }

        /// <summary>
        /// Deserializes a record from the stream.
        /// </summary>
        /// <param name="reuse">If not null, a record object that is reused for returning the result</param>
        /// <param name="writerSchema">The writer's RecordSchema</param>
        /// <param name="readerSchema">The reader's schema, must be RecordSchema too.</param>
        /// <param name="dec">The decoder for deserialization</param>
        /// <returns>The record object just read</returns>
        /// <exception cref="AvroException">
        /// Thrown when the target object does not implement ISpecificRecord, or when
        /// reading one of the writer's fields fails.
        /// </exception>
        protected override object ReadRecord(object reuse, RecordSchema writerSchema, Schema readerSchema, Decoder dec)
        {
            RecordSchema rs = (RecordSchema)readerSchema;

            // Records without a name are delegated to the base implementation.
            if (rs.Name == null)
            {
                return base.ReadRecord(reuse, writerSchema, readerSchema, dec);
            }

            object target = reuse ?? ObjectCreator.Instance.New(rs.Fullname, Schema.Type.Record);
            ISpecificRecord rec = target as ISpecificRecord;
            if (rec == null)
            {
                // Fail with a clear message instead of a NullReferenceException further down.
                throw new AvroException("record object for " + rs.Fullname + " does not implement ISpecificRecord");
            }

            object obj;

            // Pass 1: consume every field the writer wrote, in writer order.
            foreach (Field wf in writerSchema)
            {
                try
                {
                    Field rf;
                    if (rs.TryGetField(wf.Name, out rf))
                    {
                        // The current field value is offered as the reuse candidate.
                        obj = rec.Get(rf.Pos);
                        rec.Put(rf.Pos, Read(obj, wf.Schema, rf.Schema, dec));
                    }
                    else
                    {
                        // The reader does not know this field: advance past its data.
                        Skip(wf.Schema, dec);
                    }
                }
                catch (Exception ex)
                {
                    // Add the field name so the failure can be located in nested records.
                    throw new AvroException(ex.Message + " in field " + wf.Name);
                }
            }

            // Pass 2: reader fields the writer did not write are populated from their
            // default value, by encoding the default into a scratch stream and reading
            // it back with the regular read path.
            using (var defaultStream = new MemoryStream())
            {
                var defaultEncoder = new BinaryEncoder(defaultStream);
                var defaultDecoder = new BinaryDecoder(defaultStream);

                foreach (Field rf in rs)
                {
                    if (writerSchema.Contains(rf.Name))
                    {
                        continue;
                    }

                    defaultStream.Position = 0; // reset for writing
                    Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
                    defaultStream.Flush();
                    defaultStream.Position = 0; // reset for reading

                    obj = rec.Get(rf.Pos);
                    rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
                }
            }

            return rec;
        }

        /// <summary>
        /// Deserializes a fixed object and returns the object. The bytes are read into the
        /// Value buffer of either the reused object or a new object created through ObjectCreator.
        /// </summary>
        /// <param name="reuse">If not null, this object is used instead of creating a new one.</param>
        /// <param name="writerSchema">The FixedSchema the writer used during serialization.</param>
        /// <param name="readerSchema">The schema that the reader uses. Must be a FixedSchema with the same
        /// size as the writerSchema.</param>
        /// <param name="d">The decoder for deserialization.</param>
        /// <returns>The deserialized object.</returns>
        /// <exception cref="AvroException">
        /// Thrown when the reader and writer sizes differ, or when the target object is not a SpecificFixed.
        /// </exception>
        protected override object ReadFixed(object reuse, FixedSchema writerSchema, Schema readerSchema, Decoder d)
        {
            FixedSchema rs = (FixedSchema)readerSchema;
            if (rs.Size != writerSchema.Size)
            {
                throw new AvroException("Size mismatch between reader and writer fixed schemas. Writer: " + writerSchema +
                    ", reader: " + readerSchema);
            }

            object target = reuse ?? ObjectCreator.Instance.New(rs.Fullname, Schema.Type.Fixed);
            SpecificFixed fixedrec = target as SpecificFixed;
            if (fixedrec == null)
            {
                // Fail with a clear message instead of a NullReferenceException on fixedrec.Value.
                throw new AvroException("fixed object for " + rs.Fullname + " is not a SpecificFixed");
            }

            d.ReadFixed(fixedrec.Value);
            return fixedrec;
        }

        /// <summary>
        /// Reads an enum from the given decoder
        /// </summary>
        /// <param name="reuse">object to store data read (not used by this implementation)</param>
        /// <param name="writerSchema">schema of the object that wrote the data</param>
        /// <param name="readerSchema">schema of the object that will store the data</param>
        /// <param name="dec">decoder object that contains the data to be read</param>
        /// <returns>the reader schema's ordinal for the symbol that was written</returns>
        protected override object ReadEnum(object reuse, EnumSchema writerSchema, Schema readerSchema, Decoder dec)
        {
            EnumSchema rs = (EnumSchema)readerSchema;

            // The decoded index refers to the writer's symbol list; map the symbol
            // to its position in the reader's schema.
            return rs.Ordinal(writerSchema[dec.ReadEnum()]);
        }

        /// <summary>
        /// Reads an array from the given decoder
        /// </summary>
        /// <param name="reuse">If not null, a list that is cleared and refilled instead of creating a new one</param>
        /// <param name="writerSchema">schema of the object that wrote the data</param>
        /// <param name="readerSchema">schema of the object that will store the data</param>
        /// <param name="dec">decoder object that contains the data to be read</param>
        /// <returns>array</returns>
        /// <exception cref="AvroException">Thrown when reuse is not a non-generic IList.</exception>
        protected override object ReadArray(object reuse, ArraySchema writerSchema, Schema readerSchema, Decoder dec)
        {
            ArraySchema rs = (ArraySchema)readerSchema;
            System.Collections.IList array;
            if (reuse != null)
            {
                array = reuse as System.Collections.IList;
                if (array == null)
                {
                    throw new AvroException("array object does not implement non-generic IList");
                }
                array.Clear();
            }
            else
            {
                array = ObjectCreator.Instance.New(getTargetType(readerSchema), Schema.Type.Array) as System.Collections.IList;
            }

            // Items arrive in blocks; a block count of zero ends the array.
            for (int n = (int)dec.ReadArrayStart(); n != 0; n = (int)dec.ReadArrayNext())
            {
                for (int j = 0; j < n; j++)
                {
                    array.Add(Read(null, writerSchema.ItemSchema, rs.ItemSchema, dec)); // always create new array item
                }
            }
            return array;
        }

        /// <summary>
        /// Deserializes an avro map into either the reused dictionary or a new one created
        /// through ObjectCreator.
        /// </summary>
        /// <param name="reuse">If not null, a dictionary that is cleared and refilled instead of creating a new one.</param>
        /// <param name="writerSchema">The schema the writer used to write the map.</param>
        /// <param name="readerSchema">The schema the reader is using.</param>
        /// <param name="d">The decoder for deserialization.</param>
        /// <returns>The deserialized map object.</returns>
        /// <exception cref="AvroException">Thrown when reuse is not a non-generic IDictionary.</exception>
        protected override object ReadMap(object reuse, MapSchema writerSchema, Schema readerSchema, Decoder d)
        {
            MapSchema rs = (MapSchema)readerSchema;
            System.Collections.IDictionary map;
            if (reuse != null)
            {
                map = reuse as System.Collections.IDictionary;
                if (map == null)
                {
                    throw new AvroException("map object does not implement non-generic IDictionary");
                }
                map.Clear();
            }
            else
            {
                map = ObjectCreator.Instance.New(getTargetType(readerSchema), Schema.Type.Map) as System.Collections.IDictionary;
            }

            // Entries arrive in blocks; a block count of zero ends the map.
            for (int n = (int)d.ReadMapStart(); n != 0; n = (int)d.ReadMapNext())
            {
                for (int j = 0; j < n; j++)
                {
                    string k = d.ReadString();
                    map[k] = Read(null, writerSchema.ValueSchema, rs.ValueSchema, d); // always create new map item
                }
            }
            return map;
        }

        /// <summary>
        /// Gets the target type name in the given schema. For array and map schemas the
        /// generic collection wrapper is stripped from the code-generated type name,
        /// leaving the item / value type name.
        /// </summary>
        /// <param name="schema">schema containing the type to be determined</param>
        /// <returns>the type name to hand to ObjectCreator</returns>
        protected virtual string getTargetType(Schema schema)
        {
            bool nEnum = false;
            string type = Avro.CodeGen.getType(schema, false, ref nEnum);
            if (schema.Tag == Schema.Type.Array)
            {
                type = StripGenericWrapper(type, ArrayTypePrefix);
            }
            else if (schema.Tag == Schema.Type.Map)
            {
                type = StripGenericWrapper(type, MapTypePrefix);
            }
            return type;
        }

        /// <summary>
        /// Removes the leading <paramref name="prefix"/> and the single trailing closing
        /// angle bracket from a generic type name.
        /// </summary>
        /// <param name="type">full generic type name, expected to start with the prefix</param>
        /// <param name="prefix">wrapper text to remove from the front</param>
        /// <returns>the text between the prefix and the final character</returns>
        private static string StripGenericWrapper(string type, string prefix)
        {
            return type.Substring(prefix.Length, type.Length - prefix.Length - 1);
        }
    }
}