val consumer = new Consumer(new ConsumerConfig(...))
// begin consumption of two topics
val topic1: MessageStream = consumer.consume("my_topic_1")
val topic2: MessageStream = consumer.consume("my_topic_2")
// process messages from topic1
for(message <- topic1) {
  // process the message
}
// record the processing of all messages
topic1.commit()
// close topic1
topic1.close()
// close all topics
consumer.close()
The consumer fetcher is a background thread that holds the connections to the Kafka servers and fetches data for consumption. It is responsible for throttling its requests to the servers.
Fetched data is put into a queue of unconsumed data for consumption by one of the ConsumerIterators.
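This fetcher-to-iterator handoff can be sketched with a standard bounded BlockingQueue; the FetchedDataChunk shape below is an assumption for illustration, not the actual Kafka type:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical shape of a fetched chunk; the field names are assumptions.
case class FetchedDataChunk(topic: String, partition: Int, offset: Long, messages: Seq[String])

// Bounded queue of unconsumed data: a full queue blocks the fetcher thread,
// which is one simple way for it to throttle requests to the servers.
val chunkQueue = new LinkedBlockingQueue[FetchedDataChunk](10)

// Fetcher side: enqueue a fetched chunk.
chunkQueue.put(FetchedDataChunk("my_topic_1", 0, 42L, Seq("m1", "m2")))

// ConsumerIterator side: dequeue the next chunk and iterate its messages.
val chunk = chunkQueue.take()
chunk.messages.foreach(m => println(s"consumed $m from ${chunk.topic} at offset ${chunk.offset}"))
```

Using a bounded queue means backpressure falls out for free: when consumption lags, put() blocks and the fetcher stops issuing requests.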
The ConsumerFetcher is also responsible for registering with ZooKeeper and for balancing its consumption with the other consumers in the group.
The ConsumerFetcher maintains a PartitionRepository, which stores an entry for each topic/partition pair along with the current consumed offset as a PartitionOffset object. The fetcher thread adds FetchedDataChunk objects, each containing the PartitionOffset as well as the fetched MessageSet.
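One way to picture the PartitionRepository is as a map keyed by topic/partition pair; the exact structure below is a sketch under that assumption, not the real implementation:

```scala
import scala.collection.mutable

// Hypothetical offset record; the real PartitionOffset may carry more state.
case class PartitionOffset(topic: String, partition: Int, consumedOffset: Long)

// Sketch of a PartitionRepository: one entry per topic/partition pair.
class PartitionRepository {
  private val offsets = mutable.Map.empty[(String, Int), PartitionOffset]

  // Record the latest consumed offset for a topic/partition pair.
  def record(topic: String, partition: Int, offset: Long): Unit =
    offsets((topic, partition)) = PartitionOffset(topic, partition, offset)

  // Look up the current consumed offset, if any.
  def offsetFor(topic: String, partition: Int): Option[Long] =
    offsets.get((topic, partition)).map(_.consumedOffset)

  // All entries for a topic, e.g. for writing back on commit(topic).
  def entriesFor(topic: String): Iterable[PartitionOffset] =
    offsets.values.filter(_.topic == topic)
}
```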
The ConsumerFetcher exposes a commit(topic: String) call, which writes the stored PartitionOffset data for that topic back to ZooKeeper.
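A commit(topic) could then walk the stored offsets and write each one to its ZooKeeper path, following Kafka's /consumers/<group>/offsets/<topic>/<partition> layout; the in-memory map below stands in for a real ZooKeeper client, and the helper names are assumptions:

```scala
import scala.collection.mutable

// In-memory stand-in for a ZooKeeper client; the real fetcher would
// issue actual ZooKeeper writes here.
object FakeZk {
  val nodes = mutable.Map.empty[String, String]
  def writeData(path: String, data: String): Unit = nodes(path) = data
}

// Sketch of commit(topic): persist every consumed offset for the topic,
// one znode per partition.
def commit(group: String, topic: String, offsets: Map[Int, Long]): Unit =
  for ((partition, offset) <- offsets)
    FakeZk.writeData(s"/consumers/$group/offsets/$topic/$partition", offset.toString)
```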
val fetcher = new ConsumerFetcher(new ConsumerConfig(...))
fetcher.connect()
val stream: KafkaMessageStream = fetcher.consume("my_topic")
for(m <- stream) {
  process(m)
}
stream.commit()