kafka console tools
Introduction#
Kafka offers command-line tools to manage topics, consumer groups, to consume and publish messages and so forth.
Important: Kafka console scripts are different for Unix-based and Windows platforms. In the examples, you might need to add the extension according to your platform.
Linux: scripts located in bin/
with .sh
extension.
Windows: scripts located in bin\windows\
and with .bat
extension.
kafka-topics
This tool let you list, create, alter and describe topics.
List topics:
kafka-topics --zookeeper localhost:2181 --list
Create a topic:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
creates a topic with one partition and no replication.
Describe a topic:
kafka-topics --zookeeper localhost:2181 --describe --topic test
Alter a topic:
# change configuration
kafka-topics --zookeeper localhost:2181 --alter --topic test --config max.message.bytes=128000
# add a partition
kafka-topics --zookeeper localhost:2181 --alter --topic test --partitions 2
(Beware: Kafka does not support reducing the number of partitions of a topic) (see this list of configuration properties)
kafka-console-producer
This tool lets you produce messages from the command-line.
Send simple string messages to a topic:
kafka-console-producer --broker-list localhost:9092 --topic test
here is a message
here is another message
^D
(each new line is a new message, type ctrl+D or ctrl+C to stop)
Send messages with keys:
kafka-console-producer --broker-list localhost:9092 --topic test-topic \
--property parse.key=true \
--property key.separator=,
key 1, message 1
key 2, message 2
null, message 3
^D
Send messages from a file:
kafka-console-producer --broker-list localhost:9092 --topic test_topic < file.log
kafka-console-consumer
This tool let’s you consume messages from a topic.
to use the old consumer implementation, replace
--bootstrap-server
with--zookeeper
.
Display simple messages:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test
Consume old messages:
In order to see older messages, you can use the --from-beginning
option.
Display key-value messages:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic \
--property print.key=true \
--property key.separator=,
kafka-simple-consumer-shell
This consumer is a low-level tool which allows you to consume messages from specific partitions, offsets and replicas.
Useful parameters:
parition
: the specific partition to consume from (default to all)offset
: the beginning offset. Use-2
to consume messages from the beginning,-1
to consume from the end.max-messages
: number of messages to printreplica
: the replica, default to the broker-leader (-1)
Exemple:
kafka-simple-consumer-shell \
--broker-list localhost:9092 \
--partition 1 \
--offset 4 \
--max-messages 3 \
--topic test-topic
displays 3 messages from partition 1 beginning at offset 4 from topic test-topic.
kafka-consumer-groups
This tool allows you to list, describe, or delete consumer groups. Have a look at this article for more information about consumer groups.
if you still use the old consumer implementation, replace
--bootstrap-server
with--zookeeper
.
List consumer groups:
kafka-consumer-groups --bootstrap-server localhost:9092 --list
octopus
Describe a consumer-group:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group octopus
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
octopus test-topic 0 15 15 0 octopus-1/127.0.0.1
octopus test-topic 1 14 15 1 octopus-2_/127.0.0.1
Remarks: in the output above,
current-offset
is the last committed offset of the consumer instance,log-end-offset
is the highest offset of the partition (hence, summing this column gives you the total number of messages for the topic)lag
is the difference between the current consumer offset and the highest offset, hence how far behind the consumer is,owner
is theclient.id
of the consumer (if not specified, a default one is displayed).
Delete a consumer-group:
deletion is only available when the group metadata is stored in zookeeper (old consumer api). With the new consumer API, the broker handles everything including metadata deletion: the group is deleted automatically when the last committed offset for the group expires.
kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group octopus