public class KafkaConsumer<K,V> extends java.lang.Object implements Consumer<K,V>
It will transparently handle the failure of servers in the Kafka cluster, and transparently adapt as partitions of data it subscribes to migrate within the cluster. This client also interacts with the server to allow groups of consumers to load balance consumption using consumer groups (as described below).
The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to. Failure to close the consumer after use will leak these connections.
The consumer is thread safe but generally will be used only from within a single thread. The consumer client has no threads of its own; all work is done in the caller's thread when calls are made on the various methods exposed.
The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It advances automatically every time the consumer calls poll(long) and receives messages.
The committed position is the last offset that has been saved securely. Should the process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit offsets periodically, or it can choose to control this committed position manually by calling commit. This distinction gives the consumer control over when a record is considered consumed. It is discussed in further detail below.
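The distinction between the two offsets can be sketched without a broker. The snippet below uses plain counters in place of Kafka's bookkeeping; all names are illustrative and not part of the Kafka API.

```java
// Broker-free illustration of the two offsets described above: "position"
// is the offset of the next record to hand out, while the committed offset
// only moves when the application commits.
public class OffsetDemo {
    static long position = 0;   // offset of the next record to hand out
    static long committed = -1; // last offset saved durably (-1: none yet)

    // simulate the consumer receiving one record from poll()
    static long pollOne() {
        return position++;      // position advances with every record delivered
    }

    // simulate commit(): durably save everything handed out so far
    static void commit() {
        committed = position - 1;
    }

    public static void main(String[] args) {
        pollOne();
        pollOne();
        commit();   // committed is now 1
        pollOne();  // position is now 3, committed is still 1
        // after a crash here, the consumer would recover to committed + 1 = 2,
        // reprocessing the record at offset 2
        System.out.println("position=" + position + " committed=" + committed);
    }
}
```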
Each Kafka consumer must specify a consumer group that it belongs to. Kafka will deliver each message in the subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two processes, each process would consume from two partitions. This group membership is maintained dynamically: if a process fails the partitions assigned to it will be reassigned to other processes in the same group, and if a new process joins the group, partitions will be moved from existing consumers to this new process.
So if two processes subscribe to a topic both specifying different groups they will each get all the records in that topic; if they both specify the same group they will each get about half the records.
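The balancing described above can be illustrated with a round-robin deal of partitions over group members. This reproduces the outcome the text describes (four partitions, two processes, two partitions each) but is not Kafka's actual assignment algorithm; the names are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Round-robin illustration of balancing a topic's partitions over the
// members of one consumer group.
public class AssignmentDemo {
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers)
            assignment.put(c, new ArrayList<>());
        for (int p = 0; p < partitions; p++)  // deal partitions out one at a time
            assignment.get(consumers.get(p % consumers.size())).add(p);
        return assignment;
    }

    public static void main(String[] args) {
        // two processes in the same group, topic with four partitions:
        // each process ends up with two partitions
        System.out.println(assign(List.of("proc1", "proc2"), 4));
        // if proc2 fails, its partitions are reassigned to the survivor
        System.out.println(assign(List.of("proc1"), 4));
    }
}
```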
Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a given topic without duplicating data (additional consumers are actually quite cheap).
This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to a queue in a traditional messaging system, all processes would be part of a single consumer group and hence record delivery would be balanced over the group as with a queue. Unlike a traditional messaging system, though, you can have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system, each process would have its own consumer group, so each process would subscribe to all the records published to the topic.
In addition, when offsets are committed they are always committed for a given consumer group.
It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic partition balancing.
```java
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe("foo", "bar");
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n",
                          record.offset(), record.key(), record.value());
}
```

Setting enable.auto.commit means that offsets are committed automatically with a frequency controlled by the config auto.commit.interval.ms.
The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the configuration metadata.broker.list. This list is just used to discover the rest of the brokers in the cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in case there are servers down when the client is connecting).
In this example the client is subscribing to the topics foo and bar as part of a group of consumers called test as described above.
The broker will automatically detect failed processes in the test group by using a heartbeat mechanism. The consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. As long as the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned to it. If it stops heartbeating for a period of time longer than session.timeout.ms then it will be considered dead and its partitions will be assigned to another process.
The deserializer settings specify how to turn the bytes read from Kafka into the objects handed to the application. By specifying the string deserializers we are saying that our record's key and value will just be simple strings.
```java
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe("foo", "bar");
int commitInterval = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
        if (buffer.size() >= commitInterval) {
            insertIntoDb(buffer);
            consumer.commit(CommitType.SYNC);
            buffer.clear();
        }
    }
}
```
In this mode the consumer will just get the partitions it subscribes to and if the consumer instance fails no attempt will be made to rebalance partitions to other instances.
There are several cases where this makes sense:
This mode is easy to specify: rather than subscribing to the topic, the consumer simply subscribes to particular partitions:
```java
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.subscribe(partition0);
consumer.subscribe(partition1);
```

The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only be changed if the consumer specifies new partitions, and no attempt at failure detection will be made.
It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load balancing) using the same consumer instance.
One example of this type of usage is an application that manages its own offsets rather than relying on Kafka's commit mechanism. To do this:
- Configure enable.auto.commit=false.
- Use the offset provided with each ConsumerRecord to save your position.
- On restart, restore the position of the consumer using seek(TopicPartition, long).
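The steps above can be sketched without a broker. In the snippet below a plain map stands in for the durable store, and all names are illustrative, not Kafka APIs; restoring the position corresponds to calling seek(partition, saved + 1) on restart.

```java
import java.util.HashMap;
import java.util.Map;

// Broker-free sketch of the manual-offset protocol: record the offset of
// each processed record in a durable store, and on restart resume from the
// saved position.
public class ManualOffsetDemo {
    // "durable" store: partition number -> last processed offset
    static final Map<Integer, Long> store = new HashMap<>();

    static void process(int partition, long offset, String value) {
        // in a real system, save the output and the offset atomically
        store.put(partition, offset);
    }

    static long restoredPosition(int partition) {
        // resume one past the last processed offset; -1 means start from 0
        return store.getOrDefault(partition, -1L) + 1;
    }

    public static void main(String[] args) {
        String[] records = {"a", "b", "c", "d"};
        for (long off = 0; off < 3; off++)  // process three records, then "crash"
            process(0, off, records[(int) off]);
        // a restarted consumer would now call seek(partition, 3)
        System.out.println("resume at offset " + restoredPosition(0));
    }
}
```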
The configuration rebalance.callback.class specifies an implementation of the interface ConsumerRebalanceCallback. When partitions are taken from a consumer, the consumer will want to commit its offset for those partitions by implementing ConsumerRebalanceCallback.onPartitionsRevoked(Consumer, Collection). When partitions are assigned to a consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer to that position by implementing ConsumerRebalanceCallback.onPartitionsAssigned(Consumer, Collection).
Another common use for ConsumerRebalanceCallback is to flush any caches the application maintains for partitions that are moved elsewhere.
There are several instances where manually controlling the consumer's position can be useful.
One case is time-sensitive record processing: it may make sense for a consumer that falls far enough behind to skip ahead to the most recent records rather than attempt to catch up by processing them all.
Another use case is a system that maintains local state as described in the previous section. In such a system the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise, if the local state is destroyed (say because the disk is lost), the state may be recreated on a new machine by reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
Kafka allows specifying the position using seek(TopicPartition, long). Special methods for seeking to the earliest and latest offsets the server maintains are also available (seekToBeginning(TopicPartition...) and seekToEnd(TopicPartition...), respectively).
This leaves several options for implementing multi-threaded processing of records. One option is to have one or more consumer threads hand off ConsumerRecords instances to a blocking queue consumed by a pool of processor threads that actually handle the record processing. This option, too, has its own pros and cons.
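The hand-off design can be sketched as a runnable example. Below, plain lists stand in for ConsumerRecords; the thread counts, batch sizes, and the empty-batch shutdown signal are illustrative choices, not part of the consumer API.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// One "consumer" thread feeds record batches into a blocking queue while a
// pool of processor threads drains it and does the actual work.
public class HandoffDemo {
    static final AtomicInteger processed = new AtomicInteger();

    public static void main(String[] args) throws Exception {
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(10);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            pool.submit(() -> {
                while (true) {
                    List<String> batch = queue.take(); // wait for the next batch
                    if (batch.isEmpty()) return null;  // empty batch signals shutdown
                    processed.addAndGet(batch.size()); // "process" the records
                }
            });
        }
        // the consumer thread: in real code each batch would come from poll()
        for (int b = 0; b < 5; b++)
            queue.put(List.of("rec" + b + "a", "rec" + b + "b"));
        queue.put(List.of()); // one shutdown signal per processor thread
        queue.put(List.of());
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(processed.get() + " records processed");
    }
}
```

One design point to note: this decouples consumption from processing, but ordering guarantees then only hold per batch handed to a single processor thread, and committing positions from the consumer thread requires coordinating with the processors.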
| Constructor and Description |
| --- |
| KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs) A consumer is instantiated by providing a set of key-value pairs as configuration. |
| KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) A consumer is instantiated by providing a set of key-value pairs as configuration, a ConsumerRebalanceCallback implementation, and key and value Deserializers. |
| KafkaConsumer(java.util.Properties properties) A consumer is instantiated by providing a Properties object as configuration. |
| KafkaConsumer(java.util.Properties properties, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) A consumer is instantiated by providing a Properties object as configuration, a ConsumerRebalanceCallback implementation, and key and value Deserializers. |
| Modifier and Type | Method and Description |
| --- | --- |
| void | close() |
| void | commit(CommitType commitType) Commits offsets returned on the last poll() for the subscribed list of topics and partitions. |
| void | commit(java.util.Map<TopicPartition,java.lang.Long> offsets, CommitType commitType) Commits the specified offsets for the specified list of topics and partitions to Kafka. |
| long | committed(TopicPartition partition) Fetches the last committed offset for the given partition (whether the commit happened by this process or another). |
| java.util.Map<MetricName,? extends Metric> | metrics() Get the metrics kept by the consumer. |
| java.util.List<PartitionInfo> | partitionsFor(java.lang.String topic) Get metadata about the partitions for a given topic. |
| ConsumerRecords<K,V> | poll(long timeout) Fetches data for the topics or partitions specified using one of the subscribe APIs. |
| long | position(TopicPartition partition) Returns the offset of the next record that will be fetched (if a record with that offset exists). |
| void | seek(TopicPartition partition, long offset) Overrides the fetch offsets that the consumer will use on the next poll(timeout). |
| void | seekToBeginning(TopicPartition... partitions) Seek to the first offset for each of the given partitions. |
| void | seekToEnd(TopicPartition... partitions) Seek to the last offset for each of the given partitions. |
| void | subscribe(java.lang.String... topics) Incrementally subscribes to the given list of topics and uses the consumer's group management functionality. |
| void | subscribe(TopicPartition... partitions) Incrementally subscribes to specific topic partitions and does not use the consumer's group management functionality. |
| java.util.Set<TopicPartition> | subscriptions() The set of partitions currently assigned to this consumer. |
| void | unsubscribe(java.lang.String... topics) Unsubscribe from the specified topics. |
| void | unsubscribe(TopicPartition... partitions) Unsubscribe from the specified topic partitions. |
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings are documented at ConsumerConfig.

Parameters:
configs - The consumer configs

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
A consumer is instantiated by providing a set of key-value pairs as configuration, a ConsumerRebalanceCallback implementation, and key and value Deserializers. Valid configuration strings are documented at ConsumerConfig.

Parameters:
configs - The consumer configs
callback - A callback interface that the user can implement to manage customized offsets on the start and end of every rebalance operation.
keyDeserializer - The deserializer for keys that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
valueDeserializer - The deserializer for values that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.

public KafkaConsumer(java.util.Properties properties)
A consumer is instantiated by providing a Properties object as configuration. Valid configuration strings are documented at ConsumerConfig.
public KafkaConsumer(java.util.Properties properties, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
A consumer is instantiated by providing a Properties object as configuration, a ConsumerRebalanceCallback implementation, and key and value Deserializers. Valid configuration strings are documented at ConsumerConfig.

Parameters:
properties - The consumer configuration properties
callback - A callback interface that the user can implement to manage customized offsets on the start and end of every rebalance operation.
keyDeserializer - The deserializer for keys that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
valueDeserializer - The deserializer for values that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.

public java.util.Set<TopicPartition> subscriptions()
The set of partitions currently assigned to this consumer. If subscription happened by directly subscribing to partitions using subscribe(TopicPartition...) then this will simply return the list of partitions that were subscribed to. If subscription was done by specifying only the topic using subscribe(String...) then this will give the set of partitions currently assigned to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the process of getting reassigned).

Specified by:
subscriptions in interface Consumer<K,V>
public void subscribe(java.lang.String... topics)

Incrementally subscribes to the given list of topics and uses the consumer's group management functionality. As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and will trigger a rebalance operation if one of the following events occurs:
- the number of partitions changes for any of the subscribed topics
- a topic is created or deleted
- an existing member of the consumer group dies
- a new member is added to an existing consumer group

Specified by:
subscribe in interface Consumer<K,V>
Parameters:
topics - A variable list of topics that the consumer wants to subscribe to
public void subscribe(TopicPartition... partitions)

Incrementally subscribes to specific topic partitions and does not use the consumer's group management functionality.

Specified by:
subscribe in interface Consumer<K,V>
Parameters:
partitions - Partitions to incrementally subscribe to
public void unsubscribe(java.lang.String... topics)

Unsubscribe from the specified topics. Messages for these topics will not be returned from the next poll() onwards.

Specified by:
unsubscribe in interface Consumer<K,V>
Parameters:
topics - Topics to unsubscribe from
public void unsubscribe(TopicPartition... partitions)

Unsubscribe from the specified topic partitions. Messages for these partitions will not be returned from the next poll() onwards.

Specified by:
unsubscribe in interface Consumer<K,V>
Parameters:
partitions - Partitions to unsubscribe from
public ConsumerRecords<K,V> poll(long timeout)

Fetches data for the topics or partitions specified using one of the subscribe APIs. The offset used for fetching the data is governed by whether or not seek(TopicPartition, long) is used. If it is, the consumer will use the specified offsets on startup and on every rebalance, consuming data from that offset sequentially on every poll. If not, it will use the last checkpointed offset, committed via commit(offsets, sync), for the subscribed list of partitions.

Specified by:
poll in interface Consumer<K,V>
Parameters:
timeout - The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative.
Throws:
NoOffsetForPartitionException - If there is no stored offset for a subscribed partition and no automatic offset reset policy has been configured.
public void commit(java.util.Map<TopicPartition,java.lang.Long> offsets, CommitType commitType)

Commits the specified offsets for the specified list of topics and partitions to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.

A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails. A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until the commit succeeds.

Specified by:
commit in interface Consumer<K,V>
Parameters:
offsets - The list of offsets per partition that should be committed to Kafka.
commitType - Controls whether the commit is blocking
public void commit(CommitType commitType)

Commits offsets returned on the last poll() for the subscribed list of topics and partitions. This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.

Specified by:
commit in interface Consumer<K,V>
Parameters:
commitType - Whether or not the commit should block until it is acknowledged.
public void seek(TopicPartition partition, long offset)

Overrides the fetch offsets that the consumer will use on the next poll(timeout). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption to reset the fetch offsets.

Specified by:
seek in interface Consumer<K,V>
public void seekToBeginning(TopicPartition... partitions)

Seek to the first offset for each of the given partitions.

Specified by:
seekToBeginning in interface Consumer<K,V>
public void seekToEnd(TopicPartition... partitions)

Seek to the last offset for each of the given partitions.

Specified by:
seekToEnd in interface Consumer<K,V>
public long position(TopicPartition partition)

Returns the offset of the next record that will be fetched (if a record with that offset exists).

Specified by:
position in interface Consumer<K,V>
Parameters:
partition - The partition to get the position for
Throws:
NoOffsetForPartitionException - If a position hasn't been set for a given partition, and no reset policy is available.
public long committed(TopicPartition partition)

Fetches the last committed offset for the given partition (whether the commit happened by this process or another). This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the consumer hasn't yet initialized its cache of committed offsets.

Specified by:
committed in interface Consumer<K,V>
Parameters:
partition - The partition to check
Throws:
NoOffsetForPartitionException - If no offset has ever been committed by any process for the given partition.
public java.util.Map<MetricName,? extends Metric> metrics()

Get the metrics kept by the consumer.
public java.util.List<PartitionInfo> partitionsFor(java.lang.String topic)

Get metadata about the partitions for a given topic.

Specified by:
partitionsFor in interface Consumer<K,V>
Parameters:
topic - The topic to get partition metadata for