* @copyright 2011 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ */ /** * A sequence of messages stored in a byte buffer * * @category Libraries * @package Kafka * @author Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @link http://sna-projects.com/kafka/ */ class Kafka_MessageSet implements Iterator { /** * @var integer */ protected $validByteCount = 0; /** * @var boolean */ private $valid = false; /** * @var array */ private $array = array(); /** * Constructor * * @param resource $stream Stream resource * @param integer $errorCode Error code */ public function __construct($stream, $errorCode = 0) { $data = stream_get_contents($stream); $len = strlen($data); $ptr = 0; while ($ptr <= ($len - 4)) { $size = array_shift(unpack('N', substr($data, $ptr, 4))); $ptr += 4; $this->array[] = new Kafka_Message(substr($data, $ptr, $size)); $ptr += $size; $this->validByteCount += 4 + $size; } fclose($stream); } /** * Get message set size in bytes * * @return integer */ public function validBytes() { return $this->validByteCount; } /** * Get message set size in bytes * * @return integer */ public function sizeInBytes() { return $this->validBytes(); } /** * next * * @return void */ public function next() { $this->valid = (FALSE !== next($this->array)); } /** * valid * * @return boolean */ public function valid() { return $this->valid; } /** * key * * @return integer */ public function key() { return key($this->array); } /** * current * * @return Kafka_Message */ public function current() { return current($this->array); } /** * rewind * * @return void */ public function rewind() { $this->valid = (FALSE !== reset($this->array)); } }