Kafka

Kafka is a high-throughput, publish-subscribe, distributed messaging system. A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients. Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization, and it can be expanded elastically and transparently without downtime. Data streams are partitioned and spread over a cluster of machines, which allows streams larger than any single machine can handle and supports clusters of coordinated consumers. Messages are persisted on disk and replicated within the cluster to prevent data loss; each broker can handle terabytes of messages without performance impact. Kafka's cluster-centric design offers strong durability and fault-tolerance guarantees.

Configure Kafka

Install Kafka

Kafka is distributed as a parcel for CDH and provides the following services:

  • Kafka Broker – Add the Kafka Broker service to a node with plenty of memory and CPU.
    • Note: On installation you will configure the Kafka Broker Default Group: change the Kafka data directory (log.dirs) from /var/local/kafka/data to /space<n>/kafka/data, with one entry per data volume. See the Kafka Configuration section below for more settings.
  • Kafka MirrorMaker – Does not need to be installed on the same node as the Broker, and is not needed for every installation; a common use of MirrorMaker is to move data between data centers.

How to install Kafka

  1. Install Kafka Parcel
    1. In Cloudera Manager, click on Parcels.
    2. Find the Kafka Parcel, click Distribute.
    3. Click Activate, but do not restart the cluster.
  2. Install Kafka Service
    1. In Cloudera Manager, click on the cluster name.
    2. In the upper right hand corner, click on Actions, and Add a New Service.
    3. Select Kafka in the list.
    4. Follow the instructions and configure Kafka using the settings in the Kafka Configuration section below.

Kafka Configuration

Configure Kafka as follows:

For each setting below, the first value is the recommendation for a single-node cluster (1 server) and the second is the recommendation for a cluster (more than one server).

Zookeeper Root
  Single node: leave blank. Cluster: leave blank.
  The root ZNode in ZooKeeper that should be used as a root for this Kafka cluster. LEAVE BLANK!

Replication Factor
default.replication.factor
  Single node: 1. Cluster: 2.
  The default replication factor for automatically created topics. A higher replication factor means a Broker can be restarted without losing access to the data. With two Brokers, a replication factor of 2 is recommended; with more than two Brokers, a replication factor of 3 is recommended.
  Note: For a cluster, use 2; for a single-node cluster, use 1.

Default Number of Partitions
num.partitions
  Single node: 2. Cluster: 3.
  The default number of partitions per topic. It is recommended that the partition count be higher than the number of Brokers, so that the Leader Partitions are evenly distributed among the Brokers, thus distributing the read/write load.
  Note: If there is a large number of partitions, make sure Kafka starts with sufficient heap space (number of partitions * replica.fetch.max.bytes); see the example after this table.
  For a cluster, use 3; for a single-node cluster, use 2.

Zookeeper Session Timeout
zookeeper.session.timeout.ms
  Single node: 6 seconds. Cluster: 10 seconds.
  ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time, it is considered dead and a rebalance will occur.

Replica Fetch Size
replica.fetch.max.bytes
  Single node: 1 MB. Cluster: 12 MB.
  The number of bytes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.
  Note: As of 12/8/15, the network map offers files that are 8.8 MB (150,000 row) files.

Data Directory
log.dirs
  Single node: /space1/kafka/data. Cluster: /space1/kafka/data through /space7/kafka/data (one entry per data volume).
  A list of one or more directories in which Kafka data is stored. Each new partition that is created will be placed in the directory which currently has the fewest partitions. Each directory should be on its own separate drive.

Java Heap Size of Broker in Megabytes
broker_max_heap_size
  Single node: 256 MB. Cluster: 512 MB to 2 GB.
  Maximum size for the Java process heap memory. Passed to Java -Xmx. Measured in megabytes.
  Note: Not enough time has been spent to determine the correct heap setting.

Maximum Connections per IP Address
max.connections.per.ip
  Single node: 100. Cluster: 100 to 10000.
  Maximum number of connections allowed from each IP address.

forceStartOffsetTime
  Single node: leave blank. Cluster: leave blank. NO CHANGE, LEAVE BLANK.
  The value can be -2, -1, or a timestamp in milliseconds: -1 reads from the latest offset of the topic, -2 reads from the beginning, and a timestamp reads from a specific time.

Automatically Restart Process
Kafka MirrorMaker Default Group and Kafka Broker Default Group
  Single node: true. Cluster: true.
  When set, this role's process is automatically (and transparently) restarted in the event of an unexpected failure.
  Note: Set this to true for each Kafka MirrorMaker service.
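As a rough sanity check of the heap note under Default Number of Partitions (the numbers below are purely illustrative, not values from this environment):

# 50 partitions on a broker x 12 MB replica.fetch.max.bytes = 600 MB of fetch buffers,
# so broker_max_heap_size should be set comfortably above 600 MB in that case.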

Upgrade Kafka to 0.9.x

Using the Kafka CDH parcel, it is easy to upgrade Kafka: use Cloudera Manager to distribute and activate the new Kafka parcel.

  1. By default, the new version of the Kafka Broker binds only to the hostname interface instead of all interfaces. This breaks any environment that needs to contact the Kafka Brokers via localhost instead of the hostname. We’ll need to add the following to the Kafka Broker Advanced Configuration Snippet (Safety Valve) for kafka.properties:
    1. listeners=PLAINTEXT://0.0.0.0:9092
  2. Producers must define bootstrap.servers.
  3. New-style consumers must define bootstrap.servers; old-style consumers can continue to use zookeeper.connect. (See the sketch after this list.)
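For example, a minimal sketch of the relevant client properties (the broker hostnames are placeholders reused from the examples later on this page):

# producer properties (new producer)
bootstrap.servers=kafka.broker.servername01:9092,kafka.broker.servername02:9092

# new-style consumer properties
bootstrap.servers=kafka.broker.servername01:9092,kafka.broker.servername02:9092

# old-style consumer properties (unchanged)
zookeeper.connect=zk.servername01:2181/kafka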

Web Service Buffering

An important design consideration for a web service using Kafka is to build in buffering to handle unexpected Kafka service failures or planned maintenance. Bruce is one example:

https://techbeacon.com/when-kafka-met-bruce-how-one-service-can-boost-app-messaging-reliability

https://github.com/ifwe/bruce

Administer Kafka

Kafka Command-line Tools

Important Kafka command-line tools are located in /usr/bin; others are located in /opt/cloudera/parcels/KAFKA-0.8.2.0-1.kafka1.3.1.p0.9/lib/kafka/bin/:

Topics

kafka-topics

Create, alter, list, and describe topics. For example:

To create: kafka-topics --create --topic testtopic --partitions 1 --replication-factor 1 --zookeeper zk.servername01/kafka

To list: kafka-topics --list --zookeeper zk.servername01/kafka

sink1

t1

t2

Describe

Describe a topic:

kafka-topics --zookeeper zk.servername01:2181/kafka --describe --topic the_name_of_topic

Or:

kafka-topics --zookeeper servername01 --describe --topic the_name_of_topic

Consume

Kafka Consumer using ZooKeeper to handle offsets:

kafka-console-consumer

Read data from a Kafka topic and write it to standard output. For example:

kafka-console-consumer --zookeeper zk.servername01/kafka --topic topic_name

kafka-console-consumer --zookeeper servername01:2181 --consumer.config consumer.config --topic topic_name

The new Kafka Consumer uses Kafka itself to handle offsets:

/usr/bin/kafka-console-consumer --new-consumer --bootstrap-server kafka.broker.servername01:9092 --topic topic_name --max-messages 10 --from-beginning

 

Limit the number of records you consume with max-messages:

--max-messages 10

Consume from the beginning with from-beginning:

--from-beginning

Produce

kafka-console-producer

Read data from standard input and write it to a Kafka topic. For example:

kafka-console-producer --broker-list kafka.broker.servername01:9092 --topic topic_name

Get Offsets

GetOffsetShell

It can be crucial to know the most recent (latest) offsets for your topic. Use the GetOffsetShell class:

/usr/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list kafka.broker.servername01:9092,kafka.broker.servername02:9092 --topic topic_name --time -1

Note: The --time option accepts -1 for the latest offsets, -2 for the earliest offsets, or a timestamp in milliseconds to return the offsets recorded before that time.
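For example, the same command returns the earliest available offsets when --time is -2:

/usr/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list kafka.broker.servername01:9092,kafka.broker.servername02:9092 --topic topic_name --time -2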

ConsumerOffsetChecker

Look at offsets:

/usr/bin/kafka-run-class kafka.tools.ConsumerOffsetChecker --group cloudera_mirrormaker --zookeeper zk.servername01

kafka-consumer-offset-checker

Check the number of messages read and written, as well as the lag for each consumer in a specific consumer group. For example:

kafka-consumer-offset-checker --group flume --topic topic_name --zookeeper zk.servername01

Note: forceStartOffsetTime value can be -2, -1, or a time stamp in milliseconds:

   -1 to read the latest offset of the topic

   -2 to read from the beginning.

   timestamp to read from a specific time

Get a list of Kafka consumer groups

If you're using a console consumer, it will try to use a random(ish) consumer group ID unless you specifically tell it otherwise. You can control this by creating a consumer.properties file, putting a group.id=(some name) property into it, and then running the consumer with the --consumer.config=consumer.properties command-line argument. For now, feel free to make up your own consumer group name, and then you can use that group name later to do the query that you want to do.
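A minimal sketch of that setup (the group name is made up for illustration):

# consumer.properties
group.id=my_test_group

kafka-console-consumer --zookeeper zk.servername01:2181/kafka --consumer.config consumer.properties --topic topic_name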

http://mail-archives.apache.org/mod_mbox/kafka-users/201504.mbox/%3CCAGTt3W-shaX2t0-W-tC0YM0Q_BiXyXxei0QpXG=opsqiqXvehQ@mail.gmail.com%3E

There’s probably a better way to go about it, but you can use the Zookeeper client to get a list of existing consumer groups, under /consumers.
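For example, using the same ZooKeeper shell used in the topic-deletion steps below (this assumes the cluster uses the /kafka chroot; drop the prefix if it does not):

hbase zkcli
ls /kafka/consumers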

 

Change the Log Retention Time

For any existing topics, we can always change the log retention time. It takes a few minutes to take effect, so it’s best to do this well before we encounter an outage situation where this suddenly becomes extremely important.

Get current retention time for a topic:

kafka-topics --zookeeper zk.servername01:2181 --describe --topic topic_name

Change the log retention to 7 days on one topic:

kafka-topics --zookeeper zk.servername01:2181 --alter --topic topic_name --config retention.ms=604800000

Here’s a way to change the log retention to 7 days for every topic in a cluster, from that cluster’s app server:

# CHANGE THIS FOR EACH ENVIRONMENT
ZOOKEEPER="zk.servername01:2181"
# milliseconds for one week: 604800000
MILLIS_PER_WEEK=604800000
kafka-topics --zookeeper $ZOOKEEPER --list | while read -r f; do
  kafka-topics --zookeeper $ZOOKEEPER --alter --topic "$f" --config retention.ms=$MILLIS_PER_WEEK
done
Just change the Zookeeper node value and let her rip. It’ll iterate over every existing Kafka topic and set the retention appropriately.

Add Partitions to an Existing Topic

There are times when you need additional partitions.

First describe the topic:

kafka-topics --zookeeper zk.servername01:2181/kafka --describe --topic topic_name

Then, add the additional partitions:

kafka-topics --zookeeper zk.servername01:2181/kafka --alter --topic topic_name --partitions 5

Change the Replication Factor on an Existing Topic

Changing the replication factor on an existing topic can be a little tricky.

Reference: http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor

Identify the Broker IDs from a Kafka cluster, for example 160 and 161

Create a JSON file (for example, increase-replication-factor.json) like this:

{
 "version":1,
 "partitions":[
 {
 "topic":"topic_name",
 "partition":0,"replicas":[160,161]
 },
 {
 "topic":"topic_name",
 "partition":1,"replicas":[161,160]
 },
 {
 "topic":"topic_name",
 "partition":2,"replicas":[160,161]
 }
 ]
}

Then run the script, which on a server with Kafka installed (or a Gateway) is under /opt/cloudera/parcels/KAFKA/lib/kafka/bin:

./kafka-reassign-partitions.sh --zookeeper zk.servername01:2181 --reassignment-json-file increase-replication-factor.json --execute

Then, verify it:

./kafka-reassign-partitions.sh --zookeeper zk.servername01:2181 --reassignment-json-file increase-replication-factor.json --verify

And finally, get a description of the topic to make sure it’s right:

kafka-topics --zookeeper zk.servername01:2181/kafka --describe --topic topic_name

Topic: topic_name PartitionCount:3 ReplicationFactor:2   Configs:

Topic: topic_name     Partition: 0    Leader: 161     Replicas: 160,161     Isr: 161,160

Topic: topic_name     Partition: 1    Leader: 161     Replicas: 161,160     Isr: 161,160

Topic: topic_name     Partition: 2    Leader: 160     Replicas: 160,161     Isr: 160,161

Kafka Logs

The Kafka parcel is configured to log all Kafka log messages to a single file, /var/log/kafka/server.log by default. You can view, filter, and search this log using Cloudera Manager.

Reference: http://www.cloudera.com/content/cloudera/en/documentation/cloudera-kafka/latest/PDF/cloudera-kafka.pdf and http://kafka.apache.org

Delete a Topic

To delete a Kafka topic after the Broker has lost connection to the topic:

Typical delete command:

kafka-topics --zookeeper zk_node:port/chroot --delete --topic topic_name

The topic will be “marked for deletion” and will be removed on the next event. If the delete command doesn’t work right away, check that delete.topic.enable is set to true on the Brokers and try restarting the Kafka service.

More information: Although the command may seem to delete topics and return on success, it actually creates an /admin/delete_topics/<topic> node in ZooKeeper and only triggers the deletion. As soon as a broker sees this update, the topic no longer accepts any new produce/consume requests, and eventually the topic is deleted. The actual deletion process is fairly complicated and involves multiple state-machine transitions; once the topic’s data has been cleaned up, that node in ZooKeeper is deleted and the topic is deemed dead.
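To see topics that are still pending deletion, you can check that znode with the ZooKeeper shell used in the manual steps below (again assuming the /kafka chroot):

hbase zkcli
ls /kafka/admin/delete_topics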

Manually delete the topic:

If the delete command fails (the topic stays marked for deletion forever):

1. Using Cloudera Manager, stop the Kafka Broker.

2. Delete the topic from disk. To get the physical location of the topic, you can find the root under Cloudera Manager, Kafka, Configurations, Broker Service, Data Directory.

For example:

sudo mv /space1/kafka/topic_name-0 /space1/kafka/topic_name-0.bak

3. Delete the topic znode from ZooKeeper:

Here are the ZooKeeper commands we used to delete the topic from ZK:

Log onto zkcli:

hbase zkcli

Search for topics:

ls /kafka/brokers/topics

rmr /kafka/brokers/topics/topic_name

ls /kafka/brokers/topics

4. Start Kafka.

5. You can then recreate the Kafka topic.

Kafka Offset Monitor

This is an app to monitor your Kafka consumers and their position (offset) in the queue. We currently do not use this monitor.

  1. Copy the KafkaOffsetMonitor-assembly-0.2.1.jar to /tmp/ on the node.
  2. Install the jar and set permissions on the monitor directory.
  3. Start the offset monitor.

sudo mkdir -p /opt/kafka/offsetmonitor/

sudo cp /tmp/KafkaOffsetMonitor-assembly-0.2.1.jar /opt/kafka/offsetmonitor/

sudo chmod -R 774 /opt/kafka/offsetmonitor/

cd /opt/kafka/offsetmonitor/

java -cp KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk zk.servername01/kafka --port 8080 --refresh 10.seconds --retain 2.days

 

Reference: https://github.com/quantifind/KafkaOffsetMonitor/blob/master/README.md

Kafka MirrorMaker

Kafka MirrorMaker is used to replicate one Kafka cluster to another (this is called mirroring, to avoid confusion with the replication that happens within a single cluster). Kafka MirrorMaker ships with CDH.

Mirroring is often used in cross-DC scenarios, and there are a few configuration options that you may want to tune to help deal with inter-DC communication latencies and performance bottlenecks on your specific hardware. In general, you should set a high value for the socket buffer size on the mirror-maker’s consumer configuration (socket.buffersize) and the source cluster’s broker configuration (socket.send.buffer). Also, the mirror-maker consumer’s fetch size (fetch.size) should be higher than the consumer’s socket buffer size. Note that the socket buffer size configurations are a hint to the underlying platform’s networking code. If you enable trace logging, you can check the actual receive buffer size and determine whether the setting in the OS networking layer also needs to be adjusted.
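A sketch of those settings, using the property names from the paragraph above (exact property names vary by Kafka version, and the values are only illustrative):

# mirror-maker consumer config
socket.buffersize=1048576
fetch.size=2097152      # keep this larger than the consumer's socket buffer

# source cluster broker config
socket.send.buffer=1048576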

Set up a mirror

Setting up a mirror is easy – simply start up the mirror-maker processes after bringing up the target cluster. At minimum, the mirror maker takes one or more consumer configurations, a producer configuration and either a whitelist or a blacklist. You need to point the consumer to the source cluster’s ZooKeeper, and the producer to the mirror cluster’s ZooKeeper (or use the broker.list parameter).

bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"
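For reference, a rough sketch of what those configuration files might contain (hostnames are assumptions; the group name reuses the MirrorMaker group seen later on this page):

# sourceCluster1Consumer.config
zookeeper.connect=zk.servername01:2181/kafka
group.id=cloudera_mirrormaker

# targetClusterProducer.config (the reference calls this broker.list; on the 0.8 producer the property is metadata.broker.list)
metadata.broker.list=kafka.broker.servername01:9092,kafka.broker.servername02:9092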

In Cloudera Manager update the corresponding configurations in the UI and start the MirrorMaker instance.

Reference: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330

Check whether a mirror is keeping up

Cloudera publishes health checks: http://www.cloudera.com/content/www/en-us/documentation/enterprise/latest/topics/cm_ht_kafka_mirrormaker.html

ConsumerOffsetChecker

The consumer offset checker tool is useful to gauge how well your mirror is keeping up with the source cluster. Note that the --zookeeper argument should point to the source cluster’s ZooKeeper (zk.servername01 in this example). Also, if the topic is not specified, the tool prints information for all topics under the given consumer group. For example:

/usr/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group KafkaMirror --zookeeper zk.servername01:2181 --topic test-topic
Group        Topic       Pid  Offset          logSize         Lag     Owner
KafkaMirror  test-topic  0    5               5               0       none
KafkaMirror  test-topic  1    3               4               1       none
KafkaMirror  test-topic  2    6               9               3       none

For example:

Check if we are consuming:

kafka-console-consumer --zookeeper zk.servername01:2181 --consumer.config consumer.config --topic topic_name

Check KafkaMirror’s offset:

/usr/bin/kafka-run-class kafka.tools.ConsumerOffsetChecker --group cloudera_mirrormaker --zookeeper zk.servername01:2181 --topic topic_name

This connects to ZooKeeper and shows various offset information for that consumer group and topic. Example results of running this (with consumer group 'signatures' and topic 'ingest') are:

Group       Topic   Pid  Offset   logSize  Lag      Owner
signatures  ingest  0    5158355  6655120  1496765  none
signatures  ingest  1    5111118  6604278  1493160  none
signatures  ingest  2    5080952  6571573  1490621  none
signatures  ingest  3    5055358  6543351  1487993  none

The Pid represents the partition id, so in the case above we can see the info for each of the 4 partitions 0 through 3.

The offset tells you the offset currently held in zookeeper for this consumer.

The Lag tells you what the difference is between this offset and the latest message on the topic.

You can use these to check that your consumer is consuming messages faster than they are appearing on the queue, and to dispel any worries you might have that it’s never catching up.

Beware that the consumer commits its offset to ZooKeeper only after a certain interval (10 seconds by default), so if you run this command a few times in a row you’ll likely see the offset remain constant while lag increases, until a commit from the consumer suddenly brings the offset up and hence the lag down significantly in one go. When we used this, we worried that we were falling further and further behind, until we realized that offsets are committed to ZooKeeper on a delay.

The commit interval is configurable in the consumer config, using the auto.commit.interval.ms property. The default is 10000 ms (10 seconds). There is a design decision to be made around what you set this to: set it too long and, if the consumer goes down, you may have to replay a huge number of messages; set it too short and you’ll overload ZooKeeper with constant commits.
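For example, to commit offsets more often than the default (the value is illustrative):

# consumer.config
auto.commit.interval.ms=5000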

ConsumerGroupCommand

The Consumer Group Command replaces the ConsumerOffsetChecker. It does the same job as the ConsumerOffsetChecker, but it also supports Kafka’s new consumer, which stores offsets in Kafka rather than ZooKeeper.

Using the example from above, you would use the following:

/usr/bin/kafka-run-class kafka.admin.ConsumerGroupCommand --describe --group cloudera_mirrormaker --zookeeper zk.servername01:2181

Example results:

GROUP                TOPIC            PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG      OWNER
cloudera_mirrormaker topic_name1      0         18713          428478         409765   none
cloudera_mirrormaker topic_name1      1         547            24997          24450    none
cloudera_mirrormaker topic_name1      2         107            409511         409404   none
cloudera_mirrormaker topic_name2      0         404108447      452060078      47951631 none
cloudera_mirrormaker topic_name2      1         426193345      465930519      39737174 none
cloudera_mirrormaker topic_name2      2         422415396      470367125      47951729 none
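For consumers that store their offsets in Kafka rather than ZooKeeper (the new consumer), a sketch of the equivalent call, assuming Kafka 0.9 binaries:

/usr/bin/kafka-run-class kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server kafka.broker.servername01:9092 --describe --group cloudera_mirrormaker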

Kafka Manager

I use the open source Kafka Manager tool (https://github.com/yahoo/kafka-manager) to help identify throughput on my Kafka Brokers. The tool also allows for some basic management of the brokers (partition creation, reassignment, etc).

It is a Scala application built with SBT on the Play framework.

Kafka REST Proxy

Developed by Confluent, the Kafka REST Proxy provides a RESTful interface to a Kafka cluster. It makes it easy to produce and consume messages, view the state of the cluster, and perform administrative actions without using the native Kafka protocol or clients. Examples of use cases include reporting data to Kafka from any front-end app built in any language, ingesting messages into a stream processing framework that doesn’t yet support Kafka, and scripting administrative actions.

The Kafka REST Proxy relies on the Schema Registry, which provides a serving layer for your metadata. The Schema Registry provides a RESTful interface for storing and retrieving Avro schemas. It stores a versioned history of all schemas, provides multiple compatibility settings and allows evolution of schemas according to the configured compatibility setting. It provides serializers that plug into Kafka clients that handle schema storage and retrieval for Kafka messages that are sent in the Avro format.
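As a quick illustration, producing a JSON message through the proxy might look like this (the hostname is a placeholder and 8082 is the Confluent default port; adjust both for your environment):

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"value":{"name":"test"}}]}' http://resthost01:8082/topics/topic_name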

Kafka REST Proxy does not start on restart

Error: Kafka-rest does not start on restart as there is a mismatch in PIDs

Solution: Kill the kafka-rest process

sudo kill -9 $(ps aux | grep '[k]afka-rest' | awk '{print $2}')

Then start kafka-rest:

sudo service kafka-rest start

Logging

All logs related to this service are available at /opt/confluent/logs/

  • kafka-rest-startup.log – captures stdout and stderr during startup
  • kafka-rest.log – log4j target for all other logging, rotates between 10 files up to 10MB each
  • schema-registry-startup.log – captures stdout and stderr during startup
  • schema-registry.log – log4j target for all other logging, rotates between 10 files up to 10MB each

Deciphering HTTP CODES

Reference: http://www.restpatterns.org/HTTP_Status_Codes

Troubleshooting

Unexpected error from SyncGroup: Messages are rejected since there are fewer in-sync replicas than required

Resolution: This means that the source Kafka Broker has fewer partitions than expected. Cloudera Manager will attempt to restart MirrorMaker, in the hope that by reconnecting, MM will pull a new replica list from one of the Brokers. Only after the service is DOWN should you use Cloudera Manager to manually restart MirrorMaker to pick up the change to the number of partitions. Because Cloudera Manager will do this automatically, user intervention is usually not required.

Next, contact the provider to make sure they are aware that their system is flapping. In the past, our client had a problem where older consumers were attempting to connect – causing instability in their Brokers.

Alternate explanation: This is occurring in a SyncGroup call so it’s probably due to the fact that the new consumer needs to write to the __consumer_offsets topic and can’t because __consumer_offsets isn’t meeting the min ISR requirements for the cluster.

Error:

In the Role Logs you see the following error:

Sep 7, 10:53:39.900 AM  FATAL   kafka.tools.MirrorMaker$MirrorMakerThread
[mirrormaker-thread-0] Mirror maker thread failure due to
org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: Messages are rejected since there are fewer in-sync replicas than required.
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupRequestHandler.handle(AbstractCoordinator.java:436)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupRequestHandler.handle(AbstractCoordinator.java:403)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)

at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)

Further errors:

Sep 8, 5:13:07.214 PM   ERROR   org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Group cloudera_group1 failed to commit partition topic_name-0 at offset 827610278: Messages are rejected since there are fewer in-sync replicas than required.

Cloudera Manager will attempt to retry (by restarting the process). However, that will eventually fail.

And the fatal error that finally kills MM:

Sep 9, 12:44:52.247 AM  FATAL   kafka.tools.MirrorMaker$MirrorMakerThread   [mirrormaker-thread-0] Mirror maker thread exited abnormally, stopping the whole mirror maker.

Cloudera Manager will report the following error:

KAFKA_KAFKA_MIRROR_MAKER_SCM_HEALTH Role health test bad
Critical
The health test result for KAFKA_KAFKA_MIRROR_MAKER_SCM_HEALTH has become bad: This role’s process is starting. This role is supposed to be started.

Further explanation:

I’d suggest using the kafka-topics command to determine the current state of replicas for the _schemas topic (or whatever topic you are using). Here’s how I used it on a local set of test services:

$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic _schemas
Topic:_schemas PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: _schemas Partition: 0 Leader: 0 Replicas: 0 Isr: 0

The root cause exception (NotEnoughReplicasException) indicates that the number of in-sync replicas for the topic partition has fallen below min.insync.replicas, so the output from this command should show you the difference between the full list of replicas and the in-sync replicas (labeled Isr in the output). From there you can determine why one or more of the brokers is falling behind.

References

Kafka-Storm (and Spark) integration: http://www.michael-noll.com/blog/2014/05/27/kafka-storm-integration-example-tutorial/
Kafka-Storm-Spark GitHub: https://github.com/miguno/kafka-storm-starter

Good scaling question: https://grokbase.com/t/kafka/users/158n9a4sf3/painfully-slow-kafka-recovery