kafka console tools
Introduction#
Kafka offers command-line tools to manage topics, consumer groups, to consume and publish messages and so forth.
Important: Kafka console scripts are different for Unix-based and Windows platforms. In the examples, you might need to add the extension according to your platform.
Linux: scripts located in bin/ with .sh extension.
Windows: scripts located in bin\windows\ and with .bat extension.
kafka-topics
This tool let you list, create, alter and describe topics.
List topics:
kafka-topics --zookeeper localhost:2181 --listCreate a topic:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testcreates a topic with one partition and no replication.
Describe a topic:
kafka-topics --zookeeper localhost:2181 --describe --topic testAlter a topic:
# change configuration
kafka-topics --zookeeper localhost:2181 --alter --topic test --config max.message.bytes=128000
# add a partition
kafka-topics --zookeeper localhost:2181 --alter --topic test --partitions 2(Beware: Kafka does not support reducing the number of partitions of a topic) (see this list of configuration properties)
kafka-console-producer
This tool lets you produce messages from the command-line.
Send simple string messages to a topic:
kafka-console-producer --broker-list localhost:9092 --topic test
here is a message
here is another message
^D(each new line is a new message, type ctrl+D or ctrl+C to stop)
Send messages with keys:
kafka-console-producer --broker-list localhost:9092 --topic test-topic \
--property parse.key=true \
--property key.separator=,
key 1, message 1
key 2, message 2
null, message 3
^DSend messages from a file:
kafka-console-producer --broker-list localhost:9092 --topic test_topic < file.logkafka-console-consumer
This tool let’s you consume messages from a topic.
to use the old consumer implementation, replace
--bootstrap-serverwith--zookeeper.
Display simple messages:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test Consume old messages:
In order to see older messages, you can use the --from-beginning option.
Display key-value messages:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic \
--property print.key=true \
--property key.separator=, kafka-simple-consumer-shell
This consumer is a low-level tool which allows you to consume messages from specific partitions, offsets and replicas.
Useful parameters:
parition: the specific partition to consume from (default to all)offset: the beginning offset. Use-2to consume messages from the beginning,-1to consume from the end.max-messages: number of messages to printreplica: the replica, default to the broker-leader (-1)
Exemple:
kafka-simple-consumer-shell \
--broker-list localhost:9092 \
--partition 1 \
--offset 4 \
--max-messages 3 \
--topic test-topicdisplays 3 messages from partition 1 beginning at offset 4 from topic test-topic.
kafka-consumer-groups
This tool allows you to list, describe, or delete consumer groups. Have a look at this article for more information about consumer groups.
if you still use the old consumer implementation, replace
--bootstrap-serverwith--zookeeper.
List consumer groups:
kafka-consumer-groups --bootstrap-server localhost:9092 --list
octopusDescribe a consumer-group:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group octopus
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
octopus test-topic 0 15 15 0 octopus-1/127.0.0.1
octopus test-topic 1 14 15 1 octopus-2_/127.0.0.1Remarks: in the output above,
current-offsetis the last committed offset of the consumer instance,log-end-offsetis the highest offset of the partition (hence, summing this column gives you the total number of messages for the topic)lagis the difference between the current consumer offset and the highest offset, hence how far behind the consumer is,owneris theclient.idof the consumer (if not specified, a default one is displayed).
Delete a consumer-group:
deletion is only available when the group metadata is stored in zookeeper (old consumer api). With the new consumer API, the broker handles everything including metadata deletion: the group is deleted automatically when the last committed offset for the group expires.
kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group octopus