* @copyright 2011 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ */ /** * Send a request to Kafka * * @category Libraries * @package Kafka * @author Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @link http://sna-projects.com/kafka/ */ class Kafka_BoundedByteBuffer_Send { /** * @var integer */ protected $size; /** * @var boolean */ protected $sizeWritten = false; /** * @var string resource */ protected $buffer; /** * @var boolean */ protected $complete = false; /** * Constructor * * @param Kafka_FetchRequest $req Request object */ public function __construct(Kafka_FetchRequest $req) { $this->size = $req->sizeInBytes() + 2; $this->buffer = fopen('php://temp', 'w+b'); fwrite($this->buffer, pack('n', $req->id)); $req->writeTo($this->buffer); rewind($this->buffer); //fseek($this->buffer, $req->getOffset(), SEEK_SET); } /** * Try to write the request size if we haven't already * * @param resource $stream Stream resource * * @return integer Number of bytes read * @throws RuntimeException when size is <=0 or >= $maxSize */ private function writeRequestSize($stream) { if (!$this->sizeWritten) { if (!fwrite($stream, pack('N', $this->size))) { throw new RuntimeException('Cannot write request to stream (' . error_get_last() . ')'); } $this->sizeWritten = true; return 4; } return 0; } /** * Write a chunk of data to the stream * * @param resource $stream Stream resource * * @return integer number of written bytes * @throws RuntimeException */ public function writeTo($stream) { // have we written the request size yet? $written = $this->writeRequestSize($stream); // try to write the actual buffer itself if ($this->sizeWritten && !feof($this->buffer)) { //TODO: check that fread returns something $written += fwrite($stream, fread($this->buffer, 8192)); } // if we are done, mark it off if (feof($this->buffer)) { $this->complete = true; fclose($this->buffer); } return $written; } /** * Write the entire request to the stream * * @param resource $stream Stream resource * * @return integer number of written bytes */ public function writeCompletely($stream) { $written = 0; while (!$this->complete) { $written += $this->writeTo($stream); } //echo "\nWritten " . $written . ' bytes '; return $written; } }