Migration methodologies and steps

The migration methodology illustrated here assumes a need to replicate data from the source to the destination Apache Kafka cluster before migrating the producers and consumers. The use case requires that messages up to the defined retention period remain available for existing consumers to re-read, and for slow and new consumers to read. For example, in an event-driven architecture, Apache Kafka may serve as a message bus in a microservices architecture, or applications may store key-value data in Kafka and read it back to populate in-memory caches. In situations and use cases where past data is not important or relevant and only the newest and most recent data matters, for example log analytics, there might be no need to replicate the data from the source to the destination cluster. In those cases, just migrating the producers and consumers to the destination cluster could suffice.

As mentioned in the Overview section, MirrorMaker v2 (MM2) detects and replicates topics, topic partitions, topic configurations, and topic ACLs that match a regex topic pattern to the destination cluster. It also checks for new topics that match the topic pattern, and for changes to configurations and ACLs, at regular configurable intervals. The topic pattern can be changed dynamically by updating the configuration of the MirrorSourceConnector. MM2 can therefore be used to migrate topics and topic data to the destination cluster and keep them in sync.
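
For illustration, a minimal configuration for the dedicated MM2 driver (connect-mirror-maker.properties) covering these settings might look like the following sketch. The mskdest alias, broker endpoints, topic regex, and interval value are placeholders for this scenario, not prescribed values:

    # Cluster aliases and connection info (endpoints are placeholders)
    clusters = msksource, mskdest
    msksource.bootstrap.servers = source-broker-1:9092
    mskdest.bootstrap.servers = dest-broker-1:9092

    # Replicate topics matching this regex from source to destination
    msksource->mskdest.enabled = true
    msksource->mskdest.topics = ExampleTopic.*

    # Keep topic configurations and ACLs in sync, and re-scan for new
    # topics matching the pattern at a configurable interval
    msksource->mskdest.sync.topic.configs.enabled = true
    msksource->mskdest.sync.topic.acls.enabled = true
    msksource->mskdest.refresh.topics.interval.seconds = 60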

In order to differentiate topics between the source and destination clusters, MM2 utilizes a ReplicationPolicy. The DefaultReplicationPolicy implementation uses a <source-cluster-alias>.<topic> naming convention, as described in KIP-382. When the consumer starts up, it subscribes to the replicated topic based on the specified topic pattern, which should account for both the source and replicated topic names.
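
For example, assuming the source topic ExampleTopic used later in this post and a source cluster alias of msksource, a single regex subscription can cover both names (the broker endpoint and group id below are placeholders):

    import java.util.Properties;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PatternSubscribeExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "dest-broker-1:9092"); // placeholder endpoint
            props.put("group.id", "mm2TestConsumer1");            // placeholder group id
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Matches both "ExampleTopic" (the source name) and
                // "msksource.ExampleTopic" (the DefaultReplicationPolicy name)
                consumer.subscribe(Pattern.compile("^(msksource\\.)?ExampleTopic$"));
            }
        }
    }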

MM2 also keeps source and destination offsets in sync. The MM2 MirrorCheckpointConnector periodically emits checkpoints in the destination cluster containing the committed offsets of each consumer group in the source cluster. The connector periodically queries the source cluster for all committed offsets from all consumer groups, filters for the topics being replicated, and emits messages to a topic such as <source-cluster-alias>.checkpoints.internal in the destination cluster. These offsets can then be queried and retrieved using the provided RemoteClusterUtils or MirrorClient classes.
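
As an example, a sketch of querying the translated offsets with RemoteClusterUtils (part of connect-mirror-client) might look like this; the endpoint and consumer group name are placeholders, while msksource is the source cluster alias used in this post:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;

    public class CheckpointLookup {
        public static void main(String[] args) throws Exception {
            // Connection properties for the destination cluster, where the
            // msksource.checkpoints.internal topic lives
            Map<String, Object> props = new HashMap<>();
            props.put("bootstrap.servers", "dest-broker-1:9092"); // placeholder endpoint

            // Translate the group's committed source offsets into the
            // equivalent offsets on the destination cluster
            Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(
                    props, "msksource", "mm2TestConsumer1", Duration.ofSeconds(30));

            translated.forEach((tp, offset) ->
                System.out.printf("%s -> %d%n", tp, offset.offset()));
        }
    }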

MM2 runs on the Kafka Connect framework, which does not support exactly-once semantics, so there is always a possibility of duplicate messages at the destination. Consequently, the consumers have to be idempotent or able to detect and discard duplicates. However, producer idempotence can be enabled in MM2, which protects against duplicates caused by producer retries.
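
One simple way a consumer can detect and discard duplicates, assuming each message carries a unique identifier, is to keep a bounded set of recently seen identifiers and skip repeats. This is only an illustrative sketch, not the approach used by the sample consumer in this post:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Tracks recently seen message identifiers, evicting the oldest entry once
    // MAX_TRACKED is exceeded (bounded memory instead of perfect de-duplication)
    public class RecentDuplicateFilter {
        private static final int MAX_TRACKED = 100_000;

        private final Map<String, Boolean> seen =
            new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > MAX_TRACKED;
                }
            };

        // Returns true if the id has not been seen before, i.e. the record
        // should be processed; returns false for a duplicate
        public boolean firstTime(String messageId) {
            return seen.put(messageId, Boolean.TRUE) == null;
        }
    }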

The migration steps are designed to minimize duplicates and ordering issues. We look at a simple use case: one topic in the source Amazon MSK cluster, with one producer and one consumer, being migrated to a destination Amazon MSK cluster. The producer is a Clickstream producer that keys messages by userid, and the Clickstream consumer needs to read messages in order per key.

There are multiple ways to do the migration as described below.

Using a custom Replication Policy and a background process to sync MM2 checkpointed offsets to the __consumer_offsets internal topic at the destination

Note: This requires NO change to the consumer code. It does require a separate application that syncs offsets and runs in the background, with dependencies on connect-mirror-client and kafka-clients, available with Apache Kafka 2.5.0 and above.

Steps:

  1. Start MM2.
    This detects the topic, creates it in the destination, and starts replicating messages. The MirrorSourceConnector configuration includes a custom ReplicationPolicy class, which causes the replicated topic at the destination to be created with the same name as the source topic instead of the default <source-cluster-alias>.<topic> format (a sketch of such a policy is shown after these steps). By default, MM2 uses an auto.offset.reset = earliest setting and starts reading messages from the beginning of the retention period of the topic in the source.

  2. Start the MM2GroupOffsetSync application.
    This application periodically syncs the MM2 checkpointed offsets to the __consumer_offsets internal topic at the destination (a rough sketch of the core idea is shown after these steps).

  3. Stop the consumer.
    The consumer should be designed to do a commitSync before shutting down cleanly, in which case the last consumed offset is committed to the source __consumer_offsets topic and then checkpointed by MM2 to the destination in the msksource.checkpoints.internal topic. If the consumer does not shut down cleanly, or is restarted against the destination within the configured checkpoint interval, there could be some duplicate records when it resumes. To deal with that, the consumer should be idempotent. In our case, the consumer does a commitSync when it shuts down.

  4. Wait for the MM2GroupOffsetSync application to sync the last committed offset, then stop the application.
    The application checks that the consumer group at the destination is empty before syncing offsets, so that it does not overwrite the consumer's offsets after the consumer has failed over. Even so, shutting it down is safer.

  5. Start the consumer against the destination Amazon MSK cluster by changing the bootstrap brokers configuration.

    • When the consumer starts up, it subscribes based on the specified topic pattern and picks up the replicated topic, which has the same name as the source topic.

    • It will find the offsets for its consumer group in the destination and start consuming from the last synced offset.

    • At this point, the producer is still producing messages to the source which are getting replicated to the destination and the consumer is reading the replicated messages.

  6. Stop the producer.
    This could be at any convenient time after the consumer has moved over.

  7. Start the producer against the destination Amazon MSK cluster by changing the bootstrap brokers configuration.
    If the producer is stopped and restarted against the destination before all the messages in the replication pipeline have been consumed, there could be ordering issues. If ordering is important, wait for all the messages to be replicated and consumed before starting the producer.

  8. Stop MM2.
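
A minimal sketch of the custom ReplicationPolicy referenced in step 1 follows. It implements the org.apache.kafka.connect.mirror.ReplicationPolicy interface so that replicated topics keep their source names; it is illustrative rather than the exact class used with this post, and the msksource alias is hard-coded for brevity. The class is placed on MM2's classpath and referenced via replication.policy.class in the MM2 configuration.

    import org.apache.kafka.connect.mirror.ReplicationPolicy;

    // Creates replicated topics with the same name as the source topic instead
    // of the "<source-cluster-alias>.<topic>" name used by DefaultReplicationPolicy
    public class SameNameReplicationPolicy implements ReplicationPolicy {

        @Override
        public String formatRemoteTopic(String sourceClusterAlias, String topic) {
            return topic; // no "<alias>." prefix at the destination
        }

        @Override
        public String topicSource(String topic) {
            // Without a prefix to inspect, treat every topic as coming from
            // the source cluster (alias hard-coded for this sketch)
            return topic == null ? null : "msksource";
        }

        @Override
        public String upstreamTopic(String topic) {
            return topic; // the upstream name equals the local name
        }
    }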

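The MM2GroupOffsetSync application itself is not reproduced here, but the core idea of one sync pass can be sketched as follows: translate the checkpointed offsets for the consumer group, verify the group is empty at the destination, and commit the translated offsets through the AdminClient. The endpoint and group name are placeholders:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;

    public class OffsetSyncSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> dest = new HashMap<>();
            dest.put("bootstrap.servers", "dest-broker-1:9092"); // placeholder endpoint

            String group = "mm2TestConsumer1"; // placeholder consumer group

            try (Admin admin = Admin.create(dest)) {
                // Translate the group's checkpointed source offsets into
                // destination offsets
                Map<TopicPartition, OffsetAndMetadata> translated =
                    RemoteClusterUtils.translateOffsets(
                        dest, "msksource", group, Duration.ofSeconds(30));

                // Committing is only safe while the group is empty at the
                // destination; the real application verifies this on every pass
                boolean groupEmpty = admin
                    .describeConsumerGroups(Collections.singletonList(group))
                    .describedGroups().get(group).get()
                    .members().isEmpty();

                if (groupEmpty) {
                    admin.alterConsumerGroupOffsets(group, translated).all().get();
                }
            }
        }
    }
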
Using Default Replication Policy and a background process to sync MM2 checkpointed offsets to the __consumer_offsets internal topic at the destination

Note: This requires NO change to the consumer code except the topic pattern it subscribes to. It does require a separate application that syncs offsets and runs in the background, with dependencies on connect-mirror-client and kafka-clients, available with Apache Kafka 2.5.0 and above.

Steps:

  1. Start MM2.
    This detects the topic, creates it in the destination, and starts replicating messages. As mentioned above, the replicated topic at the destination is created in the <source-cluster-alias>.<topic> format because MM2 uses the DefaultReplicationPolicy. By default, MM2 uses an auto.offset.reset = earliest setting and starts reading messages from the beginning of the retention period of the topic in the source.

  2. Start the MM2GroupOffsetSync application.
    This application periodically syncs the MM2 checkpointed offsets to the __consumer_offsets internal topic at the destination.

  3. Stop the consumer.
    The consumer should be designed to do a commitSync before shutting down cleanly, in which case the last consumed offset is committed to the source __consumer_offsets topic and then checkpointed by MM2 to the destination in the msksource.checkpoints.internal topic. If the consumer does not shut down cleanly, or is restarted against the destination within the configured checkpoint interval, there could be some duplicate records when it resumes. To deal with that, the consumer should be idempotent. In our case, the consumer does a commitSync when it shuts down.

  4. Wait for the MM2GroupOffsetSync application to sync the last committed offset, then stop the application.
    The application checks that the consumer group at the destination is empty before syncing offsets, so that it does not overwrite the consumer's offsets after the consumer has failed over. Even so, shutting it down is safer.

  5. Start the consumer against the destination Amazon MSK cluster by changing the bootstrap brokers configuration.

    • In this case, MM2 uses the DefaultReplicationPolicy implementation. As mentioned above, this creates topics in the destination cluster in the <source-cluster-alias>.<topic> format. When the consumer starts up, it subscribes based on the specified topic pattern, which should account for both the source and replicated topic names.

    • It will find the offsets for its consumer group in the destination and start consuming from the last synced offset.

    • At this point, the producer is still producing messages to the source which are getting replicated to the destination and the consumer is reading the replicated messages.

  6. Stop the producer.
    This could be at any convenient time after the consumer has moved over.

  7. (Optional) Create the source topic at the destination.
    The consumer can either continue to read messages from the replicated topic (<source-cluster-alias>.<topic>), in which case the producer must be modified to point to the replicated topic (a producer code change), or the source topic can be created in the destination, which requires no producer code change (see the sketch after these steps).

  8. Start the producer against the destination Amazon MSK cluster by changing the bootstrap brokers configuration.
    If the producer is stopped and restarted against the destination before all the messages in the replication pipeline have been consumed, there could be ordering issues. If ordering is important, wait for all the messages to be replicated and consumed before starting the producer.

  9. Stop MM2.
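
To illustrate the optional step 7, the source-named topic can be created at the destination with the Kafka AdminClient. This is only a sketch; the partition count and replication factor shown are placeholders and should match the source topic so that keyed ordering is preserved:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateSourceTopicAtDestination {
        public static void main(String[] args) throws Exception {
            Map<String, Object> conf = new HashMap<>();
            conf.put("bootstrap.servers", "dest-broker-1:9092"); // placeholder endpoint

            try (Admin admin = Admin.create(conf)) {
                // Partition count and replication factor are placeholders;
                // match them to the source topic's settings
                NewTopic topic = new NewTopic("ExampleTopic", 3, (short) 3);
                admin.createTopics(Collections.singletonList(topic)).all().get();
            }
        }
    }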

Using Default Replication Policy and the consumer to query and snap to MM2 checkpointed offsets

Note: This requires a change to the consumer code and has a dependency on connect-mirror-client, available with Apache Kafka 2.4.0 and above.

Steps:

  1. Start MM2.
    This detects the topic, creates it in the destination, and starts replicating messages. By default, MM2 uses an auto.offset.reset = earliest setting and starts reading messages from the beginning of the retention period of the topic in the source.

  2. Stop the consumer.
    The consumer should be designed to do a commitSync before shutting down cleanly, in which case the last consumed offset is committed to the source __consumer_offsets topic and then checkpointed by MM2 to the destination in the msksource.checkpoints.internal topic. If the consumer does not shut down cleanly, there could be some duplicate records when it resumes; the consumer should be idempotent anyway. In our case, the consumer does a commitSync when it shuts down and populates a file (/tmp/consumer_bookmark.txt) with the last consumed offset for each partition, in the format TopicPartitionOffset:ExampleTopic-0,<offset>.

  3. Start the consumer against the destination Amazon MSK cluster by changing the bootstrap brokers configuration.

    • In order to differentiate topics between the source and destination clusters, MM2 utilizes a ReplicationPolicy. The DefaultReplicationPolicy implementation uses a <source-cluster-alias>.<topic> naming convention, as described in KIP-382. When the consumer starts up, it subscribes to the replicated topic based on the specified topic pattern, which should account for both the source and replicated topic names.

    • It will not find offsets for its consumer group and will fall back to the auto.offset.reset setting. (Even if you replicate __consumer_offsets and the consumer finds the last committed offset for its consumer group, the actual offsets of the messages in the Apache Kafka log could be different, due to producer retries or, more likely, because the retention period of the source topic may have passed and messages in the source topic may already have been deleted when MM2 started.) As mentioned above, MM2 checkpoints the last consumed offsets for each consumer group, along with the corresponding destination offsets, in the <source-cluster-alias>.checkpoints.internal topic at the destination. In our case, the consumer accepts a parameter --failover (or -flo), which tells it that a failover has happened and that it needs to look up the translated remote offsets and seek to them before resuming consumption. It does so by using the new RemoteClusterUtils or MirrorClient classes (a sketch of this failover path is shown after these steps).

    • Since the checkpoints happen periodically (the interval is configurable), the checkpointed source offsets may lag behind the offsets last consumed at the source. Consequently, after failover, if the consumer seeks to the translated offset, it could read some duplicate messages. To minimize that, in our case, the consumer reads the last offset it committed at the source before shutdown, calculates the checkpoint lag in number of messages, and skips ahead that many messages at the destination.

    • At this point, the producer is still producing messages to the source which are getting replicated to the destination and the consumer is reading the replicated messages.

  4. Stop the producer.
    This could be at any convenient time after the consumer has moved over.

  5. (Optional) Create the source topic at the destination.
    The consumer can either continue to read messages from the replicated topic (<source-cluster-alias>.<topic>), in which case the producer must be modified to point to the replicated topic (a producer code change), or the source topic can be created in the destination, which requires no producer code change.

  6. Start the producer against the destination Amazon MSK cluster by changing the bootstrap brokers configuration.
    If the producer is stopped and restarted against the destination before all the messages in the replication pipeline have been consumed, there could be ordering issues. If ordering is important, wait for all the messages to be replicated and consumed before starting the producer.

  7. Stop MM2.
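
A rough sketch of the failover path described in step 3 follows: translate the checkpointed offsets, then seek to them when partitions are assigned. The endpoint and group name are placeholders, and the checkpoint-lag skip described above is omitted for brevity:

    import java.time.Duration;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;

    public class FailoverConsumerSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> dest = new HashMap<>();
            dest.put("bootstrap.servers", "dest-broker-1:9092"); // placeholder endpoint
            String group = "mm2TestConsumer1";                   // placeholder group id

            // Look up the destination offsets that MM2 checkpointed for this group
            Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(
                    dest, "msksource", group, Duration.ofSeconds(30));

            Properties props = new Properties();
            props.put("bootstrap.servers", "dest-broker-1:9092");
            props.put("group.id", group);
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Pattern.compile("^(msksource\\.)?ExampleTopic$"),
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // Seek each assigned partition to its translated offset, if any
                        for (TopicPartition tp : partitions) {
                            OffsetAndMetadata offset = translated.get(tp);
                            if (offset != null) {
                                consumer.seek(tp, offset.offset());
                            }
                        }
                    }
                });

            consumer.poll(Duration.ofSeconds(1)); // triggers assignment and the seeks above
            consumer.close();
        }
    }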

Note: These steps could also potentially be used to fail over between two Amazon MSK clusters or two Apache Kafka clusters, both within the same region and across regions.