* @copyright 2011 Lorenzo Alberton
 * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
 * @version   $Revision: $
 * @link      http://sna-projects.com/kafka/
 */

/**
 * Simple Kafka Producer
 *
 * Opens a TCP socket to the configured host/port on demand and writes
 * produce requests built by Kafka_Encoder to it.
 *
 * @category Libraries
 * @package  Kafka
 * @author   Lorenzo Alberton
 * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
 * @link     http://sna-projects.com/kafka/
 */
class Kafka_Producer
{
    /**
     * @var integer
     */
    protected $request_key;

    /**
     * Socket stream, or null/false while not connected
     *
     * @var resource
     */
    protected $conn;

    /**
     * @var string
     */
    protected $host;

    /**
     * @var integer
     */
    protected $port;

    /**
     * Compression setting passed through to Kafka_Encoder
     *
     * @var integer
     */
    protected $compression;

    /**
     * Constructor
     *
     * The socket is not opened here; it is opened lazily by connect().
     *
     * @param string  $host Host
     * @param integer $port Port
     */
    public function __construct($host, $port)
    {
        $this->request_key = 0;
        $this->host        = $host;
        $this->port        = $port;
        $this->compression = 0;
    }

    /**
     * Connect to Kafka via a socket
     *
     * Does nothing when a socket is already open.
     *
     * @return void
     * @throws RuntimeException when the socket cannot be opened
     */
    public function connect()
    {
        if (is_resource($this->conn)) {
            return;
        }

        // $errno and $errstr are filled in by reference on failure.
        $errno  = 0;
        $errstr = '';
        $this->conn = stream_socket_client(
            'tcp://' . $this->host . ':' . $this->port,
            $errno,
            $errstr
        );

        if (!is_resource($this->conn)) {
            throw new RuntimeException('Cannot connect to Kafka: ' . $errstr, $errno);
        }
    }

    /**
     * Close the socket
     *
     * Safe to call when no socket is open.
     *
     * @return void
     */
    public function close()
    {
        if (is_resource($this->conn)) {
            fclose($this->conn);
        }
        // Drop the stale handle so the object never holds a closed resource.
        $this->conn = null;
    }

    /**
     * Send messages to Kafka
     *
     * Connects if necessary, encodes the produce request and writes it to
     * the socket. A single fwrite() on a network stream may write only part
     * of the buffer, so the write is repeated until the whole request has
     * been sent.
     *
     * @param array   $messages  Messages to send
     * @param string  $topic     Topic
     * @param integer $partition Partition
     *
     * @return integer|false Number of bytes written, or false when the
     *                       write failed or stalled before completion
     * @throws RuntimeException when the connection cannot be established
     */
    public function send(array $messages, $topic, $partition = 0xFFFFFFFF)
    {
        $this->connect();

        $request = Kafka_Encoder::encode_produce_request(
            $topic,
            $partition,
            $messages,
            $this->compression
        );
        $length  = strlen($request);
        $written = 0;

        while ($written < $length) {
            $bytes = fwrite($this->conn, substr($request, $written));
            // A zero-byte write would loop forever; treat it as a failure too.
            if ($bytes === false || $bytes === 0) {
                return false;
            }
            $written += $bytes;
        }

        return $written;
    }

    /**
     * When serializing, close the socket and save the connection parameters
     * so it can connect again
     *
     * The compression setting is saved as well, so that an unserialized
     * producer keeps sending with the same setting.
     *
     * @return array Properties to save
     */
    public function __sleep()
    {
        $this->close();
        return array('request_key', 'host', 'port', 'compression');
    }

    /**
     * Restore parameters on unserialize
     *
     * The socket is never serialized; it is reopened by the next connect().
     * Payloads that do not carry a compression value fall back to 0, the
     * same default the constructor uses.
     *
     * @return void
     */
    public function __wakeup()
    {
        $this->conn = null;
        if ($this->compression === null) {
            $this->compression = 0;
        }
    }
}