00001
00019 #ifndef avro_DataFile_hh__
00020 #define avro_DataFile_hh__
00021
#include "Encoder.hh"
#include "buffer/Buffer.hh"
#include "ValidSchema.hh"
#include "Specific.hh"
#include "Stream.hh"

#include <map>
#include <memory>
#include <string>
#include <vector>

#include "boost/array.hpp"
#include "boost/utility.hpp"
00034
00035 namespace avro {
00036
00037 typedef boost::array<uint8_t, 16> DataFileSync;
00038
00044 class DataFileWriterBase : boost::noncopyable {
00045 const std::string filename_;
00046 const ValidSchema schema_;
00047 const EncoderPtr encoderPtr_;
00048 const size_t syncInterval_;
00049
00050 std::auto_ptr<OutputStream> stream_;
00051 std::auto_ptr<OutputStream> buffer_;
00052 const DataFileSync sync_;
00053 int64_t objectCount_;
00054
00055 typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00056
00057 Metadata metadata_;
00058
00059 static std::auto_ptr<OutputStream> makeStream(const char* filename);
00060 static DataFileSync makeSync();
00061
00062 void writeHeader();
00063 void setMetadata(const std::string& key, const std::string& value);
00064
00068 void sync();
00069
00070 public:
00071 Encoder& encoder() const { return *encoderPtr_; }
00072
00073 void syncIfNeeded();
00074
00075 void incr() {
00076 ++objectCount_;
00077 }
00081 DataFileWriterBase(const char* filename, const ValidSchema& schema,
00082 size_t syncInterval);
00083
00084 ~DataFileWriterBase();
00089 void close();
00090
00094 const ValidSchema& schema() const { return schema_; }
00095
00099 void flush();
00100 };
00101
00105 template <typename T>
00106 class DataFileWriter : boost::noncopyable {
00107 std::auto_ptr<DataFileWriterBase> base_;
00108 public:
00112 DataFileWriter(const char* filename, const ValidSchema& schema,
00113 size_t syncInterval = 16 * 1024) :
00114 base_(new DataFileWriterBase(filename, schema, syncInterval)) { }
00115
00119 void write(const T& datum) {
00120 base_->syncIfNeeded();
00121 avro::encode(base_->encoder(), datum);
00122 base_->incr();
00123 }
00124
00129 void close() { base_->close(); }
00130
00134 const ValidSchema& schema() const { return base_->schema(); }
00135
00139 void flush() { base_->flush(); }
00140 };
00141
00142 class DataFileReaderBase : boost::noncopyable {
00143 const std::string filename_;
00144 const std::auto_ptr<InputStream> stream_;
00145 const DecoderPtr decoder_;
00146 int64_t objectCount_;
00147
00148 ValidSchema readerSchema_;
00149 ValidSchema dataSchema_;
00150 DecoderPtr dataDecoder_;
00151 std::auto_ptr<InputStream> dataStream_;
00152 typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00153
00154 Metadata metadata_;
00155 DataFileSync sync_;
00156
00157 void readHeader();
00158
00159 bool readDataBlock();
00160 public:
00161 Decoder& decoder() { return *dataDecoder_; }
00162
00166 bool hasMore();
00167
00168 void decr() { --objectCount_; }
00169
00176 DataFileReaderBase(const char* filename);
00177
00182 void init();
00183
00191 void init(const ValidSchema& readerSchema);
00192
00196 const ValidSchema& readerSchema() { return readerSchema_; }
00197
00201 const ValidSchema& dataSchema() { return dataSchema_; }
00202
00206 void close();
00207 };
00208
00209 template <typename T>
00210 class DataFileReader : boost::noncopyable {
00211 std::auto_ptr<DataFileReaderBase> base_;
00212 public:
00217 DataFileReader(const char* filename, const ValidSchema& readerSchema) :
00218 base_(new DataFileReaderBase(filename)) {
00219 base_->init(readerSchema);
00220 }
00221
00226 DataFileReader(const char* filename) :
00227 base_(new DataFileReaderBase(filename)) {
00228 base_->init();
00229 }
00230
00231
00241 DataFileReader(std::auto_ptr<DataFileReaderBase> base) : base_(base) {
00242 base_->init();
00243 }
00244
00254 DataFileReader(std::auto_ptr<DataFileReaderBase> base,
00255 const ValidSchema& readerSchema) : base_(base) {
00256 base_->init(readerSchema);
00257 }
00258
00259 bool read(T& datum) {
00260 if (base_->hasMore()) {
00261 base_->decr();
00262 avro::decode(base_->decoder(), datum);
00263 return true;
00264 }
00265 return false;
00266 }
00267
00271 const ValidSchema& readerSchema() { return base_->readerSchema(); }
00272
00276 const ValidSchema& dataSchema() { return base_->dataSchema(); }
00277
00281 void close() { return base_->close(); }
00282 };
00283
00284 }
00285 #endif
00286
00287