Kafka CRUD (Create, Read, Update, Delete)

One of the most common tasks when using Kafka is managing topics. This guide shows you how to perform several of them. The examples are deliberately short and are meant to help you get the commands and concepts right; extensive documentation is available if you want to dive deeper.

Requirements

To save yourself time when working with Kafka, store the broker list and the ZooKeeper connection string in environment variables, as described in the documentation above. This document refers to these strings by their variable names ($MYZK for the ZooKeeper connection string). This keeps the commands readable and lets you copy and paste them into your own environment unchanged.
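A minimal sketch of that setup, assuming bash or another POSIX shell. MYZK is the variable used by the commands in this guide; MYBK is a hypothetical name for the broker list, and the host names are placeholders. On Amazon MSK, `aws kafka describe-cluster` reports the ZooKeeper string (ZookeeperConnectString) and `aws kafka get-bootstrap-brokers` reports the broker list (BootstrapBrokerString). The hypothetical check_kafka_env function fails early if either variable is empty.

```shell
# Placeholder values (hypothetical hosts): replace them with your cluster's strings,
# for example in ~/.bashrc so that every new shell picks them up.
# export MYZK="z-1.example.com:2181,z-2.example.com:2181,z-3.example.com:2181"
# export MYBK="b-1.example.com:9092,b-2.example.com:9092"

# check_kafka_env is a hypothetical helper: it prints a hint and returns
# non-zero if MYZK or MYBK is unset or empty.
check_kafka_env() {
  status=0
  if [ -z "${MYZK:-}" ]; then
    echo "MYZK (ZooKeeper connection string) is not set" >&2
    status=1
  fi
  if [ -z "${MYBK:-}" ]; then
    echo "MYBK (broker list) is not set" >&2
    status=1
  fi
  return "$status"
}
```

For example, `check_kafka_env && ./kafka-topics.sh --zookeeper "$MYZK" --list` only runs the listing when both variables are set.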

You will be using the Kafka command-line utilities for these actions. These are different from the AWS CLI tools (for example, aws kafka …). If you're using the KafkaClientInstance, these tools are in /home/ec2-user/kafka/bin.

Examples you find on the internet may or may not include the .sh extension: as the Kafka utilities have been revised over the years, the names of the tools have changed. If you're ever confused when trying to work out a command, try adding or removing the .sh extension.
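If you script these commands, a small lookup can try both names for you. resolve_kafka_tool is a hypothetical helper (not part of the Kafka distribution) that prints whichever of NAME.sh or NAME is executable in a given directory, preferring the .sh form.

```shell
# resolve_kafka_tool DIR NAME  (hypothetical helper)
# Prints the path of NAME.sh or NAME, whichever is executable in DIR (.sh first).
resolve_kafka_tool() {
  local dir="$1"
  local name="${2%.sh}"   # accept the name with or without the .sh suffix
  local candidate
  for candidate in "$dir/$name.sh" "$dir/$name"; do
    if [ -x "$candidate" ]; then
      printf '%s\n' "$candidate"
      return 0
    fi
  done
  echo "neither $name.sh nor $name is executable in $dir" >&2
  return 1
}
```

Usage: `TOPICS=$(resolve_kafka_tool /home/ec2-user/kafka/bin kafka-topics) && "$TOPICS" --zookeeper "$MYZK" --list`.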

Common operations

This section assumes you’re familiar with the nomenclature of Kafka.

Reminder: you should be in /home/ec2-user/kafka/bin for all these commands.

List topics

To list the available topics on a cluster:

./kafka-topics.sh --zookeeper $MYZK --list

Example output from this command:

./kafka-topics.sh --zookeeper $MYZK --list
ExampleTopic
__consumer_offsets
_schemas
orders
orders-in
todd3

Describe topics

The describe option provides additional details about all configured topics (if no specific topic is given), showing which broker is the leader and which brokers hold replicas for each partition that makes up a topic.

./kafka-topics.sh --zookeeper $MYZK --describe

You can also describe a specific topic. The output has the same format, just limited to that topic:

./kafka-topics.sh --zookeeper $MYZK --describe --topic ExampleTopic

Example output from this command:

./kafka-topics.sh --zookeeper $MYZK --describe --topic ExampleTopic

Topic:ExampleTopic	PartitionCount:10	ReplicationFactor:3	Configs:
    Topic: ExampleTopic	Partition: 0	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
    Topic: ExampleTopic	Partition: 1	Leader: 4	Replicas: 4,1,2	Isr: 1,2,4
    Topic: ExampleTopic	Partition: 2	Leader: 5	Replicas: 5,2,4	Isr: 5,2,4
    Topic: ExampleTopic	Partition: 3	Leader: 6	Replicas: 6,4,5	Isr: 6,5,4
    Topic: ExampleTopic	Partition: 4	Leader: 3	Replicas: 3,5,6	Isr: 3,5,6
    Topic: ExampleTopic	Partition: 5	Leader: 1	Replicas: 1,6,3	Isr: 1,6,3
    Topic: ExampleTopic	Partition: 6	Leader: 2	Replicas: 2,5,3	Isr: 2,5,3
    Topic: ExampleTopic	Partition: 7	Leader: 4	Replicas: 4,6,1	Isr: 6,1,4
    Topic: ExampleTopic	Partition: 8	Leader: 5	Replicas: 5,3,2	Isr: 5,3,2
    Topic: ExampleTopic	Partition: 9	Leader: 6	Replicas: 6,1,4	Isr: 6,1,4

A quick overview of what each field in the output above means:

<img align="left" src="media/msk_describe_topic.png" alt="msk_topic_describe">

Notes:

  • Each partition has a leader: the broker that currently handles reads and writes for that partition.
  • The controller works to assign partitions as evenly as possible across all brokers, but does not take utilization into consideration:
    ○ This doesn't have to be the case: when you create a topic you can pass in your own partition-to-broker mapping (the --replica-assignment option of kafka-topics.sh) if you want.
  • The “Replicas” field lists the IDs of every broker that holds a copy of the partition, including the leader (the first entry is the preferred leader). In a small cluster (for example, three brokers with a replication factor of 3), every broker will hold a copy of every partition; in larger clusters you will see partitions spread across brokers.
  • The “Isr” field lists the In-Sync Replicas: the replicas that have kept up with the leader within a configured time limit (replica.lag.time.max.ms) and are therefore considered ‘in-sync’.
    ○ Brokers that are listed in Replicas but NOT in Isr hold a copy of the partition that is behind in replication, and are not (normally) eligible for leader election.
    ○ The two lists should normally contain the same brokers, although the order can differ (see partition 1 above). When they do not, something is likely wrong in the cluster (a broker is offline, or replication is lagging due to resource problems).
  • The Configs section is blank: this is where you will see topic-specific configuration overrides (more on this later).
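kafka-topics.sh can already do this filtering for you with --describe --under-replicated-partitions. If you want to check saved output or script around it, the hypothetical find_out_of_sync helper below is a minimal awk sketch of the rule from the notes above: report any partition where a broker listed in Replicas is missing from Isr. It compares the lists as sets, since the order can differ.

```shell
# find_out_of_sync is a hypothetical helper. It reads `kafka-topics.sh --describe`
# output on stdin and prints "<topic> <partition>" for every partition that has
# a replica missing from its ISR. Order is ignored: "4,1,2" and "1,2,4" match.
find_out_of_sync() {
  awk '
    /Partition: / {
      topic = ""; part = ""; reps = ""; isr = ""
      # Fields come in "Name: value" pairs separated by whitespace.
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:")     topic = $(i + 1)
        if ($i == "Partition:") part  = $(i + 1)
        if ($i == "Replicas:")  reps  = $(i + 1)
        if ($i == "Isr:")       isr   = $(i + 1)
      }
      split("", in_isr)          # portable way to clear the array
      m = split(isr, s, ",")
      for (j = 1; j <= m; j++) in_isr[s[j]] = 1
      n = split(reps, r, ",")
      for (j = 1; j <= n; j++) {
        if (!(r[j] in in_isr)) { print topic, part; break }
      }
    }
  '
}
```

Usage: `./kafka-topics.sh --zookeeper $MYZK --describe | find_out_of_sync`. No output means every replica is in sync.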

Create topics

To create a topic, you'll need to decide on a name, the number of partitions, and the replication factor (the number of copies of each partition) you want:

./kafka-topics.sh --zookeeper $MYZK --create --topic ExampleTopic10 --partitions 10 --replication-factor 3

Example output from this command:

./kafka-topics.sh --zookeeper $MYZK --create --topic ExampleTopic10 --partitions 10 --replication-factor 3
Created topic ExampleTopic10.

You can also set topic-level configuration at creation time with --config (for example, --config retention.ms=86400000); run ./kafka-topics.sh --help for more details.
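Kafka restricts topic names to at most 249 characters drawn from ASCII letters, digits, '.', '_' and '-', and rejects the names "." and "..". The hypothetical valid_topic_name helper below is a sketch of a pre-flight check for those rules before you run --create; kafka-topics.sh rejects invalid names on its own, so this only saves a round trip in scripts.

```shell
# valid_topic_name NAME  (hypothetical helper)
# Returns 0 if NAME follows Kafka's topic-name rules, non-zero otherwise.
valid_topic_name() {
  local name="$1"
  case $name in
    ""|.|..) return 1 ;;             # empty, "." and ".." are rejected
    *[!A-Za-z0-9._-]*) return 1 ;;   # any character outside the allowed set
  esac
  [ "${#name}" -le 249 ]             # maximum topic name length
}
```

Usage: `valid_topic_name ExampleTopic10 && ./kafka-topics.sh --zookeeper $MYZK --create --topic ExampleTopic10 --partitions 10 --replication-factor 3`.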


Delete topics

In order to delete a topic, your cluster must be configured with delete.topic.enable=true. Apache Kafka has defaulted this setting to true since version 1.0, but check your cluster's configuration: if it is set to false, you cannot delete any topics on the cluster. Disabling deletion is a positive for data integrity, but can cause operational headaches if you want to clean up old topics, or to recreate a topic to wipe out all of its data.

./kafka-topics.sh --zookeeper $MYZK --delete --topic ExampleTopic10

Example output:

./kafka-topics.sh --zookeeper $MYZK --delete --topic ExampleTopic10
Topic ExampleTopic10 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

It can take some time for the topic to actually be deleted; note that the output above only says it is marked for deletion. A background process purges topics marked for deletion, so if you run a delete and then --list, you may still see the topic for many minutes. If it's still there after an hour, topic deletion is most likely not enabled on your cluster.
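Rather than re-running --list by hand, you can poll it. A minimal sketch, assuming the --list output format shown earlier (one topic name per line): wait_for_topic_deletion and KAFKA_TOPICS are hypothetical names, and the 30-second interval is an arbitrary choice, while the one-hour default timeout follows the rule of thumb above.

```shell
# KAFKA_TOPICS is a hypothetical override for the tool path.
KAFKA_TOPICS=${KAFKA_TOPICS:-./kafka-topics.sh}

# wait_for_topic_deletion TOPIC [TIMEOUT_SECONDS] [INTERVAL_SECONDS]
# Polls --list until TOPIC no longer appears as an exact line of the output.
wait_for_topic_deletion() {
  local topic="$1" timeout="${2:-3600}" interval="${3:-30}" elapsed=0 topics
  while :; do
    # Fail loudly if the listing itself fails, instead of reporting success.
    topics=$($KAFKA_TOPICS --zookeeper "$MYZK" --list) || {
      echo "could not list topics" >&2
      return 1
    }
    if ! printf '%s\n' "$topics" | grep -Fxq -- "$topic"; then
      echo "$topic deleted"
      return 0
    fi
    if [ "$elapsed" -ge "$timeout" ]; then
      echo "$topic still listed after ${timeout}s; is delete.topic.enable=true?" >&2
      return 1
    fi
    sleep "$interval"
    elapsed=$((elapsed + interval))
  done
}
```

Usage: run `./kafka-topics.sh --zookeeper $MYZK --delete --topic ExampleTopic10`, then `wait_for_topic_deletion ExampleTopic10`.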


Set topic retention - time based

By default, topics will be created with the default retention for the cluster (168 hours/7 days). If you want to change this (longer or shorter) you can override the retention value at the topic level.

./kafka-configs.sh --zookeeper $MYZK --alter --entity-name ExampleTopic --entity-type topics --add-config retention.ms=60000

Example output:

./kafka-configs.sh --zookeeper $MYZK --alter --entity-name ExampleTopic --entity-type topics --add-config retention.ms=60000
Completed Updating config for entity: topic 'ExampleTopic'.

You can then run --describe on the topic to see the override in the Configs field:

./kafka-topics.sh --zookeeper $MYZK --describe --topic ExampleTopic
Topic:ExampleTopic	PartitionCount:10	ReplicationFactor:3	Configs:retention.ms=60000
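retention.ms is expressed in milliseconds, so 60000 above is one minute and the 7-day default is 604800000. A small conversion helper saves counting zeros; to_ms below is a hypothetical sketch, not a Kafka tool.

```shell
# to_ms DURATION  (hypothetical helper)
# Converts <integer><unit>, where unit is s, m, h or d, into milliseconds.
to_ms() {
  local value unit
  value=${1%?}          # everything except the last character
  unit=${1#"$value"}    # the last character
  case $value in
    ""|*[!0-9]*) echo "expected <integer><s|m|h|d>, got '$1'" >&2; return 1 ;;
  esac
  case $unit in
    s) echo $((value * 1000)) ;;
    m) echo $((value * 60 * 1000)) ;;
    h) echo $((value * 3600 * 1000)) ;;
    d) echo $((value * 86400 * 1000)) ;;
    *) echo "unknown unit '$unit' (use s, m, h or d)" >&2; return 1 ;;
  esac
}
```

Usage: `./kafka-configs.sh --zookeeper $MYZK --alter --entity-name ExampleTopic --entity-type topics --add-config retention.ms=$(to_ms 3d)` sets a three-day retention (259200000 ms).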

Operational notes:

  • When you lower the retention time, it can take some time for the cluster to purge older data. The new setting is propagated to every broker, and background tasks on each broker then go through the topic's log segments and delete those whose data is older than the limit. This can take quite some time on large topics or busy clusters.

  • This can be used to purge all the data from a topic: set the retention to 1 ms, let the cluster purge the topic, then set the retention back to the desired timeframe. This is useful when you don't want to delete a topic (e.g. topic deletion is disabled, the topic has complex configuration you don't want to lose, or producers are writing to it and you don't want them to throw errors while the topic is missing).
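The purge trick above can be scripted. A minimal sketch, assuming the kafka-configs.sh syntax used in this section: purge_topic and KAFKA_CONFIGS are hypothetical names, and the wait time is a guess you must tune, because brokers delete data asynchronously (only closed log segments are removed, and the retention check runs on the interval set by log.retention.check.interval.ms). If no previous value is given, the override is removed so the topic falls back to the cluster default.

```shell
# KAFKA_CONFIGS is a hypothetical override for the tool path.
KAFKA_CONFIGS=${KAFKA_CONFIGS:-./kafka-configs.sh}

# purge_topic TOPIC WAIT_SECONDS [RESTORE_MS]  (hypothetical helper)
# 1. Sets retention.ms=1 so existing data becomes eligible for deletion.
# 2. Waits WAIT_SECONDS for the brokers to purge it.
# 3. Restores retention.ms to RESTORE_MS, or removes the override if omitted.
purge_topic() {
  local topic="$1" wait_seconds="$2" restore="${3:-}"
  $KAFKA_CONFIGS --zookeeper "$MYZK" --alter --entity-name "$topic" \
    --entity-type topics --add-config retention.ms=1 || return 1
  sleep "$wait_seconds"
  if [ -n "$restore" ]; then
    $KAFKA_CONFIGS --zookeeper "$MYZK" --alter --entity-name "$topic" \
      --entity-type topics --add-config "retention.ms=$restore"
  else
    $KAFKA_CONFIGS --zookeeper "$MYZK" --alter --entity-name "$topic" \
      --entity-type topics --delete-config retention.ms
  fi
}
```

For example, `purge_topic ExampleTopic 600 604800000` purges ExampleTopic and restores a 7-day retention after ten minutes. Check with --describe (and by consuming from the topic) that the data is actually gone before relying on it.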


Set topic retention - size based

By default, the cluster uses a time-based retention policy. This works fairly well, but a time-based policy can happily fill up your disks before any data is old enough to be purged. If you have a defined period for which you want to keep data, you'll have to size your storage around that retention period.

Alternatively, you can set a size-based retention limit. This purges the oldest data when a partition reaches the defined size, regardless of how long the data has been in the topic. The two limits are not exclusive: when both retention.ms and retention.bytes are set, data is deleted as soon as either limit is reached, so for purely size-based retention also set retention.ms=-1 (no time limit). Since the size limit is enforced per partition, multiply it by the number of partitions to estimate how much storage one copy of the topic will use, and by the replication factor for the total across all brokers.

This sets the retention to 512 MB (512,000,000 bytes) per partition:

./kafka-configs.sh --zookeeper $MYZK --alter --entity-name ExampleTopic --entity-type topics --add-config retention.bytes=512000000

Example:

./kafka-configs.sh --zookeeper $MYZK --alter --entity-name ExampleTopic --entity-type topics --add-config retention.bytes=512000000
Completed Updating config for entity: topic 'ExampleTopic'.
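For the 10-partition ExampleTopic with a replication factor of 3 shown earlier, 512,000,000 bytes per partition works out to 5,120,000,000 bytes for one copy of the topic and 15,360,000,000 bytes of broker disk in total. The hypothetical helper below does that arithmetic. Treat the result as an approximation: Kafka deletes whole log segments, so a partition can temporarily hold somewhat more than retention.bytes.

```shell
# topic_storage_bytes RETENTION_BYTES PARTITIONS [REPLICATION_FACTOR]  (hypothetical)
# Prints RETENTION_BYTES x PARTITIONS x REPLICATION_FACTOR (default factor 1,
# i.e. one copy of the topic).
topic_storage_bytes() {
  local per_partition="$1" partitions="$2" replication="${3:-1}"
  echo $((per_partition * partitions * replication))
}
```

Usage: `topic_storage_bytes 512000000 10 3` prints 15360000000.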

Remove a topic configuration

If you've set a custom configuration and want to remove it, use --delete-config; the topic then falls back to the cluster default:

./kafka-configs.sh --zookeeper $MYZK --alter --entity-name ExampleTopic --entity-type topics --delete-config retention.bytes

Example:

./kafka-configs.sh --zookeeper $MYZK --alter --entity-name ExampleTopic --entity-type topics --delete-config retention.bytes
Completed Updating config for entity: topic 'ExampleTopic'.
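To confirm that an override is gone, describe the topic again and check its Configs field (./kafka-configs.sh --zookeeper $MYZK --describe --entity-type topics --entity-name ExampleTopic also lists a topic's overrides). The hypothetical topic_overrides helper below assumes the summary-line format shown earlier, "Configs:" followed by comma-separated key=value pairs, and prints one override per line; empty output means the topic uses only cluster defaults.

```shell
# topic_overrides is a hypothetical helper. It reads `kafka-topics.sh --describe
# --topic NAME` output on stdin and prints each topic-level override on its own line.
# Assumes override values do not themselves contain commas.
topic_overrides() {
  # Keep only what follows "Configs:" on the summary line, split it on commas,
  # and drop the empty line left when no overrides are set.
  sed -n 's/.*Configs:[[:space:]]*//p' | tr ',' '\n' | sed '/^[[:space:]]*$/d'
}
```

Usage: `./kafka-topics.sh --zookeeper $MYZK --describe --topic ExampleTopic | topic_overrides`.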

Resources