Avro C++
|
00001 /* 00002 * Licensed to the Apache Software Foundation (ASF) under one 00003 * or more contributor license agreements. See the NOTICE file 00004 * distributed with this work for additional information 00005 * regarding copyright ownership. The ASF licenses this file 00006 * to you under the Apache License, Version 2.0 (the 00007 * "License"); you may not use this file except in compliance 00008 * with the License. You may obtain a copy of the License at 00009 * 00010 * http://www.apache.org/licenses/LICENSE-2.0 00011 * 00012 * Unless required by applicable law or agreed to in writing, software 00013 * distributed under the License is distributed on an "AS IS" BASIS, 00014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00015 * See the License for the specific language governing permissions and 00016 * limitations under the License. 00017 */ 00018 00019 #ifndef avro_DataFile_hh__ 00020 #define avro_DataFile_hh__ 00021 00022 #include "Config.hh" 00023 #include "Encoder.hh" 00024 #include "buffer/Buffer.hh" 00025 #include "ValidSchema.hh" 00026 #include "Specific.hh" 00027 #include "Stream.hh" 00028 00029 #include <map> 00030 #include <string> 00031 #include <vector> 00032 00033 #include "boost/array.hpp" 00034 #include "boost/utility.hpp" 00035 #include <boost/iostreams/filtering_stream.hpp> 00036 #include <boost/scoped_ptr.hpp> 00037 00038 namespace avro { 00039 00041 enum Codec { 00042 NULL_CODEC, 00043 DEFLATE_CODEC 00044 }; 00045 00049 typedef boost::array<uint8_t, 16> DataFileSync; 00050 00056 class AVRO_DECL DataFileWriterBase : boost::noncopyable { 00057 const std::string filename_; 00058 const ValidSchema schema_; 00059 const EncoderPtr encoderPtr_; 00060 const size_t syncInterval_; 00061 Codec codec_; 00062 00063 std::auto_ptr<OutputStream> stream_; 00064 std::auto_ptr<OutputStream> buffer_; 00065 const DataFileSync sync_; 00066 int64_t objectCount_; 00067 00068 typedef std::map<std::string, std::vector<uint8_t> > Metadata; 00069 00070 Metadata metadata_; 00071 00072 static std::auto_ptr<OutputStream> makeStream(const char* filename); 00073 static DataFileSync makeSync(); 00074 00075 void writeHeader(); 00076 void setMetadata(const std::string& key, const std::string& value); 00077 00081 void sync(); 00082 00083 public: 00087 Encoder& encoder() const { return *encoderPtr_; } 00088 00093 void syncIfNeeded(); 00094 00098 void incr() { 00099 ++objectCount_; 00100 } 00104 DataFileWriterBase(const char* filename, const ValidSchema& schema, 00105 size_t syncInterval, Codec codec = NULL_CODEC); 00106 00107 ~DataFileWriterBase(); 00112 void close(); 00113 00117 const ValidSchema& schema() const { return schema_; } 00118 00122 void flush(); 00123 }; 00124 00128 template <typename T> 00129 class DataFileWriter : boost::noncopyable { 00130 std::auto_ptr<DataFileWriterBase> base_; 00131 public: 00135 DataFileWriter(const char* filename, const ValidSchema& schema, 00136 size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : 00137 base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) { } 00138 00142 void write(const T& datum) { 00143 base_->syncIfNeeded(); 00144 avro::encode(base_->encoder(), datum); 00145 base_->incr(); 00146 } 00147 00152 void close() { base_->close(); } 00153 00157 const ValidSchema& schema() const { return base_->schema(); } 00158 00162 void flush() { base_->flush(); } 00163 }; 00164 00168 class AVRO_DECL DataFileReaderBase : boost::noncopyable { 00169 const std::string filename_; 00170 const std::auto_ptr<InputStream> stream_; 00171 const DecoderPtr decoder_; 00172 int64_t objectCount_; 00173 bool eof_; 00174 Codec codec_; 00175 00176 ValidSchema readerSchema_; 00177 ValidSchema dataSchema_; 00178 DecoderPtr dataDecoder_; 00179 std::auto_ptr<InputStream> dataStream_; 00180 typedef std::map<std::string, std::vector<uint8_t> > Metadata; 00181 00182 Metadata metadata_; 00183 DataFileSync sync_; 00184 00185 // for compressed buffer 00186 boost::scoped_ptr<boost::iostreams::filtering_istream> os_; 00187 std::vector<char> compressed_; 00188 00189 void readHeader(); 00190 00191 bool readDataBlock(); 00192 public: 00196 Decoder& decoder() { return *dataDecoder_; } 00197 00201 bool hasMore(); 00202 00206 void decr() { --objectCount_; } 00207 00214 DataFileReaderBase(const char* filename); 00215 00220 void init(); 00221 00229 void init(const ValidSchema& readerSchema); 00230 00234 const ValidSchema& readerSchema() { return readerSchema_; } 00235 00239 const ValidSchema& dataSchema() { return dataSchema_; } 00240 00244 void close(); 00245 }; 00246 00250 template <typename T> 00251 class DataFileReader : boost::noncopyable { 00252 std::auto_ptr<DataFileReaderBase> base_; 00253 public: 00258 DataFileReader(const char* filename, const ValidSchema& readerSchema) : 00259 base_(new DataFileReaderBase(filename)) { 00260 base_->init(readerSchema); 00261 } 00262 00267 DataFileReader(const char* filename) : 00268 base_(new DataFileReaderBase(filename)) { 00269 base_->init(); 00270 } 00271 00272 00282 DataFileReader(std::auto_ptr<DataFileReaderBase> base) : base_(base) { 00283 base_->init(); 00284 } 00285 00295 DataFileReader(std::auto_ptr<DataFileReaderBase> base, 00296 const ValidSchema& readerSchema) : base_(base) { 00297 base_->init(readerSchema); 00298 } 00299 00305 bool read(T& datum) { 00306 if (base_->hasMore()) { 00307 base_->decr(); 00308 avro::decode(base_->decoder(), datum); 00309 return true; 00310 } 00311 return false; 00312 } 00313 00317 const ValidSchema& readerSchema() { return base_->readerSchema(); } 00318 00322 const ValidSchema& dataSchema() { return base_->dataSchema(); } 00323 00327 void close() { return base_->close(); } 00328 }; 00329 00330 } // namespace avro 00331 #endif