/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using Encoder = Avro.IO.Encoder;
namespace Avro.Generic
{
///
/// A general purpose writer of data from avro streams. This writer analyzes the writer schema
/// when constructed so that writes can be more efficient. Once constructed, a writer can be reused or shared among threads
/// to avoid incurring more resolution costs.
///
public abstract class PreresolvingDatumWriter : DatumWriter
{
public Schema Schema { get; private set; }
protected delegate void WriteItem(Object value, Encoder encoder);
private readonly WriteItem _writer;
private readonly ArrayAccess _arrayAccess;
private readonly MapAccess _mapAccess;
private readonly Dictionary _recordWriters = new Dictionary();
public void Write(T datum, Encoder encoder)
{
_writer( datum, encoder );
}
protected PreresolvingDatumWriter(Schema schema, ArrayAccess arrayAccess, MapAccess mapAccess)
{
Schema = schema;
_arrayAccess = arrayAccess;
_mapAccess = mapAccess;
_writer = ResolveWriter(schema);
}
private WriteItem ResolveWriter( Schema schema )
{
switch (schema.Tag)
{
case Schema.Type.Null:
return WriteNull;
case Schema.Type.Boolean:
return (v, e) => Write( v, schema.Tag, e.WriteBoolean );
case Schema.Type.Int:
return (v, e) => Write( v, schema.Tag, e.WriteInt );
case Schema.Type.Long:
return (v, e) => Write( v, schema.Tag, e.WriteLong );
case Schema.Type.Float:
return (v, e) => Write( v, schema.Tag, e.WriteFloat );
case Schema.Type.Double:
return (v, e) => Write( v, schema.Tag, e.WriteDouble );
case Schema.Type.String:
return (v, e) => Write( v, schema.Tag, e.WriteString );
case Schema.Type.Bytes:
return (v, e) => Write( v, schema.Tag, e.WriteBytes );
case Schema.Type.Error:
case Schema.Type.Record:
return ResolveRecord((RecordSchema) schema);
case Schema.Type.Enumeration:
return ResolveEnum(schema as EnumSchema);
case Schema.Type.Fixed:
return (v, e) => WriteFixed(schema as FixedSchema, v, e);
case Schema.Type.Array:
return ResolveArray((ArraySchema)schema);
case Schema.Type.Map:
return ResolveMap((MapSchema)schema);
case Schema.Type.Union:
return ResolveUnion((UnionSchema)schema);
default:
return (v, e) => error(schema, v);
}
}
///
/// Serializes a "null"
///
/// The object to be serialized using null schema
/// The encoder to use while serialization
protected void WriteNull(object value, Encoder encoder)
{
if (value != null) throw TypeMismatch(value, "null", "null");
}
///
/// A generic method to serialize primitive Avro types.
///
/// Type of the C# type to be serialized
/// The value to be serialized
/// The schema type tag
/// The writer which should be used to write the given type.
protected void Write(object value, Schema.Type tag, Writer writer)
{
if (!(value is S)) throw TypeMismatch(value, tag.ToString(), typeof(S).ToString());
writer((S)value);
}
///
/// Serialized a record using the given RecordSchema. It uses GetField method
/// to extract the field value from the given object.
///
/// The RecordSchema to use for serialization
private WriteItem ResolveRecord(RecordSchema recordSchema)
{
WriteItem recordResolver;
if (_recordWriters.TryGetValue(recordSchema, out recordResolver))
{
return recordResolver;
}
var writeSteps = new RecordFieldWriter[recordSchema.Fields.Count];
recordResolver = (v, e) => WriteRecordFields(v, writeSteps, e);
_recordWriters.Add(recordSchema, recordResolver);
int index = 0;
foreach (Field field in recordSchema)
{
var record = new RecordFieldWriter
{
WriteField = ResolveWriter(field.Schema),
Field = field
};
writeSteps[index++] = record;
}
return recordResolver;
}
protected abstract void WriteRecordFields(object record, RecordFieldWriter[] writers, Encoder encoder);
protected class RecordFieldWriter
{
public WriteItem WriteField { get; set; }
public Field Field { get; set; }
}
protected abstract void EnsureRecordObject(RecordSchema recordSchema, object value);
///
/// Extracts the field value from the given object.
///
/// The record value from which the field needs to be extracted
/// The name of the field in the record
/// The position of field in the record
///
protected abstract void WriteField(object record, string fieldName, int fieldPos, WriteItem writer, Encoder encoder );
///
/// Serializes an enumeration.
///
/// The EnumSchema for serialization
protected abstract WriteItem ResolveEnum(EnumSchema es);
///
/// Serialized an array. The default implementation calls EnsureArrayObject() to ascertain that the
/// given value is an array. It then calls GetArrayLength() and GetArrayElement()
/// to access the members of the array and then serialize them.
///
/// The ArraySchema for serialization
/// The value being serialized
/// The encoder for serialization
protected WriteItem ResolveArray(ArraySchema schema)
{
var itemWriter = ResolveWriter(schema.ItemSchema);
return (d,e) => WriteArray(itemWriter, d, e);
}
private void WriteArray(WriteItem itemWriter, object array, Encoder encoder)
{
_arrayAccess.EnsureArrayObject(array);
long l = _arrayAccess.GetArrayLength(array);
encoder.WriteArrayStart();
encoder.SetItemCount(l);
_arrayAccess.WriteArrayValues(array, itemWriter, encoder);
encoder.WriteArrayEnd();
}
private WriteItem ResolveMap(MapSchema mapSchema)
{
var itemWriter = ResolveWriter(mapSchema.ValueSchema);
return (v, e) => WriteMap(itemWriter, v, e);
}
///
/// Serialized a map. The default implementation first ensure that the value is indeed a map and then uses
/// GetMapSize() and GetMapElements() to access the contents of the map.
///
/// The MapSchema for serialization
/// The value to be serialized
/// The encoder for serialization
protected void WriteMap(WriteItem itemWriter, object value, Encoder encoder)
{
_mapAccess.EnsureMapObject(value);
encoder.WriteMapStart();
encoder.SetItemCount(_mapAccess.GetMapSize(value));
_mapAccess.WriteMapValues(value, itemWriter, encoder);
encoder.WriteMapEnd();
}
private WriteItem ResolveUnion(UnionSchema unionSchema)
{
var branchSchemas = unionSchema.Schemas.ToArray();
var branchWriters = new WriteItem[branchSchemas.Length];
int branchIndex = 0;
foreach (var branch in branchSchemas)
{
branchWriters[branchIndex++] = ResolveWriter(branch);
}
return (v, e) => WriteUnion(unionSchema, branchSchemas, branchWriters, v, e);
}
///
/// Resolves the given value against the given UnionSchema and serializes the object against
/// the resolved schema member.
///
/// The UnionSchema to resolve against
/// The value to be serialized
/// The encoder for serialization
private void WriteUnion(UnionSchema unionSchema, Schema[] branchSchemas, WriteItem[] branchWriters, object value, Encoder encoder)
{
int index = ResolveUnion(unionSchema, branchSchemas, value);
encoder.WriteUnionIndex(index);
branchWriters[index](value, encoder);
}
///
/// Finds the branch within the given UnionSchema that matches the given object. The default implementation
/// calls Matches() method in the order of branches within the UnionSchema. If nothing matches, throws
/// an exception.
///
/// The UnionSchema to resolve against
/// The object that should be used in matching
///
protected int ResolveUnion(UnionSchema us, Schema[] branchSchemas, object obj)
{
for (int i = 0; i < branchSchemas.Length; i++)
{
if (UnionBranchMatches(branchSchemas[i], obj)) return i;
}
throw new AvroException("Cannot find a match for " + obj.GetType() + " in " + us);
}
///
/// Serialized a fixed object. The default implementation requires that the value is
/// a GenericFixed object with an identical schema as es.
///
/// The schema for serialization
/// The value to be serialized
/// The encoder for serialization
protected abstract void WriteFixed(FixedSchema es, object value, Encoder encoder);
protected static AvroException TypeMismatch(object obj, string schemaType, string type)
{
return new AvroException(type + " required to write against " + schemaType + " schema but found " + (null == obj ? "null" : obj.GetType().ToString()) );
}
private void error(Schema schema, Object value)
{
throw new AvroTypeException("Not a " + schema + ": " + value);
}
protected abstract bool UnionBranchMatches(Schema sc, object obj);
protected interface EnumAccess
{
void WriteEnum(object value);
}
protected interface ArrayAccess
{
///
/// Checks if the given object is an array. If it is a valid array, this function returns normally. Otherwise,
/// it throws an exception. The default implementation checks if the value is an array.
///
///
void EnsureArrayObject(object value);
///
/// Returns the length of an array. The default implementation requires the object
/// to be an array of objects and returns its length. The defaul implementation
/// gurantees that EnsureArrayObject() has been called on the value before this
/// function is called.
///
/// The object whose array length is required
/// The array length of the given object
long GetArrayLength(object value);
///
/// Returns the element at the given index from the given array object. The default implementation
/// requires that the value is an object array and returns the element in that array. The defaul implementation
/// gurantees that EnsureArrayObject() has been called on the value before this
/// function is called.
///
/// The array object
/// The index to look for
/// The array element at the index
void WriteArrayValues(object array, WriteItem valueWriter, Encoder encoder);
}
protected interface MapAccess
{
///
/// Checks if the given object is a map. If it is a valid map, this function returns normally. Otherwise,
/// it throws an exception. The default implementation checks if the value is an IDictionary.
///
///
void EnsureMapObject(object value);
///
/// Returns the size of the map object. The default implementation gurantees that EnsureMapObject has been
/// successfully called with the given value. The default implementation requires the value
/// to be an IDictionary and returns the number of elements in it.
///
/// The map object whose size is desired
/// The size of the given map object
long GetMapSize(object value);
///
/// Returns the contents of the given map object. The default implementation guarantees that EnsureMapObject
/// has been called with the given value. The defualt implementation of this method requires that
/// the value is an IDictionary and returns its contents.
///
/// The map object whose size is desired
/// The contents of the given map object
void WriteMapValues(object map, WriteItem valueWriter, Encoder encoder);
}
protected class DictionaryMapAccess : MapAccess
{
public void EnsureMapObject( object value )
{
if( value == null || !( value is IDictionary ) ) throw TypeMismatch( value, "map", "IDictionary" );
}
public long GetMapSize( object value )
{
return ( (IDictionary) value ).Count;
}
public void WriteMapValues(object map, WriteItem valueWriter, Encoder encoder)
{
foreach (DictionaryEntry entry in ((IDictionary)map))
{
encoder.StartItem();
encoder.WriteString(entry.Key.ToString());
valueWriter(entry.Value, encoder);
}
}
}
}
}