00001
00019 #ifndef avro_Stream_hh__
00020 #define avro_Stream_hh__
00021
00022 #include <memory>
00023 #include <string.h>
00024 #include <stdint.h>
00025 #include "boost/utility.hpp"
00026 #include "Exception.hh"
00027
00028 namespace avro {
00029 class InputStream : boost::noncopyable {
00030 public:
00031 InputStream() { }
00032 virtual ~InputStream() { }
00033
00040 virtual bool next(const uint8_t** data, size_t* len) = 0;
00041
00047 virtual void backup(size_t len) = 0;
00048
00052 virtual void skip(size_t len) = 0;
00053
00059 virtual size_t byteCount() const = 0;
00060 };
00061
00062 class OutputStream : boost::noncopyable {
00063 public:
00064 OutputStream() { }
00065 virtual ~OutputStream() { }
00066
00072 virtual bool next(uint8_t** data, size_t* len) = 0;
00073
00078 virtual void backup(size_t len) = 0;
00079
00085 virtual uint64_t byteCount() const = 0;
00086
00091 virtual void flush() = 0;
00092 };
00093
00097 std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024);
00098
00104 std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len);
00105
00113 std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source);
00114
00122 std::auto_ptr<OutputStream> fileOutputStream(const char* filename,
00123 size_t bufferSize = 8 * 1024);
00124
00129 std::auto_ptr<InputStream> fileInputStream(const char* filename,
00130 size_t bufferSize = 8 * 1024);
00131
00133 struct StreamReader {
00134 InputStream* in_;
00135 const uint8_t* next_;
00136 const uint8_t* end_;
00137
00138 StreamReader() : in_(0), next_(0), end_(0) { }
00139 StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }
00140
00141 void reset(InputStream& is) {
00142 if (in_ != 0) {
00143 in_->backup(end_ - next_);
00144 }
00145 in_ = &is;
00146 next_ = end_ = 0;
00147 }
00148
00149 uint8_t read() {
00150 if (next_ == end_) {
00151 more();
00152 }
00153 return *next_++;
00154 }
00155
00156 void readBytes(uint8_t* b, size_t n) {
00157 while (n > 0) {
00158 if (next_ == end_) {
00159 more();
00160 }
00161 size_t q = end_ - next_;
00162 if (q > n) {
00163 q = n;
00164 }
00165 ::memcpy(b, next_, q);
00166 next_ += q;
00167 b += q;
00168 n -= q;
00169 }
00170 }
00171
00172 void skipBytes(size_t n) {
00173 if (n > (end_ - next_)) {
00174 n -= end_ - next_;
00175 next_ = end_;
00176 in_->skip(n);
00177 } else {
00178 next_ += n;
00179 }
00180 }
00181
00182 bool fill() {
00183 size_t n = 0;
00184 while (in_->next(&next_, &n)) {
00185 if (n != 0) {
00186 end_ = next_ + n;
00187 return true;
00188 }
00189 }
00190 return false;
00191 }
00192
00193 void more() {
00194 if (! fill()) {
00195 throw Exception("EOF reached");
00196 }
00197 }
00198
00199 bool hasMore() {
00200 return (next_ == end_) ? fill() : true;
00201 }
00202 };
00203
00207 struct StreamWriter {
00208 OutputStream* out_;
00209 uint8_t* next_;
00210 uint8_t* end_;
00211
00212 StreamWriter() : out_(0), next_(0), end_(0) { }
00213 StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }
00214
00215 void reset(OutputStream& os) {
00216 if (out_ != 0) {
00217 out_->backup(end_ - next_);
00218 }
00219 out_ = &os;
00220 next_ = end_;
00221 }
00222
00223 void write(uint8_t c) {
00224 if (next_ == end_) {
00225 more();
00226 }
00227 *next_++ = c;
00228 }
00229
00230 void writeBytes(const uint8_t* b, size_t n) {
00231 while (n > 0) {
00232 if (next_ == end_) {
00233 more();
00234 }
00235 size_t q = end_ - next_;
00236 if (q > n) {
00237 q = n;
00238 }
00239 ::memcpy(next_, b, q);
00240 next_ += q;
00241 b += q;
00242 n -= q;
00243 }
00244 }
00245
00246 void more() {
00247 size_t n = 0;
00248 while (out_->next(&next_, &n)) {
00249 if (n != 0) {
00250 end_ = next_ + n;
00251 return;
00252 }
00253 }
00254 throw Exception("EOF reached");
00255 }
00256
00257 void flush() {
00258 if (next_ != end_) {
00259 out_->backup(end_ - next_);
00260 next_ = end_;
00261 }
00262 out_->flush();
00263 }
00264 };
00265
00270 inline void copy(InputStream& in, OutputStream& out)
00271 {
00272 const uint8_t *p = 0;
00273 size_t n = 0;
00274 StreamWriter w(out);
00275 while (in.next(&p, &n)) {
00276 w.writeBytes(p, n);
00277 }
00278 w.flush();
00279 }
00280
00281 }
00282 #endif
00283
00284