type) { case self::NULL_TYPE: return is_null($datum); case self::BOOLEAN_TYPE: return is_bool($datum); case self::STRING_TYPE: case self::BYTES_TYPE: return is_string($datum); case self::INT_TYPE: return (is_int($datum) && (self::INT_MIN_VALUE <= $datum) && ($datum <= self::INT_MAX_VALUE)); case self::LONG_TYPE: return (is_int($datum) && (self::LONG_MIN_VALUE <= $datum) && ($datum <= self::LONG_MAX_VALUE)); case self::FLOAT_TYPE: case self::DOUBLE_TYPE: return (is_float($datum) || is_int($datum)); case self::ARRAY_SCHEMA: if (is_array($datum)) { foreach ($datum as $d) if (!self::is_valid_datum($expected_schema->items(), $d)) return false; return true; } return false; case self::MAP_SCHEMA: if (is_array($datum)) { foreach ($datum as $k => $v) if (!is_string($k) || !self::is_valid_datum($expected_schema->values(), $v)) return false; return true; } return false; case self::UNION_SCHEMA: foreach ($expected_schema->schemas() as $schema) if (self::is_valid_datum($schema, $datum)) return true; return false; case self::ENUM_SCHEMA: return in_array($datum, $expected_schema->symbols()); case self::FIXED_SCHEMA: return (is_string($datum) && (strlen($datum) == $expected_schema->size())); case self::RECORD_SCHEMA: case self::ERROR_SCHEMA: case self::REQUEST_SCHEMA: if (is_array($datum)) { foreach ($expected_schema->fields() as $field) if (!self::is_valid_datum($field->type(), $datum[$field->name()])) return false; return true; } return false; default: throw new AvroSchemaParseException( sprintf('%s is not allowed.', $expected_schema)); } } /** * @internal Should only be called from within the constructor of * a class which extends AvroSchema * @param string $type a schema type name */ public function __construct($type) { $this->type = $type; } /** * @param mixed $avro * @param string $default_namespace namespace of enclosing schema * @param AvroNamedSchemata &$schemata * @returns AvroSchema * @uses AvroSchema::real_parse() * @throws AvroSchemaParseException */ protected static 
function subparse($avro, $default_namespace, &$schemata=null)
  {
    try
    {
      return self::real_parse($avro, $default_namespace, $schemata);
    }
    catch (AvroSchemaParseException $e)
    {
      // Already a parse exception: let it propagate unchanged.
      throw $e;
    }
    catch (Exception $e)
    {
      // Any other exception is reported as a parse error that prints the
      // offending sub-schema.
      throw new AvroSchemaParseException(
        sprintf('Sub-schema is not a valid Avro schema. Bad schema: %s',
                print_r($avro, true)));
    }
  }

  /**
   * @returns string schema type name of this schema
   */
  public function type()
  {
    return $this->type;
  }

  /**
   * @returns mixed array holding this schema's type under the
   *                AvroSchema::TYPE_ATTR key
   */
  public function to_avro()
  {
    return array(self::TYPE_ATTR => $this->type);
  }

  /**
   * @returns string the JSON-encoded representation of this Avro schema.
   */
  public function __toString()
  {
    return json_encode($this->to_avro());
  }

  /**
   * Calls the method of this schema whose name is given by $attribute.
   * @param string $attribute name of the accessor method to invoke
   * @returns mixed value of the attribute with the given attribute name
   */
  public function attribute($attribute)
  {
    return $this->$attribute();
  }
}

/**
 * Avro schema for basic types such as null, int, long, string.
 * @package Avro
 */
class AvroPrimitiveSchema extends AvroSchema
{
  /**
   * @param string $type the primitive schema type name
   * @throws AvroSchemaParseException if the given $type is not a
   *         primitive schema type name
   */
  public function __construct($type)
  {
    if (!self::is_primitive_type($type))
      throw new AvroSchemaParseException(
        sprintf('%s is not a valid primitive type.', $type));

    parent::__construct($type);
  }

  /**
   * @returns mixed the bare type name when the parent representation
   *                holds nothing but the type, and the parent
   *                representation otherwise
   */
  public function to_avro()
  {
    $avro = parent::to_avro();
    // FIXME: Is this if really necessary? When *wouldn't* this be the case?
    if (1 == count($avro))
      return $this->type;
    return $avro;
  }
}

/**
 * Avro array schema, consisting of items of a particular
 * Avro schema type.
 * @package Avro
 */
class AvroArraySchema extends AvroSchema
{
  /**
   * @var AvroName|AvroSchema named schema name or AvroSchema of
   *                          array element
   */
  private $items;

  /**
   * @var boolean true if the items schema was found by name in the
   *              schemata passed to the constructor
   * FIXME: couldn't we derive this from whether or not $this->items
   *        is an AvroName or an AvroSchema?
   */
  private $is_items_schema_from_schemata;

  /**
   * @param string|mixed $items AvroNamedSchema name or object form
   *        of decoded JSON schema representation.
   * @param string $default_namespace namespace of enclosing schema
   * @param AvroNamedSchemata &$schemata known named schemas; may be null,
   *        in which case $items is always handed to AvroSchema::subparse()
   */
  public function __construct($items, $default_namespace, &$schemata=null)
  {
    parent::__construct(AvroSchema::ARRAY_SCHEMA);

    $this->is_items_schema_from_schemata = false;
    $items_schema = null;

    // Only attempt the by-name lookup when there is a schemata object to
    // ask; $schemata defaults to null and must not be dereferenced then.
    if (is_string($items)
        && !is_null($schemata)
        && ($items_schema = $schemata->schema_by_name(
              new AvroName($items, null, $default_namespace))))
      $this->is_items_schema_from_schemata = true;
    else
      $items_schema = AvroSchema::subparse($items, $default_namespace, $schemata);

    $this->items = $items_schema;
  }

  /**
   * @returns AvroName|AvroSchema named schema name or AvroSchema
   *          of this array schema's elements.
   */
  public function items()
  {
    return $this->items;
  }

  /**
   * @returns mixed the parent representation plus an ITEMS_ATTR entry:
   *                the qualified name when the items schema came from
   *                the schemata, its own to_avro() form otherwise
   */
  public function to_avro()
  {
    $avro = parent::to_avro();
    $avro[AvroSchema::ITEMS_ATTR] = $this->is_items_schema_from_schemata
      ? $this->items->qualified_name()
      : $this->items->to_avro();
    return $avro;
  }
}

/**
 * Avro map schema consisting of named values of defined
 * Avro Schema types.
 * @package Avro
 */
class AvroMapSchema extends AvroSchema
{
  /**
   * @var string|AvroSchema named schema name or AvroSchema
   *      of map schema values.
   */
  private $values;

  /**
   * @var boolean true if the values schema was found by name in the
   *              schemata passed to the constructor
   * XXX Couldn't we derive this based on whether or not
   * $this->values is a string?
   */
  private $is_values_schema_from_schemata;

  /**
   * @param string|AvroSchema $values
   * @param string $default_namespace namespace of enclosing schema
   * @param AvroNamedSchemata &$schemata known named schemas; may be null,
   *        in which case $values is always handed to AvroSchema::subparse()
   */
  public function __construct($values, $default_namespace, &$schemata=null)
  {
    parent::__construct(AvroSchema::MAP_SCHEMA);

    $this->is_values_schema_from_schemata = false;
    $values_schema = null;

    // Same null guard as for array items: $schemata defaults to null.
    if (is_string($values)
        && !is_null($schemata)
        && ($values_schema = $schemata->schema_by_name(
              new AvroName($values, null, $default_namespace))))
      $this->is_values_schema_from_schemata = true;
    else
      $values_schema = AvroSchema::subparse($values, $default_namespace,
                                            $schemata);

    $this->values = $values_schema;
  }

  /**
   * @returns XXX|AvroSchema
   */
  public function values()
  {
    return $this->values;
  }

  /**
   * @returns mixed the parent representation plus a VALUES_ATTR entry:
   *                the qualified name when the values schema came from
   *                the schemata, its own to_avro() form otherwise
   */
  public function to_avro()
  {
    $avro = parent::to_avro();
    $avro[AvroSchema::VALUES_ATTR] = $this->is_values_schema_from_schemata
      ? $this->values->qualified_name()
      : $this->values->to_avro();
    return $avro;
  }
}

/**
 * Union of Avro schemas, of which values can be of any of the schema in
 * the union.
* @package Avro
 */
class AvroUnionSchema extends AvroSchema
{
  /**
   * @var AvroSchema[] list of schemas of this union
   */
  private $schemas;

  /**
   * @var int[] list of indices (into $schemas) of named schemas which
   *            are defined in $schemata
   */
  private $schema_from_schemata_indices;

  /**
   * @param AvroSchema[] $schemas list of schemas in the union
   * @param string $default_namespace namespace of enclosing schema
   * @param AvroNamedSchemata &$schemata known named schemas; may be null,
   *        in which case every member is handed to subparse()
   * @throws AvroSchemaParseException if a non-named type occurs twice or
   *         a member is itself a union
   */
  public function __construct($schemas, $default_namespace, &$schemata=null)
  {
    parent::__construct(AvroSchema::UNION_SCHEMA);

    // Initialised up front so that an empty union still exposes an array
    // through schemas(), schema_by_index() and to_avro().
    $this->schemas = array();
    $this->schema_from_schemata_indices = array();
    $schema_types = array();

    foreach ($schemas as $schema)
    {
      $is_schema_from_schemata = false;
      $new_schema = null;

      // $schemata defaults to null; only look names up when it is set.
      if (is_string($schema)
          && !is_null($schemata)
          && ($new_schema = $schemata->schema_by_name(
                new AvroName($schema, null, $default_namespace))))
        $is_schema_from_schemata = true;
      else
        $new_schema = self::subparse($schema, $default_namespace, $schemata);

      $schema_type = $new_schema->type;

      if (self::is_valid_type($schema_type)
          && !self::is_named_type($schema_type)
          && in_array($schema_type, $schema_types, true))
        throw new AvroSchemaParseException(
          sprintf('"%s" is already in union', $schema_type));

      if (AvroSchema::UNION_SCHEMA == $schema_type)
        throw new AvroSchemaParseException('Unions cannot contain other unions');

      $schema_types []= $schema_type;
      $this->schemas []= $new_schema;
      // Record the position within $this->schemas (which to_avro()
      // iterates) rather than the key of the input array.
      if ($is_schema_from_schemata)
        $this->schema_from_schemata_indices []= count($this->schemas) - 1;
    }
  }

  /**
   * @returns AvroSchema[]
   */
  public function schemas()
  {
    return $this->schemas;
  }

  /**
   * @param int $index zero-based position within the union
   * @returns AvroSchema the particular schema from the union for
   *          the given (zero-based) index.
   * @throws AvroSchemaParseException if the index is invalid for this schema.
   */
  public function schema_by_index($index)
  {
    // isset() rejects negative indices as well as indices past the end.
    if (isset($this->schemas[$index]))
      return $this->schemas[$index];

    throw new AvroSchemaParseException('Invalid union schema index');
  }

  /**
   * @returns mixed list with one entry per member schema: the qualified
   *                name for members taken from the schemata, the member's
   *                to_avro() form otherwise
   */
  public function to_avro()
  {
    $avro = array();

    foreach ($this->schemas as $index => $schema)
      $avro []= in_array($index, $this->schema_from_schemata_indices)
        ? $schema->qualified_name()
        : $schema->to_avro();

    return $avro;
  }
}

/**
 * Parent class of named Avro schema
 * @package Avro
 * @todo Refactor AvroNamedSchema to use an AvroName instance
 *       to store name information.
 */
class AvroNamedSchema extends AvroSchema
{
  /**
   * @var AvroName $name
   */
  private $name;

  /**
   * @var string documentation string
   */
  private $doc;

  /**
   * @param string $type
   * @param AvroName $name
   * @param string $doc documentation string
   * @param AvroNamedSchemata &$schemata when not null, replaced by a copy
   *        which additionally contains this schema
   * @throws AvroSchemaParseException
   */
  public function __construct($type, $name, $doc=null, &$schemata=null)
  {
    parent::__construct($type);
    $this->name = $name;

    if ($doc && !is_string($doc))
      throw new AvroSchemaParseException('Schema doc attribute must be a string');
    $this->doc = $doc;

    // $schemata defaults to null; there is nothing to register with then.
    if (!is_null($schemata))
      $schemata = $schemata->clone_with_new_schema($this);
  }

  /**
   * @returns mixed the parent representation plus name, namespace (when
   *                the qualified name carries one) and doc (when not null)
   */
  public function to_avro()
  {
    $avro = parent::to_avro();
    list($name, $namespace) = AvroName::extract_namespace($this->qualified_name());

    $avro[AvroSchema::NAME_ATTR] = $name;
    if ($namespace)
      $avro[AvroSchema::NAMESPACE_ATTR] = $namespace;
    if (!is_null($this->doc))
      $avro[AvroSchema::DOC_ATTR] = $this->doc;

    return $avro;
  }

  /**
   * @returns string
   */
  public function fullname()
  {
    return $this->name->fullname();
  }

  /**
   * @returns string name qualified for its context, as reported by the
   *                 underlying AvroName
   */
  public function qualified_name()
  {
    return $this->name->qualified_name();
  }
}

/**
 * @package Avro
 */
class AvroName
{
  /**
   * @var string character used to separate names comprising the fullname
   */
  const NAME_SEPARATOR = '.';

  /**
   * @var string regular expression to validate name values
   */
  const NAME_REGEXP = '/^[A-Za-z_][A-Za-z0-9_]*$/';

  /**
   * Splits a dotted name at its last separator.
   * @param string $name possibly dotted name
   * @param string $namespace namespace returned when $name has no separator
   * @returns string[] array($name, $namespace)
   */
  public static function extract_namespace($name, $namespace=null)
  {
    $parts = explode(self::NAME_SEPARATOR, $name);
    if (count($parts) > 1)
    {
      $name = array_pop($parts);
      $namespace = join(self::NAME_SEPARATOR, $parts);
    }
    return array($name, $namespace);
  }

  /**
   * @returns boolean true if the given name is well-formed
   *          (is a non-empty string matching self::NAME_REGEXP)
   *          and false otherwise
   */
  public static function is_well_formed_name($name)
  {
    return (is_string($name)
            && !empty($name)
            && preg_match(self::NAME_REGEXP, $name));
  }

  /**
   * @param string $namespace
   * @returns boolean true if namespace is composed of valid names
   * @throws AvroSchemaParseException if any of the namespace components
   *         are invalid.
   */
  private static function check_namespace_names($namespace)
  {
    foreach (explode(self::NAME_SEPARATOR, $namespace) as $n)
    {
      if (empty($n) || (0 == preg_match(self::NAME_REGEXP, $n)))
        throw new AvroSchemaParseException(sprintf('Invalid name "%s"', $n));
    }
    return true;
  }

  /**
   * @param string $name
   * @param string $namespace
   * @returns string $namespace and $name joined by a dot
   * @throws AvroSchemaParseException if any of the names are not valid.
   */
  private static function parse_fullname($name, $namespace)
  {
    if (!is_string($namespace) || empty($namespace))
      throw new AvroSchemaParseException('Namespace must be a non-empty string.');
    self::check_namespace_names($namespace);
    return $namespace . '.' . $name;
  }

  /**
   * @var string valid names are matched by self::NAME_REGEXP
   */
  private $name;

  /**
   * @var string
   */
  private $namespace;

  /**
   * @var string
   */
  private $fullname;

  /**
   * @var string Name qualified as necessary given its default namespace.
   */
  private $qualified_name;

  /**
   * @param string $name
   * @param string $namespace
   * @param string $default_namespace
   * @throws AvroSchemaParseException if $name is not a non-empty string or
   *         any name component is invalid
   */
  public function __construct($name, $namespace, $default_namespace)
  {
    if (!is_string($name) || empty($name))
      throw new AvroSchemaParseException('Name must be a non-empty string.');

    // strpos() yields 0 (falsy) for a leading separator, so such a name is
    // not treated as dotted; it then fails the simple-name check below.
    if (strpos($name, self::NAME_SEPARATOR)
        && self::check_namespace_names($name))
      $this->fullname = $name;
    elseif (0 == preg_match(self::NAME_REGEXP, $name))
      throw new AvroSchemaParseException(sprintf('Invalid name "%s"', $name));
    elseif (!is_null($namespace))
      $this->fullname = self::parse_fullname($name, $namespace);
    elseif (!is_null($default_namespace))
      $this->fullname = self::parse_fullname($name, $default_namespace);
    else
      $this->fullname = $name;

    list($this->name, $this->namespace) = self::extract_namespace($this->fullname);

    // The short name is enough when there is no namespace or when the
    // namespace equals the default namespace.
    $this->qualified_name = (is_null($this->namespace)
                             || $this->namespace == $default_namespace)
      ? $this->name
      : $this->fullname;
  }

  /**
   * @returns array array($name, $namespace)
   */
  public function name_and_namespace()
  {
    return array($this->name, $this->namespace);
  }

  /**
   * @returns string
   */
  public function fullname()
  {
    return $this->fullname;
  }

  /**
   * @returns string fullname
   * @uses $this->fullname()
   */
  public function __toString()
  {
    return $this->fullname();
  }

  /**
   * @returns string name qualified for its context
   */
  public function qualified_name()
  {
    return $this->qualified_name;
  }
}

/**
 * Keeps track of AvroNamedSchema which have been observed so far,
 * as well as the default namespace.
 *
 * @package Avro
 */
class AvroNamedSchemata
{
  /**
   * @var AvroNamedSchema[] schemas keyed by their fullname
   */
  private $schemata;

  /**
   * @param AvroNamedSchema[] $schemata schemas keyed by their fullname
   */
  public function __construct($schemata=array())
  {
    $this->schemata = $schemata;
  }

  /**
   * @param string $fullname
   * @returns boolean true if there exists a schema with the given name
   *                  and false otherwise.
   */
  public function has_name($fullname)
  {
    return array_key_exists($fullname, $this->schemata);
  }

  /**
   * @param string $fullname
   * @returns AvroSchema|null the schema which has the given name,
   *          or null if there is no schema with the given name.
   */
  public function schema($fullname)
  {
    if (isset($this->schemata[$fullname]))
      return $this->schemata[$fullname];
    return null;
  }

  /**
   * @param AvroName $name
   * @returns AvroSchema|null
   */
  public function schema_by_name($name)
  {
    return $this->schema($name->fullname());
  }

  /**
   * Creates a new AvroNamedSchemata instance of this schemata instance
   * with the given $schema appended.
   * @param AvroNamedSchema $schema schema to add to this existing schemata
   * @returns AvroNamedSchemata
   * @throws AvroSchemaParseException if the schema's fullname is a type
   *         name or is already registered
   */
  public function clone_with_new_schema($schema)
  {
    $name = $schema->fullname();

    if (AvroSchema::is_valid_type($name))
      throw new AvroSchemaParseException(
        sprintf('Name "%s" is a reserved type name', $name));
    if ($this->has_name($name))
      throw new AvroSchemaParseException(
        sprintf('Name "%s" is already in use', $name));

    $schemata = new AvroNamedSchemata($this->schemata);
    $schemata->schemata[$name] = $schema;
    return $schemata;
  }
}

/**
 * @package Avro
 */
class AvroEnumSchema extends AvroNamedSchema
{
  /**
   * @var string[] array of symbols
   */
  private $symbols;

  /**
   * @param AvroName $name
   * @param string $doc
   * @param string[] $symbols
   * @param AvroNamedSchemata &$schemata
   * @throws AvroSchemaParseException if $symbols is not a list of unique,
   *         non-empty strings
   */
  public function __construct($name, $doc, $symbols, &$schemata=null)
  {
    if (!AvroUtil::is_list($symbols))
      throw new AvroSchemaParseException('Enum Schema symbols are not a list');

    // Check element types first so the duplicate check below only ever
    // compares and prints strings.
    foreach ($symbols as $symbol)
      if (!is_string($symbol) || empty($symbol))
        throw new AvroSchemaParseException(
          sprintf('Enum schema symbol must be a string %s',
                  print_r($symbol, true)));

    // array_unique() can only shrink the list, so duplicates exist exactly
    // when the unique list is shorter than the original.
    $unique_symbols = array_unique($symbols);
    if (count($unique_symbols) < count($symbols))
    {
      // array_unique() keeps the key of each first occurrence; the keys
      // it dropped belong to the repeated symbols.
      $duplicates = array_unique(array_diff_key($symbols, $unique_symbols));
      throw new AvroSchemaParseException(
        sprintf('Duplicate symbols: %s', join(', ', $duplicates)));
    }

    parent::__construct(AvroSchema::ENUM_SCHEMA, $name, $doc, $schemata);
    $this->symbols = $symbols;
  }

  /**
   * @returns string[] this enum schema's symbols
   */
  public function symbols()
  {
    return $this->symbols;
  }

  /**
   * @param string $symbol
   * @returns boolean true if the given symbol exists in this
   *          enum schema and false otherwise
   */
  public function has_symbol($symbol)
  {
    // Strict comparison, matching symbol_index().
    return in_array($symbol, $this->symbols, true);
  }

  /**
   * @param int $index
   * @returns string enum schema symbol with the given (zero-based) index
   * @throws AvroException if there is no symbol at $index
   */
  public function symbol_by_index($index)
  {
    if (array_key_exists($index, $this->symbols))
      return $this->symbols[$index];
    throw new AvroException(sprintf('Invalid symbol index %d', $index));
  }

  /**
   * @param string $symbol
   * @returns int the index of the given $symbol in the enum schema
   * @throws AvroException if $symbol is not one of this schema's symbols
   */
  public function symbol_index($symbol)
  {
    $idx = array_search($symbol, $this->symbols, true);
    if (false !== $idx)
      return $idx;
    throw new AvroException(sprintf("Invalid symbol value '%s'", $symbol));
  }

  /**
   * @returns mixed the parent representation plus the symbol list
   */
  public function to_avro()
  {
    $avro = parent::to_avro();
    $avro[AvroSchema::SYMBOLS_ATTR] = $this->symbols;
    return $avro;
  }
}

/**
 * AvroNamedSchema with fixed-length data values
 * @package Avro
 */
class AvroFixedSchema extends AvroNamedSchema
{
  /**
   * @var int byte count of this fixed schema data value
   */
  private $size;

  /**
   * @param AvroName $name
   * @param string $doc Set to null, as fixed schemas don't have doc strings
   * @param int $size byte count of this fixed schema data value
   * @param AvroNamedSchemata &$schemata
   * @throws AvroSchemaParseException if $size is not an integer
   */
  public function __construct($name, $doc, $size, &$schemata=null)
  {
    $doc = null; // Fixed schemas don't have doc strings.
    if (!is_integer($size))
      throw new AvroSchemaParseException(
        'Fixed Schema requires a valid integer for "size" attribute');
    parent::__construct(AvroSchema::FIXED_SCHEMA, $name, $doc, $schemata);
    $this->size = $size;
  }

  /**
   * @returns int byte count of this fixed schema data value
   */
  public function size()
  {
    return $this->size;
  }

  /**
   * @returns mixed the parent representation plus the size
   */
  public function to_avro()
  {
    $avro = parent::to_avro();
    $avro[AvroSchema::SIZE_ATTR] = $this->size;
    return $avro;
  }
}

/**
 * @package Avro
 */
class AvroRecordSchema extends AvroNamedSchema
{
  /**
   * @param mixed $field_data
   * @param string $default_namespace namespace of enclosing schema
   * @param AvroNamedSchemata &$schemata known named schemas; may be null,
   *        in which case every field type is handed to subparse()
   * @returns AvroField[]
   * @throws AvroSchemaParseException
   */
  private static function parse_fields($field_data, $default_namespace,
                                       &$schemata)
  {
    $fields = array();
    $field_names = array();

    foreach ($field_data as $field)
    {
      $name = AvroUtil::array_value($field, AvroField::FIELD_NAME_ATTR);
      $type = AvroUtil::array_value($field, AvroSchema::TYPE_ATTR);
      $order = AvroUtil::array_value($field, AvroField::ORDER_ATTR);

      // A default may legitimately be null, so its presence is tracked
      // separately from its value.
      $default = null;
      $has_default = false;
      if (array_key_exists(AvroField::DEFAULT_ATTR, $field))
      {
        $default = $field[AvroField::DEFAULT_ATTR];
        $has_default = true;
      }

      // $field_names only ever holds names accepted by AvroField, so a
      // strict comparison is sufficient.
      if (in_array($name, $field_names, true))
        throw new AvroSchemaParseException(
          sprintf("Field name %s is already in use", $name));

      $is_schema_from_schemata = false;
      $field_schema = null;
      // The constructor lets $schemata default to null; only look names
      // up when it is set.
      if (is_string($type)
          && !is_null($schemata)
          && ($field_schema = $schemata->schema_by_name(
                new AvroName($type, null, $default_namespace))))
        $is_schema_from_schemata = true;
      else
        $field_schema = self::subparse($type, $default_namespace, $schemata);

      $new_field = new AvroField($name, $field_schema, $is_schema_from_schemata,
                                 $has_default, $default, $order);
      $field_names []= $name;
      $fields []= $new_field;
    }
    return $fields;
  }

  /**
   * @var AvroField[] field definitions of this AvroRecordSchema
   */
  private $fields;

  /**
   * @var array map of field names to field objects.
   * @internal Not called directly. Memoization of AvroRecordSchema->fields_hash()
   */
  private $fields_hash;

  /**
   * @param AvroName $name name of the record; its namespace becomes the
   *        default namespace used when parsing the fields
   * @param string $doc
   * @param array $fields
   * @param AvroNamedSchemata &$schemata
   * @param string $schema_type schema type name
   * @throws AvroSchemaParseException
   */
  public function __construct($name, $doc, $fields, &$schemata=null,
                              $schema_type=AvroSchema::RECORD_SCHEMA)
  {
    if (is_null($fields))
      throw new AvroSchemaParseException(
        'Record schema requires a non-empty fields attribute');

    // Request schemas bypass the named-schema constructor: only the type
    // is set, no name or doc is stored and nothing is added to $schemata.
    if (AvroSchema::REQUEST_SCHEMA == $schema_type)
      $this->type = $schema_type;
    else
      parent::__construct($schema_type, $name, $doc, $schemata);

    list($x, $namespace) = $name->name_and_namespace();
    $this->fields = self::parse_fields($fields, $namespace, $schemata);
  }

  /**
   * @returns mixed for request schemas, the bare list of field
   *                representations; otherwise the parent representation
   *                plus that list under FIELDS_ATTR
   */
  public function to_avro()
  {
    $fields_avro = array();
    foreach ($this->fields as $field)
      $fields_avro []= $field->to_avro();

    // Must return before parent::to_avro(): the constructor stores no
    // name for request schemas, and the parent dereferences the name.
    if (AvroSchema::REQUEST_SCHEMA == $this->type)
      return $fields_avro;

    $avro = parent::to_avro();
    $avro[AvroSchema::FIELDS_ATTR] = $fields_avro;
    return $avro;
  }

  /**
   * @returns array the schema definitions of the fields of this AvroRecordSchema
   */
  public function fields()
  {
    return $this->fields;
  }

  /**
   * @returns array a hash table of the fields of this AvroRecordSchema fields
   *          keyed by each field's name
   */
  public function fields_hash()
  {
    // Built on first use and cached in $this->fields_hash.
    if (is_null($this->fields_hash))
    {
      $hash = array();
      foreach ($this->fields as $field)
        $hash[$field->name()] = $field;
      $this->fields_hash = $hash;
    }
    return $this->fields_hash;
  }
}

/**
 * Field of an {@link AvroRecordSchema}
 * @package Avro
 */
class AvroField extends AvroSchema
{
  /**
   * @var string fields name attribute name
   */
  const FIELD_NAME_ATTR = 'name';

  /**
   * @var string
   */
  const DEFAULT_ATTR = 'default';

  /**
   * @var string
   */
  const ORDER_ATTR = 'order';

  /**
   * @var string
   */
  const ASC_SORT_ORDER = 'ascending';

  /**
   * @var string
   */
  const DESC_SORT_ORDER = 'descending';

  /**
   * @var string
   */
  const IGNORE_SORT_ORDER = 'ignore';

  /**
   * @var array list of valid field sort order values
   */
  private static $valid_field_sort_orders = array(self::ASC_SORT_ORDER,
                                                  self::DESC_SORT_ORDER,
                                                  self::IGNORE_SORT_ORDER);

  /**
   * @param string $order
   * @returns boolean true if $order is one of the valid sort order strings
   */
  private static function is_valid_field_sort_order($order)
  {
    // Strict: a loose comparison would accept e.g. boolean true.
    return in_array($order, self::$valid_field_sort_orders, true);
  }

  /**
   * @param string $order
   * @throws AvroSchemaParseException if $order is not a valid
   *         field order value.
   */
  private static function check_order_value($order)
  {
    if (!is_null($order) && !self::is_valid_field_sort_order($order))
      throw new AvroSchemaParseException(
        sprintf('Invalid field sort order %s', print_r($order, true)));
  }

  /**
   * @var string
   */
  private $name;

  /**
   * @var boolean whether or not there is a default value
   */
  private $has_default;

  /**
   * @var mixed field default value
   */
  private $default;

  /**
   * @var string sort order of this field
   */
  private $order;

  /**
   * @var boolean whether or not the AvroNamedSchema of this field is
   *              defined in the AvroNamedSchemata instance
   */
  private $is_type_from_schemata;

  /**
   * @param string $name
   * @param AvroSchema $schema schema of the field's values; stored as
   *        this object's type
   * @param boolean $is_type_from_schemata
   * @param boolean $has_default whether $default was given explicitly
   * @param mixed $default
   * @param string $order
   * @throws AvroSchemaParseException if $name is not well formed or
   *         $order is not a valid sort order
   * @todo Check validity of $default value
   */
  public function __construct($name, $schema, $is_type_from_schemata,
                              $has_default, $default, $order=null)
  {
    if (!AvroName::is_well_formed_name($name))
      throw new AvroSchemaParseException('Field requires a "name" attribute');

    $this->type = $schema;
    $this->is_type_from_schemata = $is_type_from_schemata;
    $this->name = $name;
    $this->has_default = $has_default;
    if ($this->has_default)
      $this->default = $default;
    self::check_order_value($order);
    $this->order = $order;
  }

  /**
   * @returns mixed array with the field name and type, plus the default
   *                (when one was given, even if it is null) and the
   *                order (when set)
   */
  public function to_avro()
  {
    $avro = array(AvroField::FIELD_NAME_ATTR => $this->name);

    $avro[AvroSchema::TYPE_ATTR] = ($this->is_type_from_schemata)
      ? $this->type->qualified_name()
      : $this->type->to_avro();

    // Test the flag rather than isset(): isset() is false for null, which
    // would silently drop an explicit null default.
    if ($this->has_default)
      $avro[AvroField::DEFAULT_ATTR] = $this->default;

    if ($this->order)
      $avro[AvroField::ORDER_ATTR] = $this->order;

    return $avro;
  }

  /**
   * @returns string the name of this field
   */
  public function name()
  {
    return $this->name;
  }

  /**
   * @returns mixed the default value of this field
   */
  public function default_value()
  {
    return $this->default;
  }

  /**
   * @returns boolean true if the field has a default and false otherwise
   */
  public function has_default_value()
  {
    return $this->has_default;
  }
}