The current version of Avro is 1.5.1. The current version of libavro is 21:0:0. This document was created 2011-04-29.
1. Introduction to Avro
Avro is a data serialization system.
Avro provides:
-
Rich data structures.
-
A compact, fast, binary data format.
-
A container file, to store persistent data.
-
Remote procedure call (RPC).
This document will focus on the C implementation of Avro. To learn more about Avro in general, visit the Avro website.
2. Introduction to Avro C
___ ______ / |_ ___________ / ____/ / /| | | / / ___/ __ \ / / / ___ | |/ / / / /_/ / / /___ /_/ |_|___/_/ \____/ \____/
A C program is like a fast dance on a newly waxed dance floor by people carrying razors.
— Waldi Ravens
The C implementation has been tested on MacOSX and Linux but, over time, the number of support OSes should grow. Please let us know if you’re using Avro C on other systems. There are no dependencies on external libraries. We embedded Jansson into Avro C for parsing JSON into schema structures.
The C implementation supports:
-
binary encoding/decoding of all primitive and complex data types
-
storage to an Avro Object Container File
-
schema resolution, promotion and projection
-
validating and non-validating mode for writing Avro data
The C implementation is lacking:
-
RPC
To learn about the API, take a look at the examples and reference files later in this document.
We’re always looking for contributions so, if you’re a C hacker, please feel free to submit patches to the project.
3. Reference Counting
Avro C does reference counting for all schema and data objects. When the number of references drops to zero, the memory is freed.
For example, to create and free a string, you would use:
avro_datum_t string = avro_string("This is my string"); ... avro_datum_decref(string);
Things get a little more complicated when you consider more elaborate schema and data structures.
For example, let’s say that you create a record with a single string field:
avro_datum_t example = avro_record("Example"); avro_datum_t solo_field = avro_string("Example field value"); avro_record_set(example, "solo", solo_field); ... avro_datum_decref(example);
In this example, the solo_field datum would not be freed since it has two references: the original reference and a reference inside the Example record. The avro_datum_decref(example) call drops the number of reference to one. If you are finished with the solo_field schema, then you need to avro_schema_decref(solo_field) to completely dereference the solo_field datum and free it.
4. Wrap It and Give It
You’ll notice that some datatypes can be "wrapped" and "given". This allows C programmers the freedom to decide who is responsible for the memory. Let’s take strings for example.
To create a string datum, you have three different methods:
avro_datum_t avro_string(const char *str); avro_datum_t avro_wrapstring(const char *str); avro_datum_t avro_givestring(const char *str);
If you use, avro_string then Avro C will make a copy of your string and free it when the datum is dereferenced. In some cases, especially when dealing with large amounts of data, you want to avoid this memory copy. That’s where avro_wrapstring and avro_givestring can help.
If you use, avro_wrapstring then Avro C will do no memory management at all. It will just save a pointer to your data and it’s your responsibility to free the string.
Warning
|
When using avro_wrapstring, do not free the string before you dereference the string datum with avro_datum_decref(). |
Lastly, if you use avro_givestring then Avro C will free the string later when the datum is dereferenced. In a sense, you are "giving" responsibility for freeing the string to Avro C.
Warning
|
Don’t "give" Avro C a string that you haven’t allocated from the heap with e.g. malloc or strdup. For example, don’t do this: avro_datum_t bad_idea = avro_givestring("This isn't allocated on the heap"); |
5. Schema Validation
If you want to write a datum, you would use the following function
int avro_write_data(avro_writer_t writer, avro_schema_t writers_schema, avro_datum_t datum);
If you pass in a writers_schema, then you datum will be validated before it is sent to the writer. This check ensures that your data has the correct format. If you are certain your datum is correct, you can pass a NULL value for writers_schema and Avro C will not validate before writing.
Note
|
Data written to an Avro File Object Container is always validated. |
6. Examples
I’m not even supposed to be here today!
Imagine you’re a free-lance hacker in Leonardo, New Jersey and you’ve been approached by the owner of the local Quick Stop Convenience store. He wants you to create a contact database case he needs to call employees to work on their day off.
You might build a simple contact system using Avro C like the following…
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to you under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. */ #include <avro.h> #include <inttypes.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> avro_schema_t person_schema; int64_t id = 0; /* A simple schema for our tutorial */ #define PERSON_SCHEMA \ "{\"type\":\"record\",\ \"name\":\"Person\",\ \"fields\":[\ {\"name\": \"ID\", \"type\": \"long\"},\ {\"name\": \"First\", \"type\": \"string\"},\ {\"name\": \"Last\", \"type\": \"string\"},\ {\"name\": \"Phone\", \"type\": \"string\"},\ {\"name\": \"Age\", \"type\": \"int\"}]}" /* Parse schema into a schema data structure */ void init_schema(void) { avro_schema_error_t error; if (avro_schema_from_json(PERSON_SCHEMA, sizeof(PERSON_SCHEMA), &person_schema, &error)) { fprintf(stderr, "Unable to parse person schema\n"); exit(EXIT_FAILURE); } } /* Create a datum to match the person schema and save it */ void add_person(avro_file_writer_t db, const char *first, const char *last, const char *phone, int32_t age) { avro_datum_t person = avro_record(person_schema); avro_datum_t id_datum = avro_int64(++id); avro_datum_t first_datum = avro_string(first); avro_datum_t last_datum = avro_string(last); avro_datum_t age_datum = avro_int32(age); avro_datum_t phone_datum = avro_string(phone); if (avro_record_set(person, "ID", id_datum) || avro_record_set(person, "First", first_datum) || avro_record_set(person, "Last", last_datum) || avro_record_set(person, "Age", age_datum) || avro_record_set(person, "Phone", phone_datum)) { fprintf(stderr, "Unable to create Person datum structure"); exit(EXIT_FAILURE); } if (avro_file_writer_append(db, person)) { fprintf(stderr, "Unable to write Person datum to memory buffer"); exit(EXIT_FAILURE); } /* Decrement all our references to prevent memory from leaking */ avro_datum_decref(id_datum); avro_datum_decref(first_datum); avro_datum_decref(last_datum); avro_datum_decref(age_datum); avro_datum_decref(phone_datum); avro_datum_decref(person); fprintf(stdout, "Successfully added %s, %s id=%"PRId64"\n", last, first, id); } int print_person(avro_file_reader_t db, avro_schema_t reader_schema) { int rval; avro_datum_t person; rval = avro_file_reader_read(db, reader_schema, &person); if (rval == 0) { int64_t i64; int32_t i32; char *p; avro_datum_t id_datum, first_datum, last_datum, phone_datum, age_datum; if (avro_record_get(person, "ID", &id_datum) == 0) { avro_int64_get(id_datum, &i64); fprintf(stdout, "%"PRId64" | ", i64); } if (avro_record_get(person, "First", &first_datum) == 0) { avro_string_get(first_datum, &p); fprintf(stdout, "%15s | ", p); } if (avro_record_get(person, "Last", &last_datum) == 0) { avro_string_get(last_datum, &p); fprintf(stdout, "%15s | ", p); } if (avro_record_get(person, "Phone", &phone_datum) == 0) { avro_string_get(phone_datum, &p); fprintf(stdout, "%15s | ", p); } if (avro_record_get(person, "Age", &age_datum) == 0) { avro_int32_get(age_datum, &i32); fprintf(stdout, "%d", i32); } fprintf(stdout, "\n"); /* We no longer need this memory */ avro_datum_decref(person); } return rval; } int main(void) { int rval; avro_file_reader_t dbreader; avro_file_writer_t db; avro_schema_t projection_schema, first_name_schema, phone_schema; int64_t i; const char *dbname = "quickstop.db"; /* Initialize the schema structure from JSON */ init_schema(); /* Delete the database if it exists */ unlink(dbname); /* Create a new database */ rval = avro_file_writer_create(dbname, person_schema, &db); if (rval) { fprintf(stderr, "There was an error creating %s\n", dbname); exit(EXIT_FAILURE); } /* Add people to the database */ add_person(db, "Dante", "Hicks", "(555) 123-4567", 32); add_person(db, "Randal", "Graves", "(555) 123-5678", 30); add_person(db, "Veronica", "Loughran", "(555) 123-0987", 28); add_person(db, "Caitlin", "Bree", "(555) 123-2323", 27); add_person(db, "Bob", "Silent", "(555) 123-6422", 29); add_person(db, "Jay", "???", "(555) 123-9182", 26); avro_file_writer_close(db); fprintf(stdout, "\nNow let's read all the records back out\n"); /* Read all the records and print them */ avro_file_reader(dbname, &dbreader); for (i = 0; i < id; i++) { if (print_person(dbreader, NULL)) { fprintf(stderr, "Error printing person\n"); exit(EXIT_FAILURE); } } avro_file_reader_close(dbreader); /* You can also use projection, to only decode only the data you are interested in. This is particularly useful when you have huge data sets and you'll only interest in particular fields e.g. your contacts First name and phone number */ projection_schema = avro_schema_record("Person", NULL); first_name_schema = avro_schema_string(); phone_schema = avro_schema_string(); avro_schema_record_field_append(projection_schema, "First", first_name_schema); avro_schema_record_field_append(projection_schema, "Phone", phone_schema); /* Read only the record you're interested in */ fprintf(stdout, "\n\nUse projection to print only the First name and phone numbers\n"); avro_file_reader(dbname, &dbreader); for (i = 0; i < id; i++) { if (print_person(dbreader, projection_schema)) { fprintf(stderr, "Error printing person\n"); exit(EXIT_FAILURE); } } avro_file_reader_close(dbreader); avro_schema_decref(first_name_schema); avro_schema_decref(phone_schema); avro_schema_decref(projection_schema); /* We don't need this schema anymore */ avro_schema_decref(person_schema); return 0; }
When you compile and run this program, you should get the following output
Successfully added Hicks, Dante id=1 Successfully added Graves, Randal id=2 Successfully added Loughran, Veronica id=3 Successfully added Bree, Caitlin id=4 Successfully added Silent, Bob id=5 Successfully added ???, Jay id=6 Avro is compact. Here is the data for all 6 people. | 02 0A 44 61 6E 74 65 0A | 48 69 63 6B 73 1C 28 35 | ..Dante.Hicks.(5 | 35 35 29 20 31 32 33 2D | 34 35 36 37 40 04 0C 52 | 55) 123-4567@..R | 61 6E 64 61 6C 0C 47 72 | 61 76 65 73 1C 28 35 35 | andal.Graves.(55 | 35 29 20 31 32 33 2D 35 | 36 37 38 3C 06 10 56 65 | 5) 123-5678<..Ve | 72 6F 6E 69 63 61 10 4C | 6F 75 67 68 72 61 6E 1C | ronica.Loughran. | 28 35 35 35 29 20 31 32 | 33 2D 30 39 38 37 38 08 | (555) 123-09878. | 0E 43 61 69 74 6C 69 6E | 08 42 72 65 65 1C 28 35 | .Caitlin.Bree.(5 | 35 35 29 20 31 32 33 2D | 32 33 32 33 36 0A 06 42 | 55) 123-23236..B | 6F 62 0C 53 69 6C 65 6E | 74 1C 28 35 35 35 29 20 | ob.Silent.(555) | 31 32 33 2D 36 34 32 32 | 3A 0C 06 4A 61 79 06 3F | 123-6422:..Jay.? | 3F 3F 1C 28 35 35 35 29 | 20 31 32 33 2D 39 31 38 | ??.(555) 123-918 | 32 34 .. .. .. .. .. .. | .. .. .. .. .. .. .. .. | 24.............. Now let's read all the records back out 1 | Dante | Hicks | (555) 123-4567 | 32 2 | Randal | Graves | (555) 123-5678 | 30 3 | Veronica | Loughran | (555) 123-0987 | 28 4 | Caitlin | Bree | (555) 123-2323 | 27 5 | Bob | Silent | (555) 123-6422 | 29 6 | Jay | ??? | (555) 123-9182 | 26 Use projection to print only the First name and phone numbers Dante | (555) 123-4567 | Randal | (555) 123-5678 | Veronica | (555) 123-0987 | Caitlin | (555) 123-2323 | Bob | (555) 123-6422 | Jay | (555) 123-9182 |
The Quick Stop owner was so pleased, he asked you to create a movie database for his RST Video store.
7. Reference files
7.1. avro.h
The avro.h header file contains the complete public API for Avro C. The documentation is rather sparse right now but we’ll be adding more information soon.
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to you under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. */ #ifndef AVRO_H #define AVRO_H #ifdef __cplusplus extern "C" { #define CLOSE_EXTERN } #else #define CLOSE_EXTERN #endif #include <stdio.h> #include <stdint.h> /* * Allocation interface. You can provide a custom allocator for the * library, should you wish. The allocator is provided as a single * generic function, which can emulate the standard malloc, realloc, and * free functions. The design of this allocation interface is inspired * by the implementation of the Lua interpreter. * * The ptr parameter will be the location of any existing memory * buffer. The osize parameter will be the size of this existing * buffer. If ptr is NULL, then osize will be 0. The nsize parameter * will be the size of the new buffer, or 0 if the new buffer should be * freed. * * If nsize is 0, then the allocation function must return NULL. If * nsize is not 0, then it should return NULL if the allocation fails. */ typedef void * (*avro_allocator_t)(void *user_data, void *ptr, size_t osize, size_t nsize); void avro_set_allocator(avro_allocator_t alloc, void *user_data); /* * Returns a textual description of the last error condition returned by * an Avro function. */ const char *avro_strerror(void); enum avro_type_t { AVRO_STRING, AVRO_BYTES, AVRO_INT32, AVRO_INT64, AVRO_FLOAT, AVRO_DOUBLE, AVRO_BOOLEAN, AVRO_NULL, AVRO_RECORD, AVRO_ENUM, AVRO_FIXED, AVRO_MAP, AVRO_ARRAY, AVRO_UNION, AVRO_LINK }; typedef enum avro_type_t avro_type_t; enum avro_class_t { AVRO_SCHEMA, AVRO_DATUM }; typedef enum avro_class_t avro_class_t; struct avro_obj_t { avro_type_t type; avro_class_t class_type; unsigned long refcount; }; #define avro_classof(obj) ((obj)->class_type) #define is_avro_schema(obj) (obj && avro_classof(obj) == AVRO_SCHEMA) #define is_avro_datum(obj) (obj && avro_classof(obj) == AVRO_DATUM) #define avro_typeof(obj) ((obj)->type) #define is_avro_string(obj) (obj && avro_typeof(obj) == AVRO_STRING) #define is_avro_bytes(obj) (obj && avro_typeof(obj) == AVRO_BYTES) #define is_avro_int32(obj) (obj && avro_typeof(obj) == AVRO_INT32) #define is_avro_int64(obj) (obj && avro_typeof(obj) == AVRO_INT64) #define is_avro_float(obj) (obj && avro_typeof(obj) == AVRO_FLOAT) #define is_avro_double(obj) (obj && avro_typeof(obj) == AVRO_DOUBLE) #define is_avro_boolean(obj) (obj && avro_typeof(obj) == AVRO_BOOLEAN) #define is_avro_null(obj) (obj && avro_typeof(obj) == AVRO_NULL) #define is_avro_primitive(obj)(is_avro_string(obj) \ ||is_avro_bytes(obj) \ ||is_avro_int32(obj) \ ||is_avro_int64(obj) \ ||is_avro_float(obj) \ ||is_avro_double(obj) \ ||is_avro_boolean(obj) \ ||is_avro_null(obj)) #define is_avro_record(obj) (obj && avro_typeof(obj) == AVRO_RECORD) #define is_avro_enum(obj) (obj && avro_typeof(obj) == AVRO_ENUM) #define is_avro_fixed(obj) (obj && avro_typeof(obj) == AVRO_FIXED) #define is_avro_named_type(obj)(is_avro_record(obj) \ ||is_avro_enum(obj) \ ||is_avro_fixed(obj)) #define is_avro_map(obj) (obj && avro_typeof(obj) == AVRO_MAP) #define is_avro_array(obj) (obj && avro_typeof(obj) == AVRO_ARRAY) #define is_avro_union(obj) (obj && avro_typeof(obj) == AVRO_UNION) #define is_avro_complex_type(obj) (!(is_avro_primitive(obj)) #define is_avro_link(obj) (obj && avro_typeof(obj) == AVRO_LINK) typedef struct avro_reader_t_ *avro_reader_t; typedef struct avro_writer_t_ *avro_writer_t; /* * schema */ typedef struct avro_obj_t *avro_schema_t; avro_schema_t avro_schema_string(void); avro_schema_t avro_schema_bytes(void); avro_schema_t avro_schema_int(void); avro_schema_t avro_schema_long(void); avro_schema_t avro_schema_float(void); avro_schema_t avro_schema_double(void); avro_schema_t avro_schema_boolean(void); avro_schema_t avro_schema_null(void); avro_schema_t avro_schema_record(const char *name, const char *space); avro_schema_t avro_schema_record_field_get(const avro_schema_t record, const char *field_name); const char *avro_schema_record_field_name(const avro_schema_t schema, int index); int avro_schema_record_field_get_index(const avro_schema_t schema, const char *field_name); avro_schema_t avro_schema_record_field_get_by_index (const avro_schema_t record, int index); int avro_schema_record_field_append(const avro_schema_t record, const char *field_name, const avro_schema_t type); size_t avro_schema_record_size(const avro_schema_t record); avro_schema_t avro_schema_enum(const char *name); const char *avro_schema_enum_get(const avro_schema_t enump, int index); int avro_schema_enum_get_by_name(const avro_schema_t enump, const char *symbol_name); int avro_schema_enum_symbol_append(const avro_schema_t enump, const char *symbol); avro_schema_t avro_schema_fixed(const char *name, const int64_t len); int64_t avro_schema_fixed_size(const avro_schema_t fixed); avro_schema_t avro_schema_map(const avro_schema_t values); avro_schema_t avro_schema_map_values(avro_schema_t map); avro_schema_t avro_schema_array(const avro_schema_t items); avro_schema_t avro_schema_array_items(avro_schema_t array); avro_schema_t avro_schema_union(void); size_t avro_schema_union_size(const avro_schema_t union_schema); int avro_schema_union_append(const avro_schema_t union_schema, const avro_schema_t schema); avro_schema_t avro_schema_union_branch(avro_schema_t union_schema, int branch_index); avro_schema_t avro_schema_union_branch_by_name (avro_schema_t union_schema, int *branch_index, const char *name); avro_schema_t avro_schema_link(avro_schema_t schema); avro_schema_t avro_schema_link_target(avro_schema_t schema); typedef struct avro_schema_error_t_ *avro_schema_error_t; int avro_schema_from_json(const char *jsontext, const int32_t len, avro_schema_t * schema, avro_schema_error_t * error); int avro_schema_to_json(const avro_schema_t schema, avro_writer_t out); int avro_schema_to_specific(avro_schema_t schema, const char *prefix); avro_schema_t avro_schema_get_subschema(const avro_schema_t schema, const char *name); const char *avro_schema_name(const avro_schema_t schema); const char *avro_schema_type_name(const avro_schema_t schema); avro_schema_t avro_schema_copy(avro_schema_t schema); int avro_schema_equal(avro_schema_t a, avro_schema_t b); avro_schema_t avro_schema_incref(avro_schema_t schema); void avro_schema_decref(avro_schema_t schema); /* * io */ avro_reader_t avro_reader_file(FILE * fp); avro_writer_t avro_writer_file(FILE * fp); avro_reader_t avro_reader_memory(const char *buf, int64_t len); avro_writer_t avro_writer_memory(const char *buf, int64_t len); int avro_read(avro_reader_t reader, void *buf, int64_t len); int avro_skip(avro_reader_t reader, int64_t len); int avro_write(avro_writer_t writer, void *buf, int64_t len); void avro_writer_reset(avro_writer_t writer); int64_t avro_writer_tell(avro_writer_t writer); void avro_writer_flush(avro_writer_t writer); void avro_writer_dump(avro_writer_t writer, FILE * fp); void avro_reader_dump(avro_reader_t reader, FILE * fp); void avro_reader_free(avro_reader_t reader); void avro_writer_free(avro_writer_t writer); /* * datum */ /** * A function used to free a bytes, string, or fixed buffer once it is * no longer needed by the datum that wraps it. */ typedef void (*avro_free_func_t)(void *ptr, size_t sz); /** * An avro_free_func_t that frees the buffer using the custom allocator * provided to avro_set_allocator. */ void avro_alloc_free(void *ptr, size_t sz); /* * Datum constructors. Each datum stores a reference to the schema that * the datum is an instance of. The primitive datum constructors don't * need to take in an explicit avro_schema_t parameter, since there's * only one schema that they could be an instance of. The complex * constructors do need an explicit schema parameter. */ typedef struct avro_obj_t *avro_datum_t; avro_datum_t avro_string(const char *str); avro_datum_t avro_givestring(const char *str, avro_free_func_t free); avro_datum_t avro_bytes(const char *buf, int64_t len); avro_datum_t avro_givebytes(const char *buf, int64_t len, avro_free_func_t free); avro_datum_t avro_int32(int32_t i); avro_datum_t avro_int64(int64_t l); avro_datum_t avro_float(float f); avro_datum_t avro_double(double d); avro_datum_t avro_boolean(int8_t i); avro_datum_t avro_null(void); avro_datum_t avro_record(avro_schema_t schema); avro_datum_t avro_enum(avro_schema_t schema, int i); avro_datum_t avro_fixed(avro_schema_t schema, const char *bytes, const int64_t size); avro_datum_t avro_givefixed(avro_schema_t schema, const char *bytes, const int64_t size, avro_free_func_t free); avro_datum_t avro_map(avro_schema_t schema); avro_datum_t avro_array(avro_schema_t schema); avro_datum_t avro_union(avro_schema_t schema, int64_t discriminant, const avro_datum_t datum); /** * Returns the schema that the datum is an instance of. */ avro_schema_t avro_datum_get_schema(const avro_datum_t datum); /* * Constructs a new avro_datum_t instance that's appropriate for holding * values of the given schema. */ avro_datum_t avro_datum_from_schema(const avro_schema_t schema); /* getters */ int avro_string_get(avro_datum_t datum, char **p); int avro_bytes_get(avro_datum_t datum, char **bytes, int64_t * size); int avro_int32_get(avro_datum_t datum, int32_t * i); int avro_int64_get(avro_datum_t datum, int64_t * l); int avro_float_get(avro_datum_t datum, float *f); int avro_double_get(avro_datum_t datum, double *d); int avro_boolean_get(avro_datum_t datum, int8_t * i); int avro_enum_get(const avro_datum_t datum); const char *avro_enum_get_name(const avro_datum_t datum); int avro_fixed_get(avro_datum_t datum, char **bytes, int64_t * size); int avro_record_get(const avro_datum_t record, const char *field_name, avro_datum_t * value); /* * A helper macro that extracts the value of the given field of a * record. */ #define avro_record_get_field_value(rc, rec, typ, fname, ...) \ do { \ avro_datum_t field = NULL; \ (rc) = avro_record_get((rec), (fname), &field); \ if (rc) break; \ (rc) = avro_##typ##_get(field, __VA_ARGS__); \ } while (0) int avro_map_get(const avro_datum_t datum, const char *key, avro_datum_t * value); /* * For maps, the "index" for each entry is based on the order that they * were added to the map. */ int avro_map_get_key(const avro_datum_t datum, int index, const char **key); size_t avro_map_size(const avro_datum_t datum); int avro_array_get(const avro_datum_t datum, int64_t index, avro_datum_t * value); size_t avro_array_size(const avro_datum_t datum); /* * These accessors allow you to query the current branch of a union * value, returning either the branch's discriminant value or the * avro_datum_t of the branch. A union value can be uninitialized, in * which case the discriminant will be -1 and the datum NULL. */ int64_t avro_union_discriminant(const avro_datum_t datum); avro_datum_t avro_union_current_branch(avro_datum_t datum); /* setters */ int avro_string_set(avro_datum_t datum, const char *p); int avro_givestring_set(avro_datum_t datum, const char *p, avro_free_func_t free); int avro_bytes_set(avro_datum_t datum, const char *bytes, const int64_t size); int avro_givebytes_set(avro_datum_t datum, const char *bytes, const int64_t size, avro_free_func_t free); int avro_int32_set(avro_datum_t datum, const int32_t i); int avro_int64_set(avro_datum_t datum, const int64_t l); int avro_float_set(avro_datum_t datum, const float f); int avro_double_set(avro_datum_t datum, const double d); int avro_boolean_set(avro_datum_t datum, const int8_t i); int avro_enum_set(avro_datum_t datum, const int symbol_value); int avro_enum_set_name(avro_datum_t datum, const char *symbol_name); int avro_fixed_set(avro_datum_t datum, const char *bytes, const int64_t size); int avro_givefixed_set(avro_datum_t datum, const char *bytes, const int64_t size, avro_free_func_t free); int avro_record_set(avro_datum_t record, const char *field_name, avro_datum_t value); /* * A helper macro that sets the value of the given field of a record. */ #define avro_record_set_field_value(rc, rec, typ, fname, ...) \ do { \ avro_datum_t field = NULL; \ (rc) = avro_record_get((rec), (fname), &field); \ if (rc) break; \ (rc) = avro_##typ##_set(field, __VA_ARGS__); \ } while (0) int avro_map_set(avro_datum_t map, const char *key, avro_datum_t value); int avro_array_append_datum(avro_datum_t array_datum, avro_datum_t datum); /* * This function selects the active branch of a union value, and can be * safely called on an existing union to change the current branch. If * the branch changes, we'll automatically construct a new avro_datum_t * for the new branch's schema type. If the desired branch is already * the active branch of the union, we'll leave the existing datum * instance as-is. The branch datum will be placed into the "branch" * parameter, regardless of whether we have to create a new datum * instance or not. */ int avro_union_set_discriminant(avro_datum_t unionp, int discriminant, avro_datum_t *branch); /* reference counting */ avro_datum_t avro_datum_incref(avro_datum_t value); void avro_datum_decref(avro_datum_t value); void avro_datum_print(avro_datum_t value, FILE * fp); int avro_datum_equal(avro_datum_t a, avro_datum_t b); /* * Returns a string containing the JSON encoding of an Avro value. You * must free this string when you're done with it, using the standard * free() function. (*Not* using the custom Avro allocator.) */ int avro_datum_to_json(const avro_datum_t datum, int one_line, char **json_str); int avro_schema_match(avro_schema_t writers_schema, avro_schema_t readers_schema); int avro_schema_datum_validate(avro_schema_t expected_schema, avro_datum_t datum); int avro_read_data(avro_reader_t reader, avro_schema_t writer_schema, avro_schema_t reader_schema, avro_datum_t * datum); int avro_skip_data(avro_reader_t reader, avro_schema_t writer_schema); int avro_write_data(avro_writer_t writer, avro_schema_t writer_schema, avro_datum_t datum); int64_t avro_size_data(avro_writer_t writer, avro_schema_t writer_schema, avro_datum_t datum); /* File object container */ typedef struct avro_file_reader_t_ *avro_file_reader_t; typedef struct avro_file_writer_t_ *avro_file_writer_t; int avro_file_writer_create(const char *path, avro_schema_t schema, avro_file_writer_t * writer); int avro_file_writer_open(const char *path, avro_file_writer_t * writer); int avro_file_reader(const char *path, avro_file_reader_t * reader); int avro_file_writer_append(avro_file_writer_t writer, avro_datum_t datum); int avro_file_writer_sync(avro_file_writer_t writer); int avro_file_writer_flush(avro_file_writer_t writer); int avro_file_writer_close(avro_file_writer_t writer); int avro_file_reader_read(avro_file_reader_t reader, avro_schema_t readers_schema, avro_datum_t * datum); int avro_file_reader_close(avro_file_reader_t reader); CLOSE_EXTERN #endif
7.2. test_avro_data.c
Another good way to learn how to encode/decode data in Avro C is to look at the test_avro_data.c unit test. This simple unit test checks that all the avro types can be encoded/decoded correctly.
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to you under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. */ #include "avro_private.h" #include <inttypes.h> #include <limits.h> #include <stdlib.h> #include <string.h> #include <time.h> char buf[4096]; avro_reader_t reader; avro_writer_t writer; typedef int (*avro_test) (void); /* * Use a custom allocator that verifies that the size that we use to * free an object matches the size that we use to allocate it. */ static void * test_allocator(void *ud, void *ptr, size_t osize, size_t nsize) { AVRO_UNUSED(ud); AVRO_UNUSED(osize); if (nsize == 0) { size_t *size = ((size_t *) ptr) - 1; if (osize != *size) { fprintf(stderr, "Error freeing %p:\n" "Size passed to avro_free (%zu) " "doesn't match size passed to " "avro_malloc (%zu)\n", ptr, osize, *size); exit(EXIT_FAILURE); } free(size); return NULL; } else { size_t real_size = nsize + sizeof(size_t); size_t *old_size = ptr? ((size_t *) ptr)-1: NULL; size_t *size = realloc(old_size, real_size); *size = nsize; return (size + 1); } } void init_rand(void) { srand(time(NULL)); } double rand_number(double from, double to) { double range = to - from; return from + ((double)rand() / (RAND_MAX + 1.0)) * range; } int64_t rand_int64(void) { return (int64_t) rand_number(LONG_MIN, LONG_MAX); } int32_t rand_int32(void) { return (int32_t) rand_number(INT_MIN, INT_MAX); } void write_read_check(avro_schema_t writers_schema, avro_datum_t datum, avro_schema_t readers_schema, avro_datum_t expected, char *type) { avro_datum_t datum_out; int validate; for (validate = 0; validate <= 1; validate++) { reader = avro_reader_memory(buf, sizeof(buf)); writer = avro_writer_memory(buf, sizeof(buf)); if (!expected) { expected = datum; } /* Validating read/write */ if (avro_write_data (writer, validate ? writers_schema : NULL, datum)) { fprintf(stderr, "Unable to write %s validate=%d\n", type, validate); exit(EXIT_FAILURE); } int64_t size = avro_size_data(writer, validate ? writers_schema : NULL, datum); if (size != avro_writer_tell(writer)) { fprintf(stderr, "Unable to calculate size %s validate=%d (%"PRId64" != %"PRId64")\n", type, validate, size, avro_writer_tell(writer)); exit(EXIT_FAILURE); } if (avro_read_data (reader, writers_schema, readers_schema, &datum_out)) { fprintf(stderr, "Unable to read %s validate=%d\n", type, validate); fprintf(stderr, " %s\n", avro_strerror()); exit(EXIT_FAILURE); } if (!avro_datum_equal(expected, datum_out)) { fprintf(stderr, "Unable to encode/decode %s validate=%d\n", type, validate); exit(EXIT_FAILURE); } avro_reader_dump(reader, stderr); avro_datum_decref(datum_out); avro_reader_free(reader); avro_writer_free(writer); } } static void test_json(avro_datum_t datum, const char *expected) { char *json = NULL; avro_datum_to_json(datum, 1, &json); if (strcmp(json, expected) != 0) { fprintf(stderr, "Unexpected JSON encoding: %s\n", json); exit(EXIT_FAILURE); } free(json); } static int test_string(void) { unsigned int i; const char *strings[] = { "Four score and seven years ago", "our father brought forth on this continent", "a new nation", "conceived in Liberty", "and dedicated to the proposition that all men are created equal." }; avro_schema_t writer_schema = avro_schema_string(); for (i = 0; i < sizeof(strings) / sizeof(strings[0]); i++) { avro_datum_t datum = avro_givestring(strings[i], NULL); write_read_check(writer_schema, datum, NULL, NULL, "string"); avro_datum_decref(datum); } avro_datum_t datum = avro_givestring(strings[0], NULL); test_json(datum, "\"Four score and seven years ago\""); avro_datum_decref(datum); // The following should bork if we don't copy the string value // correctly (since we'll try to free a static string). datum = avro_string("this should be copied"); avro_string_set(datum, "also this"); avro_datum_decref(datum); avro_schema_decref(writer_schema); return 0; } static int test_bytes(void) { char bytes[] = { 0xDE, 0xAD, 0xBE, 0xEF }; avro_schema_t writer_schema = avro_schema_bytes(); avro_datum_t datum; avro_datum_t expected_datum; datum = avro_givebytes(bytes, sizeof(bytes), NULL); write_read_check(writer_schema, datum, NULL, NULL, "bytes"); test_json(datum, "\"\\u00de\\u00ad\\u00be\\u00ef\""); avro_datum_decref(datum); avro_schema_decref(writer_schema); datum = avro_givebytes(NULL, 0, NULL); avro_givebytes_set(datum, bytes, sizeof(bytes), NULL); expected_datum = avro_givebytes(bytes, sizeof(bytes), NULL); if (!avro_datum_equal(datum, expected_datum)) { fprintf(stderr, "Expected equal bytes instances.\n"); exit(EXIT_FAILURE); } avro_datum_decref(datum); avro_datum_decref(expected_datum); // The following should bork if we don't copy the bytes value // correctly (since we'll try to free a static string). datum = avro_bytes("original", 8); avro_bytes_set(datum, "alsothis", 8); avro_datum_decref(datum); avro_schema_decref(writer_schema); return 0; } static int test_int32(void) { int i; avro_schema_t writer_schema = avro_schema_int(); avro_schema_t long_schema = avro_schema_long(); avro_schema_t float_schema = avro_schema_float(); avro_schema_t double_schema = avro_schema_double(); for (i = 0; i < 100; i++) { int32_t value = rand_int32(); avro_datum_t datum = avro_int32(value); avro_datum_t long_datum = avro_int64(value); avro_datum_t float_datum = avro_float(value); avro_datum_t double_datum = avro_double(value); write_read_check(writer_schema, datum, NULL, NULL, "int"); write_read_check(writer_schema, datum, long_schema, long_datum, "int->long"); write_read_check(writer_schema, datum, float_schema, float_datum, "int->float"); write_read_check(writer_schema, datum, double_schema, double_datum, "int->double"); avro_datum_decref(datum); avro_datum_decref(long_datum); avro_datum_decref(float_datum); avro_datum_decref(double_datum); } avro_datum_t datum = avro_int32(10000); test_json(datum, "10000"); avro_datum_decref(datum); avro_schema_decref(writer_schema); avro_schema_decref(long_schema); avro_schema_decref(float_schema); avro_schema_decref(double_schema); return 0; } static int test_int64(void) { int i; avro_schema_t writer_schema = avro_schema_long(); avro_schema_t float_schema = avro_schema_float(); avro_schema_t double_schema = avro_schema_double(); for (i = 0; i < 100; i++) { int64_t value = rand_int64(); avro_datum_t datum = avro_int64(value); avro_datum_t float_datum = avro_float(value); avro_datum_t double_datum = avro_double(value); write_read_check(writer_schema, datum, NULL, NULL, "long"); write_read_check(writer_schema, datum, float_schema, float_datum, "long->float"); write_read_check(writer_schema, datum, double_schema, double_datum, "long->double"); avro_datum_decref(datum); avro_datum_decref(float_datum); avro_datum_decref(double_datum); } avro_datum_t datum = avro_int64(10000); test_json(datum, "10000"); avro_datum_decref(datum); avro_schema_decref(writer_schema); avro_schema_decref(float_schema); avro_schema_decref(double_schema); return 0; } static int test_double(void) { int i; avro_schema_t schema = avro_schema_double(); for (i = 0; i < 100; i++) { avro_datum_t datum = avro_double(rand_number(-1.0E10, 1.0E10)); write_read_check(schema, datum, NULL, NULL, "double"); avro_datum_decref(datum); } avro_datum_t datum = avro_double(2000.0); test_json(datum, "2000.0"); avro_datum_decref(datum); avro_schema_decref(schema); return 0; } static int test_float(void) { int i; avro_schema_t schema = avro_schema_float(); avro_schema_t double_schema = avro_schema_double(); for (i = 0; i < 100; i++) { float value = rand_number(-1.0E10, 1.0E10); avro_datum_t datum = avro_float(value); avro_datum_t double_datum = avro_double(value); write_read_check(schema, datum, NULL, NULL, "float"); write_read_check(schema, datum, double_schema, double_datum, "float->double"); avro_datum_decref(datum); avro_datum_decref(double_datum); } avro_datum_t datum = avro_float(2000.0); test_json(datum, "2000.0"); avro_datum_decref(datum); avro_schema_decref(schema); avro_schema_decref(double_schema); return 0; } static int test_boolean(void) { int i; const char *expected_json[] = { "false", "true" }; avro_schema_t schema = avro_schema_boolean(); for (i = 0; i <= 1; i++) { avro_datum_t datum = avro_boolean(i); write_read_check(schema, datum, NULL, NULL, "boolean"); test_json(datum, expected_json[i]); avro_datum_decref(datum); } avro_schema_decref(schema); return 0; } static int test_null(void) { avro_schema_t schema = avro_schema_null(); avro_datum_t datum = avro_null(); write_read_check(schema, datum, NULL, NULL, "null"); test_json(datum, "null"); avro_datum_decref(datum); return 0; } static int test_record(void) { avro_schema_t schema = avro_schema_record("person", NULL); avro_schema_record_field_append(schema, "name", avro_schema_string()); avro_schema_record_field_append(schema, "age", avro_schema_int()); avro_datum_t datum = avro_record(schema); avro_datum_t name_datum, age_datum; name_datum = avro_givestring("Joseph Campbell", NULL); age_datum = avro_int32(83); avro_record_set(datum, "name", name_datum); avro_record_set(datum, "age", age_datum); write_read_check(schema, datum, NULL, NULL, "record"); test_json(datum, "{\"name\": \"Joseph Campbell\", \"age\": 83}"); int rc; avro_record_set_field_value(rc, datum, int32, "age", 104); int32_t age = 0; avro_record_get_field_value(rc, datum, int32, "age", &age); if (age != 104) { fprintf(stderr, "Incorrect age value\n"); exit(EXIT_FAILURE); } avro_datum_decref(name_datum); avro_datum_decref(age_datum); avro_datum_decref(datum); avro_schema_decref(schema); return 0; } static int test_nested_record(void) { const char *json = "{" " \"type\": \"record\"," " \"name\": \"list\"," " \"fields\": [" " { \"name\": \"x\", \"type\": \"int\" }," " { \"name\": \"y\", \"type\": \"int\" }," " { \"name\": \"next\", \"type\": [\"null\",\"list\"]}" " ]" "}"; int rval; avro_schema_t schema = NULL; avro_schema_error_t error; avro_schema_from_json(json, strlen(json), &schema, &error); avro_datum_t head = avro_datum_from_schema(schema); avro_record_set_field_value(rval, head, int32, "x", 10); avro_record_set_field_value(rval, head, int32, "y", 10); avro_datum_t next = NULL; avro_datum_t tail = NULL; avro_record_get(head, "next", &next); avro_union_set_discriminant(next, 1, &tail); avro_record_set_field_value(rval, tail, int32, "x", 20); avro_record_set_field_value(rval, tail, int32, "y", 20); avro_record_get(tail, "next", &next); avro_union_set_discriminant(next, 0, NULL); write_read_check(schema, head, NULL, NULL, "nested record"); avro_schema_decref(schema); avro_datum_decref(head); return 0; } static int test_enum(void) { enum avro_languages { AVRO_C, AVRO_CPP, AVRO_PYTHON, AVRO_RUBY, AVRO_JAVA }; avro_schema_t schema = avro_schema_enum("language"); avro_datum_t datum = avro_enum(schema, AVRO_C); avro_schema_enum_symbol_append(schema, "C"); avro_schema_enum_symbol_append(schema, "C++"); avro_schema_enum_symbol_append(schema, "Python"); avro_schema_enum_symbol_append(schema, "Ruby"); avro_schema_enum_symbol_append(schema, "Java"); if (avro_enum_get(datum) != AVRO_C) { fprintf(stderr, "Unexpected enum value AVRO_C\n"); exit(EXIT_FAILURE); } if (strcmp(avro_enum_get_name(datum), "C") != 0) { fprintf(stderr, "Unexpected enum value name C\n"); exit(EXIT_FAILURE); } write_read_check(schema, datum, NULL, NULL, "enum"); test_json(datum, "\"C\""); avro_enum_set(datum, AVRO_CPP); if (strcmp(avro_enum_get_name(datum), "C++") != 0) { fprintf(stderr, "Unexpected enum value name C++\n"); exit(EXIT_FAILURE); } write_read_check(schema, datum, NULL, NULL, "enum"); test_json(datum, "\"C++\""); avro_enum_set_name(datum, "Python"); if (avro_enum_get(datum) != AVRO_PYTHON) { fprintf(stderr, "Unexpected enum value AVRO_PYTHON\n"); exit(EXIT_FAILURE); } write_read_check(schema, datum, NULL, NULL, "enum"); test_json(datum, "\"Python\""); avro_datum_decref(datum); avro_schema_decref(schema); return 0; } static int test_array(void) { int i, rval; avro_schema_t schema = avro_schema_array(avro_schema_int()); avro_datum_t datum = avro_array(schema); for (i = 0; i < 10; i++) { avro_datum_t i32_datum = avro_int32(i); rval = avro_array_append_datum(datum, i32_datum); avro_datum_decref(i32_datum); if (rval) { exit(EXIT_FAILURE); } } if (avro_array_size(datum) != 10) { fprintf(stderr, "Unexpected array size"); exit(EXIT_FAILURE); } write_read_check(schema, datum, NULL, NULL, "array"); test_json(datum, "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]"); avro_datum_decref(datum); avro_schema_decref(schema); return 0; } static int test_map(void) { avro_schema_t schema = avro_schema_map(avro_schema_long()); avro_datum_t datum = avro_map(schema); int64_t i = 0; char *nums[] = { "zero", "one", "two", "three", "four", "five", "six", NULL }; while (nums[i]) { avro_datum_t i_datum = avro_int64(i); avro_map_set(datum, nums[i], i_datum); avro_datum_decref(i_datum); i++; } if (avro_array_size(datum) != 7) { fprintf(stderr, "Unexpected map size\n"); exit(EXIT_FAILURE); } avro_datum_t value; const char *key; avro_map_get_key(datum, 2, &key); avro_map_get(datum, key, &value); int64_t val; avro_int64_get(value, &val); if (val != 2) { fprintf(stderr, "Unexpected map value 2\n"); exit(EXIT_FAILURE); } write_read_check(schema, datum, NULL, NULL, "map"); test_json(datum, "{\"zero\": 0, \"one\": 1, \"two\": 2, \"three\": 3, " "\"four\": 4, \"five\": 5, \"six\": 6}"); avro_datum_decref(datum); avro_schema_decref(schema); return 0; } static int test_union(void) { avro_schema_t schema = avro_schema_union(); avro_datum_t union_datum; avro_datum_t datum; avro_datum_t union_datum1; avro_datum_t datum1; avro_schema_union_append(schema, avro_schema_string()); avro_schema_union_append(schema, avro_schema_int()); avro_schema_union_append(schema, avro_schema_null()); datum = avro_givestring("Follow your bliss.", NULL); union_datum = avro_union(schema, 0, datum); if (avro_union_discriminant(union_datum) != 0) { fprintf(stderr, "Unexpected union discriminant\n"); exit(EXIT_FAILURE); } if (avro_union_current_branch(union_datum) != datum) { fprintf(stderr, "Unexpected union branch datum\n"); exit(EXIT_FAILURE); } union_datum1 = avro_datum_from_schema(schema); avro_union_set_discriminant(union_datum1, 0, &datum1); avro_givestring_set(datum1, "Follow your bliss.", NULL); if (!avro_datum_equal(datum, datum1)) { fprintf(stderr, "Union values should be equal\n"); exit(EXIT_FAILURE); } write_read_check(schema, union_datum, NULL, NULL, "union"); test_json(union_datum, "{\"string\": \"Follow your bliss.\"}"); avro_datum_decref(datum); avro_union_set_discriminant(union_datum, 2, &datum); test_json(union_datum, "null"); avro_datum_decref(union_datum); avro_datum_decref(datum); avro_datum_decref(union_datum1); avro_schema_decref(schema); return 0; } static int test_fixed(void) { char bytes[] = { 0xD, 0xA, 0xD, 0xA, 0xB, 0xA, 0xB, 0xA }; avro_schema_t schema = avro_schema_fixed("msg", sizeof(bytes)); avro_datum_t datum; avro_datum_t expected_datum; datum = avro_givefixed(schema, bytes, sizeof(bytes), NULL); write_read_check(schema, datum, NULL, NULL, "fixed"); test_json(datum, "\"\\r\\n\\r\\n\\u000b\\n\\u000b\\n\""); avro_datum_decref(datum); datum = avro_givefixed(schema, NULL, sizeof(bytes), NULL); avro_givefixed_set(datum, bytes, sizeof(bytes), NULL); expected_datum = avro_givefixed(schema, bytes, sizeof(bytes), NULL); if (!avro_datum_equal(datum, expected_datum)) { fprintf(stderr, "Expected equal fixed instances.\n"); exit(EXIT_FAILURE); } avro_datum_decref(datum); avro_datum_decref(expected_datum); // The following should bork if we don't copy the fixed value // correctly (since we'll try to free a static string). datum = avro_fixed(schema, "original", 8); avro_fixed_set(datum, "alsothis", 8); avro_datum_decref(datum); avro_schema_decref(schema); return 0; } int main(void) { avro_set_allocator(test_allocator, NULL); unsigned int i; struct avro_tests { char *name; avro_test func; } tests[] = { { "string", test_string}, { "bytes", test_bytes}, { "int", test_int32}, { "long", test_int64}, { "float", test_float}, { "double", test_double}, { "boolean", test_boolean}, { "null", test_null}, { "record", test_record}, { "nested_record", test_nested_record}, { "enum", test_enum}, { "array", test_array}, { "map", test_map}, { "fixed", test_fixed}, { "union", test_union} }; init_rand(); for (i = 0; i < sizeof(tests) / sizeof(tests[0]); i++) { struct avro_tests *test = tests + i; fprintf(stderr, "**** Running %s tests ****\n", test->name); if (test->func() != 0) { return EXIT_FAILURE; } } return EXIT_SUCCESS; }