Here is some information on actually running Kafka as a production system based on usage and experience at LinkedIn. Please send us any additional tips you know of.
For applications that need a global view of all data we use the mirror maker tool to provide clusters which have aggregate data mirrored from all datacenters. These aggregate clusters are used for reads by applications that require the full data set.
Likewise in order to support data load into Hadoop which resides in separate facilities we provide local read-only clusters that mirror the production data centers in the facilities where this data load occurs.
This allows each facility to stand alone and operate even if the inter-datacenter links are unavailable: when this occurs the mirroring falls behind until the link is restored at which time it catches up.
This deployment pattern allows datacenters to act as independent entities and allows us to manage and tune inter-datacenter replication centrally.
This is not the only possible deployment pattern. It is possible to read from or write to a remote Kafka cluster over the WAN though TCP tuning will be necessary for high-latency links.
It is generally not advisable to run a single Kafka cluster that spans multiple datacenters, as this will incur very high replication latency for both Kafka writes and ZooKeeper writes, and neither Kafka nor ZooKeeper will remain available if the network partitions.
All configurations are documented in the configuration section.
Here is our production server configuration:

    # Replication configurations
    num.replica.fetchers=4
    replica.fetch.max.bytes=1048576
    replica.fetch.wait.max.ms=500
    replica.high.watermark.checkpoint.interval.ms=5000
    replica.socket.timeout.ms=30000
    replica.socket.receive.buffer.bytes=65536
    replica.lag.time.max.ms=10000
    replica.lag.max.messages=4000
    controller.socket.timeout.ms=30000
    controller.message.queue.size=10

    # Log configuration
    num.partitions=8
    message.max.bytes=1000000
    auto.create.topics.enable=true
    log.index.interval.bytes=4096
    log.index.size.max.bytes=10485760
    log.retention.hours=168
    log.flush.interval.ms=10000
    log.flush.interval.messages=20000
    log.flush.scheduler.interval.ms=2000
    log.roll.hours=168
    log.retention.check.interval.ms=300000
    log.segment.bytes=1073741824

    # ZK configuration
    zk.connection.timeout.ms=6000
    zk.sync.time.ms=2000

    # Socket server configuration
    num.io.threads=8
    num.network.threads=8
    socket.request.max.bytes=104857600
    socket.receive.buffer.bytes=1048576
    socket.send.buffer.bytes=1048576
    queued.max.requests=16
    fetch.purgatory.purge.interval.requests=100
    producer.purgatory.purge.interval.requests=100

Our client configuration varies a fair amount between different use cases.
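For instance, a throughput-oriented producer will batch more aggressively than a latency-sensitive one. The following is a minimal sketch using the newer Java producer client (org.apache.kafka.clients.producer); the broker addresses, topic name, and specific values are hypothetical, not recommendations:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExampleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical broker list; adjust for your environment.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Durability-oriented setting: wait for the full ISR to acknowledge each write.
        props.put("acks", "all");
        // Throughput-oriented settings: allow small batching delays to coalesce writes.
        props.put("linger.ms", "5");
        props.put("batch.size", "65536");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
        }
    }
}
```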
Our JVM tuning looks like this:

    java -server -Xms3072m -Xmx3072m -XX:NewSize=256m -XX:MaxNewSize=256m \
      -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly \
      -XX:+CMSConcurrentMTEnabled -XX:+CMSScavengeBeforeRemark \
      -XX:CMSInitiatingOccupancyFraction=30 \
      -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution \
      -Xloggc:logs/gc.log -Djava.awt.headless=true -Dcom.sun.management.jmxremote \
      -classpath <long list of jars> the.actual.Class
You need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope estimate of memory needs by assuming you want to be able to buffer for 30 seconds and compute your memory need as write_throughput*30.
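As a worked example of that rule of thumb, here is a small sketch assuming a sustained write throughput of 50 MB/s (an illustrative number, not a measurement):

```java
public class BufferSizing {
    public static void main(String[] args) {
        // Assumed, illustrative write throughput of 50 MB/s, buffered for 30 seconds.
        long writeThroughputBytesPerSec = 50L * 1024 * 1024;
        long bufferSeconds = 30;
        long memoryNeededBytes = writeThroughputBytesPerSec * bufferSeconds;
        // Prints roughly 1.5 GB.
        System.out.printf("~%.1f GB of buffer memory%n",
                memoryNeededBytes / (1024.0 * 1024 * 1024));
    }
}
```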
Disk throughput is important. We have 8x7200 rpm SATA drives. In general disk throughput is the performance bottleneck, and more disks are better. Depending on how you configure flush behavior you may or may not benefit from more expensive disks (if you force flush often then higher RPM SAS drives may be better).
We have seen a few issues running on Windows, and Windows is not currently a well-supported platform, though we would be happy to change that.
You likely don't need to do much OS-level tuning though there are a few things that will help performance.
Two configurations that may be important:
- File descriptor limits: Kafka uses file descriptors for log segments and open connections, so a broker hosting many partitions needs a generous limit; we recommend allowing at least 100000 file descriptors for the broker process.
- Max socket buffer size: this can be increased to enable high-performance data transfer between data centers.
If you configure multiple data directories partitions will be assigned round-robin to data directories. Each partition will be entirely in one of the data directories. If data is not well balanced among partitions this can lead to load imbalance between disks.
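As a toy sketch of the placement behavior described above (the directory paths and partition count are hypothetical):

```java
public class RoundRobinPlacement {
    public static void main(String[] args) {
        // Hypothetical log.dirs entries; new partitions are assigned to directories round-robin,
        // and each partition's data lives entirely within the directory it was assigned to.
        String[] logDirs = {"/disk1/kafka-logs", "/disk2/kafka-logs", "/disk3/kafka-logs"};
        int partitions = 8;
        for (int p = 0; p < partitions; p++) {
            System.out.println("partition-" + p + " -> " + logDirs[p % logDirs.length]);
        }
    }
}
```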
RAID can potentially do better at balancing load between disks (although it doesn't always seem to) because it balances load at a lower level. The primary downside of RAID is that it is usually a big performance hit for write throughput and reduces the available disk space.
Another potential benefit of RAID is the ability to tolerate disk failures. However our experience has been that rebuilding the RAID array is so I/O intensive that it effectively disables the server, so this does not provide much real availability improvement.
Kafka must eventually call fsync to know that data was flushed. When recovering from a crash, for any log segment not known to be fsync'd, Kafka will check the integrity of each message by verifying its CRC and rebuild the accompanying offset index file as part of the recovery process executed on startup.
The frequency of application-level fsyncs has a large impact on both latency and throughput. Setting a large flush interval will improve throughput, as the operating system can buffer the many small writes into a single large write. This works effectively even across many partitions all taking simultaneous writes, provided enough memory is available for buffering. However, doing this may have a significant impact on latency, as in many filesystems (including ext2, ext3, and ext4) fsync is an operation that blocks all writes to the file. Because of this, allowing lots of data to accumulate and then calling flush can lead to large write latencies, as new writes on that partition will be blocked while the accumulated data is flushed to disk.
In 0.8 we support replication as a way to ensure that data that is written is durable in the face of server crashes. As a result we allow giving out data to consumers immediately and the flush interval does not impact consumer latency. However we still MUST flush each log segment when the log rolls over to a new segment. So although you can set a relatively lenient flush interval setting no flush interval at all will lead to a full segment's worth of data being flushed all at once which can be quite slow. After 0.8 we improved our recovery procedure which allows us to avoid the blocking fsync when the log rolls. As a result in all releases after 0.8 we recommend using replication and not setting any application level flush settings---relying only on the OS and Kafka's own background flush. This provides the best of all worlds for most uses: no knobs to tune, great throughput and latency, and full recovery guarantees. We generally feel that the guarantees provided by replication are stronger than sync to local disk, however the paranoid still may prefer having both and application level fsync policies are still supported.
In general you don't need to do any low-level tuning of the filesystem, but in the next few sections we will go over some of this in case it is useful.
Pdflush has a configurable policy that controls how much dirty data can be maintained in cache and for how long before it must be written back to disk. This policy is described here. When pdflush cannot keep up with the rate of data being written, it will eventually cause the writing process to block, incurring latency in the writes and slowing down the accumulation of data.
You can see the current state of OS memory usage by doing:

    cat /proc/meminfo

The meaning of these values is described in the link above.
Using pagecache has several advantages over an in-process cache for storing data that will be written out to disk:
- The I/O scheduler will batch together consecutive small writes into bigger physical writes, which improves throughput.
- The I/O scheduler will attempt to re-sequence writes to minimize movement of the disk head, which improves throughput.
- It automatically uses all the free memory on the machine.
It is not necessary to tune these settings; however, those wanting to optimize performance have a few filesystem knobs that will help (for example, on Ext4: data=writeback, disabling journaling, commit=num_secs, nobh, and delalloc).
The easiest way to see the available metrics is to fire up jconsole and point it at a running Kafka client or server; this will allow browsing all metrics with JMX.
We do graphing and alerting on the following metrics:
Description | MBean name | Normal value |
---|---|---|
Message in rate | "kafka.server":name="AllTopicsMessagesInPerSec", type="BrokerTopicMetrics" | |
Byte in rate | "kafka.server":name="AllTopicsBytesInPerSec",type="BrokerTopicMetrics" | |
Request rate | "kafka.network":name="{Produce|Fetch-consumer|Fetch-follower}-RequestsPerSec",type="RequestMetrics" | |
Byte out rate | "kafka.server":name="AllTopicsBytesOutPerSec", type="BrokerTopicMetrics" | |
Log flush rate and time | "kafka.log":name="LogFlushRateAndTimeMs",type="LogFlushStats" | |
# of under replicated partitions (|ISR| < |all replicas|) | "kafka.server":name="UnderReplicatedPartitions",type="ReplicaManager" | 0 |
Is controller active on broker | "kafka.controller":name="ActiveControllerCount",type="KafkaController" | only one broker in the cluster should have 1 |
Leader election rate | "kafka.controller":name="LeaderElectionRateAndTimeMs", type="ControllerStats" | non-zero when there are broker failures |
Unclean leader election rate | "kafka.controller":name="UncleanLeaderElectionsPerSec", type="ControllerStats" | 0 |
Partition counts | "kafka.server":name="PartitionCount",type="ReplicaManager" | mostly even across brokers |
Leader replica counts | "kafka.server":name="LeaderCount",type="ReplicaManager" | mostly even across brokers |
ISR shrink rate | "kafka.server":name="ISRShrinksPerSec",type="ReplicaManager" | If a broker goes down, ISR for some of the partitions will shrink. When that broker is up again, ISR will be expanded once the replicas are fully caught up. Other than that, the expected value for both ISR shrink rate and expansion rate is 0. |
ISR expansion rate | "kafka.server":name="ISRExpandsPerSec",type="ReplicaManager" | See above |
Max lag in messages btw follower and leader replicas | "kafka.server":name="([-.\w]+)-MaxLag",type="ReplicaFetcherManager" | < replica.lag.max.messages |
Lag in messages per follower replica | "kafka.server":name="([-.\w]+)-ConsumerLag",type="FetcherLagMetrics" | < replica.lag.max.messages |
Requests waiting in the producer purgatory | "kafka.server":name="PurgatorySize",type="ProducerRequestPurgatory" | non-zero if ack=-1 is used |
Requests waiting in the fetch purgatory | "kafka.server":name="PurgatorySize",type="FetchRequestPurgatory" | size depends on fetch.wait.max.ms in the consumer |
Request total time | "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-TotalTimeMs",type="RequestMetrics" | broken into queue, local, remote and response send time |
Time the request waiting in the request queue | "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-QueueTimeMs",type="RequestMetrics" | |
Time the request being processed at the leader | "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-LocalTimeMs",type="RequestMetrics" | |
Time the request waits for the follower | "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-RemoteTimeMs",type="RequestMetrics" | non-zero for produce requests when ack=-1 |
Time to send the response | "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-ResponseSendTimeMs",type="RequestMetrics" | |
Number of messages the consumer lags behind the broker among all partitions consumed | "kafka.consumer":name="([-.\w]+)-MaxLag",type="ConsumerFetcherManager" | small and not growing |
The min fetch rate among all fetchers to brokers in a consumer | "kafka.consumer":name="([-.\w]+)-MinFetch",type="ConsumerFetcherManager" | >= 1000/fetch.wait.max.ms |
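These MBeans can also be read programmatically. Below is a minimal sketch using the standard JMX remote API; the host and port are hypothetical (it assumes the broker JVM was started with remote JMX enabled on port 9999), and the "Value" attribute is the one used by gauge-type metrics such as UnderReplicatedPartitions, which is worth double-checking in jconsole for your version:

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class UnderReplicatedCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical broker host and JMX port.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            // MBean name as listed in the table above (quoted form used by the metrics JMX reporter).
            ObjectName name = new ObjectName(
                    "\"kafka.server\":type=\"ReplicaManager\",name=\"UnderReplicatedPartitions\"");
            Object value = connection.getAttribute(name, "Value");
            System.out.println("UnderReplicatedPartitions = " + value);
        } finally {
            connector.close();
        }
    }
}
```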
Redundancy in the physical/hardware/network layout: try not to put them all in the same rack, use decent (but don't go nuts) hardware, and try to keep redundant power and network paths, etc.
I/O segregation: if you do a lot of write-type traffic you'll almost definitely want the transaction logs on a different disk group than application logs and snapshots (writes to the ZooKeeper service involve a synchronous write to disk, which can be slow).
Application segregation: unless you really understand the application patterns of other apps that you want to install on the same box, it can be a good idea to run ZooKeeper in isolation (though this can be a balancing act with the capabilities of the hardware).
Use care with virtualization: it can work, depending on your cluster layout, read/write patterns, and SLAs, but the tiny overheads introduced by the virtualization layer can add up and throw off ZooKeeper, as it can be very time sensitive.
ZooKeeper configuration and monitoring: it's Java, so make sure you give it 'enough' heap space (we usually run them with 3-5G, but that's mostly due to the data set size we have here); unfortunately we don't have a good formula for it. As far as monitoring goes, both JMX and the four-letter-word (4lw) commands are very useful; they do overlap in some cases (and in those cases we prefer the 4lw commands, since they seem more predictable, or at the very least, they work better with the LI monitoring infrastructure). A minimal sketch of scripting a 4lw check follows below.
Don't overbuild the cluster: large clusters, especially in a write-heavy usage pattern, mean a lot of intra-cluster communication (quorums on the writes and subsequent cluster member updates), but don't underbuild it either (and risk swamping the cluster).
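Here is the 4lw sketch referenced above: it sends the "ruok" command over a plain socket, and a healthy server answers "imok". The host name is hypothetical, 2181 is ZooKeeper's default client port, and it assumes the 4lw commands are enabled on the server:

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class ZkRuokCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical ZooKeeper host; 2181 is the default client port.
        try (Socket socket = new Socket("zk1.example.com", 2181)) {
            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes(StandardCharsets.US_ASCII));
            out.flush();

            InputStream in = socket.getInputStream();
            byte[] buf = new byte[16];
            int n = in.read(buf);
            String reply = n > 0 ? new String(buf, 0, n, StandardCharsets.US_ASCII) : "";
            // A healthy server replies "imok" and then closes the connection.
            System.out.println("ZooKeeper says: " + reply);
        }
    }
}
```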
Try to run on a 3-5 node cluster: Zookeeper writes use quorums and inherently that means having an odd number of machines in a cluster. Remember that a 5 node cluster will cause writes to slow down compared to a 3 node cluster, but will allow more fault tolerance.
Overall, we try to keep the ZooKeeper system as small as will handle the load (plus standard growth capacity planning) and as simple as possible. We try not to do anything fancy with the configuration or application layout compared to the official release, and to keep it as self-contained as possible. For these reasons, we tend to skip the OS-packaged versions, since they have a tendency to put things in the OS standard hierarchy, which can be 'messy', for want of a better way to word it.