Avro C++
|
00001 /* 00002 * Licensed to the Apache Software Foundation (ASF) under one 00003 * or more contributor license agreements. See the NOTICE file 00004 * distributed with this work for additional information 00005 * regarding copyright ownership. The ASF licenses this file 00006 * to you under the Apache License, Version 2.0 (the 00007 * "License"); you may not use this file except in compliance 00008 * with the License. You may obtain a copy of the License at 00009 * 00010 * http://www.apache.org/licenses/LICENSE-2.0 00011 * 00012 * Unless required by applicable law or agreed to in writing, software 00013 * distributed under the License is distributed on an "AS IS" BASIS, 00014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00015 * See the License for the specific language governing permissions and 00016 * limitations under the License. 00017 */ 00018 00019 #ifndef avro_Stream_hh__ 00020 #define avro_Stream_hh__ 00021 00022 #include <memory> 00023 #include <string.h> 00024 #include <stdint.h> 00025 00026 #include "boost/utility.hpp" 00027 00028 #include "Config.hh" 00029 #include "Exception.hh" 00030 00031 namespace avro { 00032 00036 class AVRO_DECL InputStream : boost::noncopyable { 00037 protected: 00038 00042 InputStream() { } 00043 00044 public: 00048 virtual ~InputStream() { } 00049 00056 virtual bool next(const uint8_t** data, size_t* len) = 0; 00057 00063 virtual void backup(size_t len) = 0; 00064 00068 virtual void skip(size_t len) = 0; 00069 00075 virtual size_t byteCount() const = 0; 00076 }; 00077 00081 class AVRO_DECL OutputStream : boost::noncopyable { 00082 protected: 00083 00087 OutputStream() { } 00088 public: 00089 00093 virtual ~OutputStream() { } 00094 00100 virtual bool next(uint8_t** data, size_t* len) = 0; 00101 00106 virtual void backup(size_t len) = 0; 00107 00113 virtual uint64_t byteCount() const = 0; 00114 00119 virtual void flush() = 0; 00120 }; 00121 00125 AVRO_DECL std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024); 00126 00132 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len); 00133 00141 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source); 00142 00150 AVRO_DECL std::auto_ptr<OutputStream> fileOutputStream(const char* filename, 00151 size_t bufferSize = 8 * 1024); 00152 00157 AVRO_DECL std::auto_ptr<InputStream> fileInputStream(const char* filename, 00158 size_t bufferSize = 8 * 1024); 00159 00165 AVRO_DECL std::auto_ptr<OutputStream> ostreamOutputStream(std::ostream& os, 00166 size_t bufferSize = 8 * 1024); 00167 00173 AVRO_DECL std::auto_ptr<InputStream> istreamInputStream(std::istream& in, 00174 size_t bufferSize = 8 * 1024); 00175 00177 struct StreamReader { 00181 InputStream* in_; 00182 00186 const uint8_t* next_; 00187 00191 const uint8_t* end_; 00192 00196 StreamReader() : in_(0), next_(0), end_(0) { } 00197 00201 StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); } 00202 00207 void reset(InputStream& is) { 00208 if (in_ != 0 && end_ != next_) { 00209 in_->backup(end_ - next_); 00210 } 00211 in_ = &is; 00212 next_ = end_ = 0; 00213 } 00214 00219 uint8_t read() { 00220 if (next_ == end_) { 00221 more(); 00222 } 00223 return *next_++; 00224 } 00225 00230 void readBytes(uint8_t* b, size_t n) { 00231 while (n > 0) { 00232 if (next_ == end_) { 00233 more(); 00234 } 00235 size_t q = end_ - next_; 00236 if (q > n) { 00237 q = n; 00238 } 00239 ::memcpy(b, next_, q); 00240 next_ += q; 00241 b += q; 00242 n -= q; 00243 } 00244 } 00245 00250 void skipBytes(size_t n) { 00251 if (n > static_cast<size_t>(end_ - next_)) { 00252 n -= end_ - next_; 00253 next_ = end_; 00254 in_->skip(n); 00255 } else { 00256 next_ += n; 00257 } 00258 } 00259 00266 bool fill() { 00267 size_t n = 0; 00268 while (in_->next(&next_, &n)) { 00269 if (n != 0) { 00270 end_ = next_ + n; 00271 return true; 00272 } 00273 } 00274 return false; 00275 } 00276 00280 void more() { 00281 if (! fill()) { 00282 throw Exception("EOF reached"); 00283 } 00284 } 00285 00289 bool hasMore() { 00290 return (next_ == end_) ? fill() : true; 00291 } 00292 }; 00293 00297 struct StreamWriter { 00301 OutputStream* out_; 00302 00306 uint8_t* next_; 00307 00311 uint8_t* end_; 00312 00316 StreamWriter() : out_(0), next_(0), end_(0) { } 00317 00321 StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); } 00322 00327 void reset(OutputStream& os) { 00328 if (out_ != 0 && end_ != next_) { 00329 out_->backup(end_ - next_); 00330 } 00331 out_ = &os; 00332 next_ = end_; 00333 } 00334 00338 void write(uint8_t c) { 00339 if (next_ == end_) { 00340 more(); 00341 } 00342 *next_++ = c; 00343 } 00344 00348 void writeBytes(const uint8_t* b, size_t n) { 00349 while (n > 0) { 00350 if (next_ == end_) { 00351 more(); 00352 } 00353 size_t q = end_ - next_; 00354 if (q > n) { 00355 q = n; 00356 } 00357 ::memcpy(next_, b, q); 00358 next_ += q; 00359 b += q; 00360 n -= q; 00361 } 00362 } 00363 00368 void flush() { 00369 if (next_ != end_) { 00370 out_->backup(end_ - next_); 00371 next_ = end_; 00372 } 00373 out_->flush(); 00374 } 00375 00379 void more() { 00380 size_t n = 0; 00381 while (out_->next(&next_, &n)) { 00382 if (n != 0) { 00383 end_ = next_ + n; 00384 return; 00385 } 00386 } 00387 throw Exception("EOF reached"); 00388 } 00389 00390 }; 00391 00396 inline void copy(InputStream& in, OutputStream& out) 00397 { 00398 const uint8_t *p = 0; 00399 size_t n = 0; 00400 StreamWriter w(out); 00401 while (in.next(&p, &n)) { 00402 w.writeBytes(p, n); 00403 } 00404 w.flush(); 00405 } 00406 00407 } // namespace avro 00408 #endif 00409 00410