/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Collections.Generic;
using Avro.IO;

namespace Avro.Generic
{
    /// <summary>
    /// Signature of a method that writes a single value of type T. DefaultWriter uses it
    /// to hand the encoder's primitive write methods to its generic Write helper.
    /// </summary>
    /// <typeparam name="T">The type of the value being written.</typeparam>
    /// <param name="t">The value to write.</param>
    public delegate void Writer<T>(T t);

    /// <summary>
    /// A typesafe wrapper around DefaultWriter. While a specific object of DefaultWriter
    /// allows the client to serialize a generic type, an object of this class allows
    /// only a single type of object to be serialized through it.
    /// </summary>
    /// <typeparam name="T">The type of object to be serialized.</typeparam>
    public class GenericWriter<T> : DatumWriter<T>
    {
        private readonly DefaultWriter writer;

        /// <summary>
        /// Creates a writer that serializes against the given schema using a new DefaultWriter.
        /// </summary>
        /// <param name="schema">The schema for the objects to be serialized.</param>
        public GenericWriter(Schema schema) : this(new DefaultWriter(schema))
        {
        }

        /// <summary>
        /// The schema of the wrapped DefaultWriter.
        /// </summary>
        public Schema Schema { get { return writer.Schema; } }

        /// <summary>
        /// Creates a writer that delegates all serialization to the given DefaultWriter.
        /// </summary>
        /// <param name="writer">The writer that performs the actual serialization.</param>
        public GenericWriter(DefaultWriter writer)
        {
            this.writer = writer;
        }

        /// <summary>
        /// Serializes the given object using this writer's schema.
        /// </summary>
        /// <param name="value">The value to be serialized.</param>
        /// <param name="encoder">The encoder to use for serializing.</param>
        public void Write(T value, Encoder encoder)
        {
            writer.Write(value, encoder);
        }
    }

    /// <summary>
    /// A general purpose writer for serializing objects into a Stream using
    /// Avro. This class implements a default way of serializing objects, but
    /// one can derive a class from this and override different methods to
    /// achieve results that are different from the default implementation.
    /// </summary>
    public class DefaultWriter
    {
        /// <summary>
        /// The schema this writer serializes against.
        /// </summary>
        public Schema Schema { get; private set; }

        /// <summary>
        /// Constructs a generic writer for the given schema.
        /// </summary>
        /// <param name="schema">The schema for the object to be serialized.</param>
        public DefaultWriter(Schema schema)
        {
            this.Schema = schema;
        }

        /// <summary>
        /// Serializes the given value against this writer's schema.
        /// </summary>
        /// <typeparam name="T">The static type of the value.</typeparam>
        /// <param name="value">The value to be serialized.</param>
        /// <param name="encoder">The encoder to use during serialization.</param>
        public void Write<T>(T value, Encoder encoder)
        {
            Write(Schema, value, encoder);
        }

        /// <summary>
        /// Examines the schema and dispatches the actual work to one
        /// of the other methods of this class. This allows the derived
        /// classes to override specific methods and get custom results.
        /// </summary>
        /// <param name="schema">The schema to use for serializing.</param>
        /// <param name="value">The value to be serialized.</param>
        /// <param name="encoder">The encoder to use during serialization.</param>
        public virtual void Write(Schema schema, object value, Encoder encoder)
        {
            switch (schema.Tag)
            {
                case Schema.Type.Null:
                    WriteNull(value, encoder);
                    break;
                // The type argument must be spelled out for the primitive cases: it cannot be
                // inferred from a method-group argument such as encoder.WriteBoolean.
                case Schema.Type.Boolean:
                    Write<bool>(value, schema.Tag, encoder.WriteBoolean);
                    break;
                case Schema.Type.Int:
                    Write<int>(value, schema.Tag, encoder.WriteInt);
                    break;
                case Schema.Type.Long:
                    Write<long>(value, schema.Tag, encoder.WriteLong);
                    break;
                case Schema.Type.Float:
                    Write<float>(value, schema.Tag, encoder.WriteFloat);
                    break;
                case Schema.Type.Double:
                    Write<double>(value, schema.Tag, encoder.WriteDouble);
                    break;
                case Schema.Type.String:
                    Write<string>(value, schema.Tag, encoder.WriteString);
                    break;
                case Schema.Type.Bytes:
                    Write<byte[]>(value, schema.Tag, encoder.WriteBytes);
                    break;
                case Schema.Type.Record:
                case Schema.Type.Error:
                    WriteRecord(schema as RecordSchema, value, encoder);
                    break;
                case Schema.Type.Enumeration:
                    WriteEnum(schema as EnumSchema, value, encoder);
                    break;
                case Schema.Type.Fixed:
                    WriteFixed(schema as FixedSchema, value, encoder);
                    break;
                case Schema.Type.Array:
                    WriteArray(schema as ArraySchema, value, encoder);
                    break;
                case Schema.Type.Map:
                    WriteMap(schema as MapSchema, value, encoder);
                    break;
                case Schema.Type.Union:
                    WriteUnion(schema as UnionSchema, value, encoder);
                    break;
                default:
                    error(schema, value);
                    break;
            }
        }

        /// <summary>
        /// Serializes a "null". Nothing is passed to the encoder; the method only verifies
        /// that the value really is null.
        /// </summary>
        /// <param name="value">The object to be serialized using null schema.</param>
        /// <param name="encoder">The encoder to use during serialization (unused here).</param>
        /// <exception cref="AvroException">If the value is not null.</exception>
        protected virtual void WriteNull(object value, Encoder encoder)
        {
            if (value != null) throw TypeMismatch(value, "null", "null");
        }

        /// <summary>
        /// A generic method to serialize primitive Avro types.
        /// </summary>
        /// <typeparam name="S">The C# type to be serialized.</typeparam>
        /// <param name="value">The value to be serialized.</param>
        /// <param name="tag">The schema type tag; used only for the error message.</param>
        /// <param name="writer">The writer which should be used to write the given type.</param>
        /// <exception cref="AvroException">If the value is not of type S.</exception>
        protected virtual void Write<S>(object value, Schema.Type tag, Writer<S> writer)
        {
            if (!(value is S)) throw TypeMismatch(value, tag.ToString(), typeof(S).ToString());
            writer((S)value);
        }

        /// <summary>
        /// Serializes a record using the given RecordSchema. It uses the GetField method
        /// to extract the field value from the given object.
        /// </summary>
        /// <param name="schema">The RecordSchema to use for serialization.</param>
        /// <param name="value">The value to be serialized.</param>
        /// <param name="encoder">The Encoder for serialization.</param>
        /// <exception cref="AvroException">
        /// If the value is not an acceptable record object, or if writing any field fails; in the
        /// latter case the message carries the failing field's name.
        /// </exception>
        protected virtual void WriteRecord(RecordSchema schema, object value, Encoder encoder)
        {
            EnsureRecordObject(schema, value);
            foreach (Field field in schema)
            {
                try
                {
                    object obj = GetField(value, field.Name, field.Pos);
                    Write(field.Schema, obj, encoder);
                }
                catch (Exception ex)
                {
                    // NOTE(review): only the message of the original exception survives here; its
                    // type and stack trace are dropped. Consider chaining it as an inner exception
                    // if AvroException offers such a constructor.
                    throw new AvroException(ex.Message + " in field " + field.Name);
                }
            }
        }

        /// <summary>
        /// Verifies that the given value can be written as a record of the given schema.
        /// The default implementation requires a GenericRecord whose schema equals the given one.
        /// </summary>
        /// <param name="s">The record schema the value must conform to.</param>
        /// <param name="value">The value to check.</param>
        /// <exception cref="AvroException">If the value is not acceptable.</exception>
        protected virtual void EnsureRecordObject(RecordSchema s, object value)
        {
            if (value == null || !(value is GenericRecord) || !((value as GenericRecord).Schema.Equals(s)))
            {
                throw TypeMismatch(value, "record", "GenericRecord");
            }
        }

        /// <summary>
        /// Extracts the field value from the given object. In this default implementation,
        /// value should be of type GenericRecord.
        /// </summary>
        /// <param name="value">The record value from which the field needs to be extracted.</param>
        /// <param name="fieldName">The name of the field in the record.</param>
        /// <param name="fieldPos">The position of field in the record (unused by the default implementation).</param>
        /// <returns>The value of the named field.</returns>
        protected virtual object GetField(object value, string fieldName, int fieldPos)
        {
            GenericRecord d = value as GenericRecord;
            return d[fieldName];
        }

        /// <summary>
        /// Serializes an enumeration. The default implementation expects the value to be a
        /// GenericEnum whose schema equals the given schema, and writes the ordinal of its symbol.
        /// </summary>
        /// <param name="es">The EnumSchema for serialization.</param>
        /// <param name="value">Value to be written.</param>
        /// <param name="encoder">Encoder for serialization.</param>
        /// <exception cref="AvroException">If the value is not a GenericEnum of the given schema.</exception>
        protected virtual void WriteEnum(EnumSchema es, object value, Encoder encoder)
        {
            if (value == null || !(value is GenericEnum) || !((value as GenericEnum).Schema.Equals(es)))
                throw TypeMismatch(value, "enum", "GenericEnum");
            encoder.WriteEnum(es.Ordinal((value as GenericEnum).Value));
        }

        /// <summary>
        /// Serializes an array. The default implementation calls EnsureArrayObject() to ascertain that the
        /// given value is an array. It then calls GetArrayLength() and GetArrayElement()
        /// to access the members of the array and then serialize them.
        /// </summary>
        /// <param name="schema">The ArraySchema for serialization.</param>
        /// <param name="value">The value being serialized.</param>
        /// <param name="encoder">The encoder for serialization.</param>
        protected virtual void WriteArray(ArraySchema schema, object value, Encoder encoder)
        {
            EnsureArrayObject(value);
            long l = GetArrayLength(value);
            encoder.WriteArrayStart();
            encoder.SetItemCount(l);
            for (long i = 0; i < l; i++)
            {
                encoder.StartItem();
                Write(schema.ItemSchema, GetArrayElement(value, i), encoder);
            }
            encoder.WriteArrayEnd();
        }

        /// <summary>
        /// Checks if the given object is an array. If it is a valid array, this function returns normally. Otherwise,
        /// it throws an exception. The default implementation checks if the value is an Array.
        /// </summary>
        /// <param name="value">The value to check.</param>
        /// <exception cref="AvroException">If the value is null or not an Array.</exception>
        protected virtual void EnsureArrayObject(object value)
        {
            if (value == null || !(value is Array)) throw TypeMismatch(value, "array", "Array");
        }

        /// <summary>
        /// Returns the length of an array. The default implementation requires the object
        /// to be an Array and returns its length. The default implementation
        /// guarantees that EnsureArrayObject() has been called on the value before this
        /// function is called.
        /// </summary>
        /// <param name="value">The object whose array length is required.</param>
        /// <returns>The array length of the given object.</returns>
        protected virtual long GetArrayLength(object value)
        {
            return (value as Array).Length;
        }

        /// <summary>
        /// Returns the element at the given index from the given array object. The default implementation
        /// requires that the value is an Array and returns the element in that array. The default implementation
        /// guarantees that EnsureArrayObject() has been called on the value before this
        /// function is called.
        /// </summary>
        /// <param name="value">The array object.</param>
        /// <param name="index">The index to look for.</param>
        /// <returns>The array element at the index.</returns>
        protected virtual object GetArrayElement(object value, long index)
        {
            return (value as Array).GetValue(index);
        }

        /// <summary>
        /// Serializes a map. The default implementation first ensures that the value is indeed a map and then uses
        /// GetMapSize() and GetMapValues() to access the contents of the map.
        /// </summary>
        /// <param name="schema">The MapSchema for serialization.</param>
        /// <param name="value">The value to be serialized.</param>
        /// <param name="encoder">The encoder for serialization.</param>
        protected virtual void WriteMap(MapSchema schema, object value, Encoder encoder)
        {
            EnsureMapObject(value);
            encoder.WriteMapStart();
            encoder.SetItemCount(GetMapSize(value));
            // The value is handed to the overridable hooks untouched (no up-front cast to
            // IDictionary), so a derived class that overrides EnsureMapObject, GetMapSize and
            // GetMapValues can support a different map representation.
            foreach (KeyValuePair<string, object> entry in GetMapValues(value))
            {
                encoder.StartItem();
                encoder.WriteString(entry.Key);
                Write(schema.ValueSchema, entry.Value, encoder);
            }
            encoder.WriteMapEnd();
        }

        /// <summary>
        /// Checks if the given object is a map. If it is a valid map, this function returns normally. Otherwise,
        /// it throws an exception. The default implementation checks if the value is an IDictionary
        /// with string keys and object values.
        /// </summary>
        /// <param name="value">The value to check.</param>
        /// <exception cref="AvroException">If the value is null or not such a dictionary.</exception>
        protected virtual void EnsureMapObject(object value)
        {
            if (value == null || !(value is IDictionary<string, object>)) throw TypeMismatch(value, "map", "IDictionary");
        }

        /// <summary>
        /// Returns the size of the map object. The default implementation guarantees that EnsureMapObject has been
        /// successfully called with the given value. The default implementation requires the value
        /// to be an IDictionary and returns the number of elements in it.
        /// </summary>
        /// <param name="value">The map object whose size is desired.</param>
        /// <returns>The size of the given map object.</returns>
        protected virtual long GetMapSize(object value)
        {
            return (value as IDictionary<string, object>).Count;
        }

        /// <summary>
        /// Returns the contents of the given map object. The default implementation guarantees that EnsureMapObject
        /// has been called with the given value. The default implementation of this method requires that
        /// the value is an IDictionary and returns its contents.
        /// </summary>
        /// <param name="value">The map object whose contents are desired.</param>
        /// <returns>The contents of the given map object.</returns>
        protected virtual IEnumerable<KeyValuePair<string, object>> GetMapValues(object value)
        {
            return value as IDictionary<string, object>;
        }

        /// <summary>
        /// Resolves the given value against the given UnionSchema and serializes the object against
        /// the resolved schema member. The default implementation of this method uses
        /// ResolveUnion to find the member schema within the UnionSchema.
        /// </summary>
        /// <param name="us">The UnionSchema to resolve against.</param>
        /// <param name="value">The value to be serialized.</param>
        /// <param name="encoder">The encoder for serialization.</param>
        protected virtual void WriteUnion(UnionSchema us, object value, Encoder encoder)
        {
            int index = ResolveUnion(us, value);
            encoder.WriteUnionIndex(index);
            Write(us[index], value, encoder);
        }

        /// <summary>
        /// Finds the branch within the given UnionSchema that matches the given object. The default implementation
        /// calls Matches() method in the order of branches within the UnionSchema. If nothing matches, throws
        /// an exception.
        /// </summary>
        /// <param name="us">The UnionSchema to resolve against.</param>
        /// <param name="obj">The object that should be used in matching.</param>
        /// <returns>The index of the first matching branch.</returns>
        /// <exception cref="AvroException">If no branch of the union matches the object.</exception>
        protected virtual int ResolveUnion(UnionSchema us, object obj)
        {
            for (int i = 0; i < us.Count; i++)
            {
                if (Matches(us[i], obj)) return i;
            }
            // obj may be null here (a null value against a union without a null branch), so its
            // type name must be computed null-safely to report the intended AvroException.
            throw new AvroException("Cannot find a match for " + (null == obj ? "null" : obj.GetType().ToString()) + " in " + us);
        }

        /// <summary>
        /// Serializes a fixed object. The default implementation requires that the value is
        /// a GenericFixed object with an identical schema as es.
        /// </summary>
        /// <param name="es">The schema for serialization.</param>
        /// <param name="value">The value to be serialized.</param>
        /// <param name="encoder">The encoder for serialization.</param>
        /// <exception cref="AvroException">If the value is not a GenericFixed of the given schema.</exception>
        protected virtual void WriteFixed(FixedSchema es, object value, Encoder encoder)
        {
            if (value == null || !(value is GenericFixed) || !(value as GenericFixed).Schema.Equals(es))
            {
                throw TypeMismatch(value, "fixed", "GenericFixed");
            }
            GenericFixed ba = (GenericFixed)value;
            encoder.WriteFixed(ba.Value);
        }

        /// <summary>
        /// Builds (but does not throw) the exception describing a value whose runtime type does
        /// not fit the schema it is being written against.
        /// </summary>
        /// <param name="obj">The offending value; may be null.</param>
        /// <param name="schemaType">The name of the schema type being written.</param>
        /// <param name="type">The name of the C# type that was required.</param>
        /// <returns>The exception for the caller to throw.</returns>
        protected AvroException TypeMismatch(object obj, string schemaType, string type)
        {
            return new AvroException(type + " required to write against " + schemaType + " schema but found " + (null == obj ? "null" : obj.GetType().ToString()) );
        }

        /// <summary>
        /// Throws an AvroTypeException for a schema whose type tag is not handled by Write.
        /// </summary>
        private void error(Schema schema, Object value)
        {
            throw new AvroTypeException("Not a " + schema + ": " + value);
        }

        /*
         * FIXME: This method of determining the Union branch has problems. If the data is an IDictionary
         * and there are two branches, one with a record schema and the other with a map, it chooses the first one.
         * Similarly, if the data is byte[] and there are fixed and bytes schemas as branches, it chooses the first
         * one that matches. Also it does not recognize the arrays of primitive types.
         */
        /// <summary>
        /// Tells whether the given object could be written against the given union branch schema.
        /// </summary>
        /// <param name="sc">The branch schema to test.</param>
        /// <param name="obj">The object to be matched; may be null.</param>
        /// <returns>True if the object's runtime type fits the schema; false otherwise.</returns>
        /// <exception cref="AvroException">If the schema's type tag is not a known one.</exception>
        protected virtual bool Matches(Schema sc, object obj)
        {
            if (obj == null && sc.Tag != Avro.Schema.Type.Null) return false;
            switch (sc.Tag)
            {
                case Schema.Type.Null:
                    return obj == null;
                case Schema.Type.Boolean:
                    return obj is bool;
                case Schema.Type.Int:
                    return obj is int;
                case Schema.Type.Long:
                    return obj is long;
                case Schema.Type.Float:
                    return obj is float;
                case Schema.Type.Double:
                    return obj is double;
                case Schema.Type.Bytes:
                    return obj is byte[];
                case Schema.Type.String:
                    return obj is string;
                // Named types are matched by schema name rather than by full schema equality.
                // Error is handled together with Record, mirroring the dispatch in Write().
                case Schema.Type.Record:
                case Schema.Type.Error:
                    return obj is GenericRecord && (obj as GenericRecord).Schema.SchemaName.Equals((sc as RecordSchema).SchemaName);
                case Schema.Type.Enumeration:
                    return obj is GenericEnum && (obj as GenericEnum).Schema.SchemaName.Equals((sc as EnumSchema).SchemaName);
                case Schema.Type.Array:
                    // byte[] is reserved for the bytes schema, so it never matches an array branch.
                    return obj is Array && !(obj is byte[]);
                case Schema.Type.Map:
                    return obj is IDictionary<string, object>;
                case Schema.Type.Union:
                    return false;   // Union directly within another union not allowed!
                case Schema.Type.Fixed:
                    return obj is GenericFixed && (obj as GenericFixed).Schema.SchemaName.Equals((sc as FixedSchema).SchemaName);
                default:
                    throw new AvroException("Unknown schema type: " + sc.Tag);
            }
        }
    }
}