Avro C++
Stream.hh
00001 /*
00002  * Licensed to the Apache Software Foundation (ASF) under one
00003  * or more contributor license agreements.  See the NOTICE file
00004  * distributed with this work for additional information
00005  * regarding copyright ownership.  The ASF licenses this file
00006  * to you under the Apache License, Version 2.0 (the
00007  * "License"); you may not use this file except in compliance
00008  * with the License.  You may obtain a copy of the License at
00009  *
00010  *     http://www.apache.org/licenses/LICENSE-2.0
00011  *
00012  * Unless required by applicable law or agreed to in writing, software
00013  * distributed under the License is distributed on an "AS IS" BASIS,
00014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015  * See the License for the specific language governing permissions and
00016  * limitations under the License.
00017  */
00018 
00019 #ifndef avro_Stream_hh__
00020 #define avro_Stream_hh__
00021 
00022 #include <memory>
00023 #include <string.h>
00024 #include <stdint.h>
00025 
00026 #include "boost/utility.hpp"
00027 
00028 #include "Config.hh"
00029 #include "Exception.hh"
00030 
00031 namespace avro {
00032 
00036 class AVRO_DECL InputStream : boost::noncopyable {
00037 protected:
00038 
00042     InputStream() { }
00043 
00044 public:
00048     virtual ~InputStream() { }
00049 
00056     virtual bool next(const uint8_t** data, size_t* len) = 0;
00057 
00063     virtual void backup(size_t len) = 0;
00064 
00068     virtual void skip(size_t len) = 0;
00069 
00075     virtual size_t byteCount() const = 0;
00076 };
00077 
00081 class AVRO_DECL OutputStream : boost::noncopyable {
00082 protected:
00083 
00087     OutputStream() { }
00088 public:
00089 
00093     virtual ~OutputStream() { }
00094 
00100     virtual bool next(uint8_t** data, size_t* len) = 0;
00101 
00106     virtual void backup(size_t len) = 0;
00107 
00113     virtual uint64_t byteCount() const = 0;
00114 
00119     virtual void flush() = 0;
00120 };
00121 
00125 AVRO_DECL std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024);
00126 
00132 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len);
00133 
00141 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source);
00142 
00150 AVRO_DECL std::auto_ptr<OutputStream> fileOutputStream(const char* filename,
00151     size_t bufferSize = 8 * 1024);
00152 
00157 AVRO_DECL std::auto_ptr<InputStream> fileInputStream(const char* filename,
00158     size_t bufferSize = 8 * 1024);
00159 
00165 AVRO_DECL std::auto_ptr<OutputStream> ostreamOutputStream(std::ostream& os,
00166     size_t bufferSize = 8 * 1024);
00167 
00173 AVRO_DECL std::auto_ptr<InputStream> istreamInputStream(std::istream& in,
00174     size_t bufferSize = 8 * 1024);
00175 
00177 struct StreamReader {
00181     InputStream* in_;
00182 
00186     const uint8_t* next_;
00187 
00191     const uint8_t* end_;
00192 
00196     StreamReader() : in_(0), next_(0), end_(0) { }
00197 
00201     StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }
00202 
00207     void reset(InputStream& is) {
00208         if (in_ != 0 && end_ != next_) {
00209             in_->backup(end_ - next_);
00210         }
00211         in_ = &is;
00212         next_ = end_ = 0;
00213     }
00214 
00219     uint8_t read() {
00220         if (next_ == end_) {
00221             more();
00222         }
00223         return *next_++;
00224     }
00225 
00230     void readBytes(uint8_t* b, size_t n) {
00231         while (n > 0) {
00232             if (next_ == end_) {
00233                 more();
00234             }
00235             size_t q = end_ - next_;
00236             if (q > n) {
00237                 q = n;
00238             }
00239             ::memcpy(b, next_, q);
00240             next_ += q;
00241             b += q;
00242             n -= q;
00243         }
00244     }
00245 
00250     void skipBytes(size_t n) {
00251         if (n > static_cast<size_t>(end_ - next_)) {
00252             n -= end_ - next_;
00253             next_ = end_;
00254             in_->skip(n);
00255         } else {
00256             next_ += n;
00257         }
00258     }
00259 
00266     bool fill() {
00267         size_t n = 0;
00268         while (in_->next(&next_, &n)) {
00269             if (n != 0) {
00270                 end_ = next_ + n;
00271                 return true;
00272             }
00273         }
00274         return false;
00275     }
00276 
00280     void more() {
00281         if (! fill()) {
00282             throw Exception("EOF reached");
00283         }
00284     }
00285 
00289     bool hasMore() {
00290         return (next_ == end_) ? fill() : true;
00291     }
00292 };
00293 
00297 struct StreamWriter {
00301     OutputStream* out_;
00302 
00306     uint8_t* next_;
00307     
00311     uint8_t* end_;
00312 
00316     StreamWriter() : out_(0), next_(0), end_(0) { }
00317 
00321     StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }
00322 
00327     void reset(OutputStream& os) {
00328         if (out_ != 0 && end_ != next_) {
00329             out_->backup(end_ - next_);
00330         }
00331         out_ = &os;
00332         next_ = end_;
00333     }
00334 
00338     void write(uint8_t c) {
00339         if (next_ == end_) {
00340             more();
00341         }
00342         *next_++ = c;
00343     }
00344 
00348     void writeBytes(const uint8_t* b, size_t n) {
00349         while (n > 0) {
00350             if (next_ == end_) {
00351                 more();
00352             }
00353             size_t q = end_ - next_;
00354             if (q > n) {
00355                 q = n;
00356             }
00357             ::memcpy(next_, b, q);
00358             next_ += q;
00359             b += q;
00360             n -= q;
00361         }
00362     }
00363 
00368     void flush() {
00369         if (next_ != end_) {
00370             out_->backup(end_ - next_);
00371             next_ = end_;
00372         }
00373         out_->flush();
00374     }
00375 
00379     void more() {
00380         size_t n = 0;
00381         while (out_->next(&next_, &n)) {
00382             if (n != 0) {
00383                 end_ = next_ + n;
00384                 return;
00385             }
00386         }
00387         throw Exception("EOF reached");
00388     }
00389 
00390 };
00391 
00396 inline void copy(InputStream& in, OutputStream& out)
00397 {
00398     const uint8_t *p = 0;
00399     size_t n = 0;
00400     StreamWriter w(out);
00401     while (in.next(&p, &n)) {
00402         w.writeBytes(p, n);
00403     }
00404     w.flush();
00405 }
00406 
00407 }   // namespace avro
00408 #endif
00409 
00410