Client Migration and Cutover

Steps we will perform in this section: consumer migration, producer migration, and stopping MM2 and Kafka Connect.


Consumer Migration

Using a custom Replication Policy and a background process to sync MM2 checkpointed offsets to the __consumer_offsets internal topic at the destination, as described in Migration Steps

  • Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.

  • Click on Open IDE (or go back to the browser tab if already open).


  • Go to the terminal on the left (you should be in KafkaClientInstance1). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Terminate the consumer that was consuming from the source Amazon MSK cluster by killing the consumer process, using the pid you recorded earlier when starting the consumer. Run the following command.

    kill <pid>
    
  • Tail the consumer log file to check that it exited properly, and note down the last processed Global Seq No and the last offset. Press Ctrl-C to exit the tail command.

    tail -f /tmp/consumer.log
    
  • Check the log file for the MM2GroupOffsetSync application which periodically syncs the MM2 checkpointed offsets to the __consumer_offsets internal topic at the destination.

    tail -f /tmp/MM2GroupOffsetSync.log
    
    • Make sure that it does not find any more offsets to sync. Check for the following messages.
      Note: The application checks that the consumer group in the destination is empty before syncing offsets, so that it does not overwrite the consumer’s committed offsets after it has failed over. Even so, shutting the application down before cutover is safer.


  • Stop the MM2GroupOffsetSync application which periodically syncs the MM2 checkpointed offsets to the __consumer_offsets internal topic at the destination.

    kill <pid>
    
  • Make a copy of the /tmp/kafka/consumer.properties file and update BOOTSTRAP_SERVERS_CONFIG to point to the destination Amazon MSK cluster.
    Note: To get the destination Amazon MSK cluster’s bootstrap servers, run the command echo $brokerstlsmskdest
    Note: Make sure to keep the GROUP_ID_CONFIG the same. The consumer will then be able to get the last synced offset from __consumer_offsets in the destination Amazon MSK cluster.

    cp /tmp/consumer_bookmark.txt /tmp/consumer_bookmark.txt_src
    cp /tmp/kafka/consumer.properties /tmp/kafka/consumer.properties_dest
    vim /tmp/kafka/consumer.properties_dest
    
  • Run the Clickstream consumer against the destination Amazon MSK cluster.
    Note: To kill the consumer, note down the pid (process id) of the consumer process. Use kill <pid> to kill the process.

    cp /tmp/consumer.log /tmp/consumer.log_src
    export EXTRA_ARGS=-javaagent:/home/ec2-user/prometheus/jmx_prometheus_javaagent-0.13.0.jar=3900:/home/ec2-user/prometheus/kafka-producer-consumer.yml
    java $EXTRA_ARGS -jar KafkaClickstreamConsumer-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/consumer.properties_dest -nt 3 -rf 10800 -mtls -src msksource > /tmp/consumer.log 2>&1 &
    
  • Check the consumer log to make sure it is consuming messages.

    tail -f /tmp/consumer.log
    
  • Check to make sure that the consumer started consuming from the last offset synced by the MM2GroupOffsetSync application.

    grep 'Updated offsets for consumer group' /tmp/MM2GroupOffsetSync.log | tail -1
    grep 'Consumer position:' /tmp/consumer.log
    

At this point the consumer is migrated to the destination Amazon MSK cluster.


Using the Default Replication Policy and a background process to sync MM2 checkpointed offsets to the __consumer_offsets internal topic at the destination, as described in Migration Steps

  • Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.

  • Click on Open IDE (or go back to the browser tab if already open).


  • Go to the terminal on the left (you should be in KafkaClientInstance1). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Terminate the consumer that was consuming from the source Amazon MSK cluster by killing the consumer process, using the pid you recorded earlier when starting the consumer. Run the following command.

    kill <pid>
    
  • Tail the consumer log file to check that it exited properly, and note down the last processed Global Seq No and the last offset. Press Ctrl-C to exit the tail command.

    tail -f /tmp/consumer.log
    
  • Check the log file for the MM2GroupOffsetSync application which periodically syncs the MM2 checkpointed offsets to the __consumer_offsets internal topic at the destination.

    tail -f /tmp/MM2GroupOffsetSync.log
    
    • Make sure that it does not find any more offsets to sync. Check for the following messages.
      Note: The application checks that the consumer group in the destination is empty before syncing offsets, so that it does not overwrite the consumer’s committed offsets after it has failed over. Even so, shutting the application down before cutover is safer.


  • Stop the MM2GroupOffsetSync application which periodically syncs the MM2 checkpointed offsets to the __consumer_offsets internal topic at the destination.

    kill <pid>
    
  • Make a copy of the /tmp/kafka/consumer.properties file and update BOOTSTRAP_SERVERS_CONFIG to point to the destination Amazon MSK cluster.
    Note: To get the destination Amazon MSK cluster’s bootstrap servers, run the command echo $brokerstlsmskdest
    Note: Make sure to keep the GROUP_ID_CONFIG the same. The consumer will then be able to get the last synced offset from __consumer_offsets in the destination Amazon MSK cluster.

    cp /tmp/consumer_bookmark.txt /tmp/consumer_bookmark.txt_src
    cp /tmp/kafka/consumer.properties /tmp/kafka/consumer.properties_dest
    vim /tmp/kafka/consumer.properties_dest
    
  • Run the Clickstream consumer against the destination Amazon MSK cluster.
    Note: To kill the consumer, note down the pid (process id) of the consumer process. Use kill <pid> to kill the process.

    cp /tmp/consumer.log /tmp/consumer.log_src
    export EXTRA_ARGS=-javaagent:/home/ec2-user/prometheus/jmx_prometheus_javaagent-0.13.0.jar=3900:/home/ec2-user/prometheus/kafka-producer-consumer.yml
    java $EXTRA_ARGS -jar KafkaClickstreamConsumer-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/consumer.properties_dest -nt 3 -rf 10800 -mtls -src msksource > /tmp/consumer.log 2>&1 &
    
  • Check the consumer log to make sure it is consuming messages.

    tail -f /tmp/consumer.log
    
  • Check to make sure that the consumer started consuming from the last offset synced by the MM2GroupOffsetSync application.

    grep 'Updated offsets for consumer group' /tmp/MM2GroupOffsetSync.log | tail -1
    grep 'Consumer position:' /tmp/consumer.log
    
  • Create the source topic in the destination.

    In order to differentiate topics between the source and destination, MM2 uses a ReplicationPolicy. The DefaultReplicationPolicy implementation (which you can override with your own class) uses a <source-cluster-alias>.<topic> naming convention, as described in KIP-382. When the consumer was started against the destination Amazon MSK cluster, it began consuming from the msksource.ExampleTopic topic based on the regex topic pattern (^msksource.ExampleTopic[\w]|^ExampleTopic[\w]) specified in the consumer. When we cut over the producer to the destination cluster without changing the topic list it produced to in the source Amazon MSK cluster, it will expect the same ExampleTopic to exist, so we create that topic in the destination. Because of the topic pattern above, the consumer automatically picks up the new topic when it is created and, after going through a rebalance, starts consuming from it.

    cd /home/ec2-user/kafka
    bin/kafka-topics.sh --create --zookeeper $zoomskdest --replication-factor 3 --partitions 3 --topic ExampleTopic
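The <source-cluster-alias>.<topic> convention described above can be sketched in a few lines. This is only an illustration of the KIP-382 naming scheme, not the actual MM2 DefaultReplicationPolicy code:

```python
# Sketch of the DefaultReplicationPolicy naming convention from KIP-382.
# This mirrors the idea, not the real MirrorMaker 2 implementation.
SEPARATOR = "."

def format_remote_topic(source_cluster_alias: str, topic: str) -> str:
    """Name a replicated topic as <source-cluster-alias>.<topic>."""
    return f"{source_cluster_alias}{SEPARATOR}{topic}"

def upstream_topic(remote_topic: str) -> str:
    """Recover the original topic name by stripping the cluster alias."""
    return remote_topic.split(SEPARATOR, 1)[1]

print(format_remote_topic("msksource", "ExampleTopic"))  # msksource.ExampleTopic
print(upstream_topic("msksource.ExampleTopic"))          # ExampleTopic
```

Because the replicated topic keeps the original name as a suffix, a topic pattern that matches both forms lets the same consumer read the replicated topic and, after the rebalance, the newly created ExampleTopic.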
    

At this point the consumer is migrated to the destination Amazon MSK cluster.


Using the Default Replication Policy and the consumer to query and snap to MM2 checkpointed offsets, as described in Migration Steps

  • Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.

  • Click on Open IDE (or go back to the browser tab if already open).


  • Go to the terminal on the left (you should be in KafkaClientInstance1). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Terminate the consumer that was consuming from the source Amazon MSK cluster by killing the consumer process, using the pid you recorded earlier when starting the consumer. Run the following command.

    kill <pid>
    
  • Tail the consumer log file to check that it exited properly, and note down the last processed Global Seq No and the last offset. Press Ctrl-C to exit the tail command.

    tail -f /tmp/consumer.log
    
  • Look at the /tmp/consumer_bookmark.txt file. When exiting, the consumer records the last offset it has read for each partition in the subscribed topics (in this case ExampleTopic) as well as the last Global Seq No.

    cat /tmp/consumer_bookmark.txt
    
    • The output should look like this. The TopicPartitionOffset values for each partition will be used to check how many messages could potentially be re-read after the consumer is cut over to the destination Amazon MSK cluster.


  • Make a copy of the /tmp/kafka/consumer.properties file and update BOOTSTRAP_SERVERS_CONFIG to point to the destination Amazon MSK cluster.
    Note: To get the destination Amazon MSK cluster’s bootstrap servers, run the command echo $brokerstlsmskdest
    Note: Make sure to keep the GROUP_ID_CONFIG the same. It will be used to retrieve the corresponding offset in the destination Amazon MSK cluster.

    cp /tmp/consumer_bookmark.txt /tmp/consumer_bookmark.txt_src
    cp /tmp/kafka/consumer.properties /tmp/kafka/consumer.properties_dest
    vim /tmp/kafka/consumer.properties_dest
    
  • Run the Clickstream consumer against the destination Amazon MSK cluster.
    Note: To kill the consumer, note down the pid (process id) of the consumer process. Use kill <pid> to kill the process.

    cp /tmp/consumer.log /tmp/consumer.log_src
    export EXTRA_ARGS=-javaagent:/home/ec2-user/prometheus/jmx_prometheus_javaagent-0.13.0.jar=3900:/home/ec2-user/prometheus/kafka-producer-consumer.yml
    java $EXTRA_ARGS -jar KafkaClickstreamConsumer-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/consumer.properties_dest -nt 3 -rf 10800 -mtls -flo -src msksource -dst mskdest > /tmp/consumer.log 2>&1 &
    

    Note: (DO NOT RUN THIS UNLESS YOU’RE IN THIS SITUATION) If you kill the consumer after pointing it to the destination cluster and want it to resume from where it left off in the destination cluster, DO NOT use the -flo parameter. Use the following commands.

    export EXTRA_ARGS=-javaagent:/home/ec2-user/prometheus/jmx_prometheus_javaagent-0.13.0.jar=3900:/home/ec2-user/prometheus/kafka-producer-consumer.yml
    java $EXTRA_ARGS -jar KafkaClickstreamConsumer-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/consumer.properties_dest -nt 3 -rf 10800 -mtls -src msksource > /tmp/consumer.log 2>&1 &
    

    Note: (DO NOT RUN THIS UNLESS YOU’RE IN THIS SITUATION) If you kill the consumer after pointing it to the destination cluster and want to run it as if it were running for the first time in failover mode, use the following commands.

    cp /tmp/consumer_bookmark.txt_src /tmp/consumer_bookmark.txt
    export EXTRA_ARGS=-javaagent:/home/ec2-user/prometheus/jmx_prometheus_javaagent-0.13.0.jar=3900:/home/ec2-user/prometheus/kafka-producer-consumer.yml
    java $EXTRA_ARGS -jar KafkaClickstreamConsumer-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/consumer.properties_dest -nt 3 -rf 10800 -mtls -flo -src msksource -dst mskdest > /tmp/consumer.log 2>&1 &
    
  • Run the following command.

    grep "Checkpoint Lag" /tmp/consumer.log
    
    • The output should look like this. The Checkpoint Lag indicates how many messages the checkpoint is behind the last offset consumed and committed by the consumer before the cutover. If it is zero, the checkpoint interval has passed and the last committed offset has been checkpointed for the consumer group in the destination. If it is not zero, the consumer skips ahead by the number of messages equal to the lag to avoid duplicates, and then starts consuming.
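The skip-ahead behavior can be sketched as follows. These are hypothetical helper functions for illustration; the actual KafkaClickstreamConsumer may implement this differently:

```python
# Checkpoint Lag: how far the MM2 checkpoint in the destination is behind
# the last offset the consumer committed on the source before cutover.
# Hypothetical helpers illustrating the behavior described above.

def checkpoint_lag(last_committed_offset: int, checkpointed_offset: int) -> int:
    """Messages committed on the source but not yet checkpointed."""
    return max(last_committed_offset - checkpointed_offset, 0)

def resume_offset(checkpointed_offset: int, lag: int) -> int:
    """Skip ahead past already-processed messages to avoid duplicates."""
    return checkpointed_offset + lag

lag = checkpoint_lag(last_committed_offset=1500, checkpointed_offset=1480)
print(lag)                       # 20
print(resume_offset(1480, lag))  # 1500: resume where the consumer left off
```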


    • Look in the /tmp/consumer.log file for the Final Translated Offsets entries. For each of the 3 consumers, these are the offsets in the 3 partitions of the topic in the destination MSK cluster from which the consumers will resume reading. They are produced by MM2’s mapping between source and destination offsets. In this case the offsets may be the same, because we started MirrorMaker 2 soon after we started producing into the source topic. However, if the retention period of the source topic had passed before we started MM2, or if we had started MM2 with consumer.auto.offset.reset set to latest instead of earliest (with the producer running before MM2) in the mm2-msc MirrorSourceConnector, they could have been different.

      more /tmp/consumer.log
      

      Note: Hit q to quit
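Conceptually, the offset translation works like this. The checkpoint list below is made-up illustrative data, not output from MM2 (which emits real checkpoint records via its checkpoint connector):

```python
# Conceptual sketch of MM2 offset translation: checkpoints map a committed
# source offset to the equivalent offset in the replicated destination topic,
# so a migrated consumer can resume without re-reading from the beginning.
checkpoints = [
    (100, 100),   # (source_offset, destination_offset)
    (500, 480),   # offsets can diverge, e.g. if replication started late
    (900, 860),
]

def translate(source_offset: int) -> int:
    """Destination offset of the latest checkpoint at or before source_offset."""
    translated = 0
    for src, dst in checkpoints:
        if src <= source_offset:
            translated = dst
    return translated

print(translate(750))  # 480: the consumer resumes from here in the destination
```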

  • Create the source topic in the destination.

    In order to differentiate topics between the source and destination, MM2 uses a ReplicationPolicy. The DefaultReplicationPolicy implementation (which you can override with your own class) uses a <source-cluster-alias>.<topic> naming convention, as described in KIP-382. When the consumer was started against the destination Amazon MSK cluster, it began consuming from the msksource.ExampleTopic topic based on the regex topic pattern (^msksource.ExampleTopic[\w]|^ExampleTopic[\w]) specified in the consumer. When we cut over the producer to the destination cluster without changing the topic list it produced to in the source Amazon MSK cluster, it will expect the same ExampleTopic to exist, so we create that topic in the destination. Because of the topic pattern above, the consumer automatically picks up the new topic when it is created and, after going through a rebalance, starts consuming from it.

    cd /home/ec2-user/kafka
    bin/kafka-topics.sh --create --zookeeper $zoomskdest --replication-factor 3 --partitions 3 --topic ExampleTopic
    

At this point the consumer is migrated to the destination Amazon MSK cluster.


Producer Migration

  • Terminate the producer that was sending events to the source Amazon MSK cluster by killing the producer process, using the pid you recorded earlier when starting the producer. Run the following command.

    kill <pid>
    
  • Tail the producer log file to check that it exited properly. When exiting, the producer records the last offset it produced to each partition of the topic (in this case ExampleTopic) as well as the last Global Seq No. Ctrl-C to exit.

    tail -f /tmp/producer.log
    
  • Make a copy of the /tmp/kafka/producer.properties_msk file and update BOOTSTRAP_SERVERS_CONFIG to point to the destination Amazon MSK cluster.
    Note: To get the destination Amazon MSK cluster’s bootstrap servers, run the command echo $brokerstlsmskdest

    cd /tmp/kafka
    cp /tmp/kafka/producer.properties_msk /tmp/kafka/producer.properties_msk_dest
    vim /tmp/kafka/producer.properties_msk_dest
    
  • Run the Clickstream producer against the destination Amazon MSK cluster. The producer will run in the background.
    Note: To kill the producer, note down the pid (process id) of the producer process. Use kill <pid> to kill the process.

    mv /tmp/producer.log /tmp/producer.log_src
    export EXTRA_ARGS=-javaagent:/home/ec2-user/prometheus/jmx_prometheus_javaagent-0.13.0.jar=3800:/home/ec2-user/prometheus/kafka-producer-consumer.yml
    java $EXTRA_ARGS -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/producer.properties_msk_dest -nt 8 -rf 10800 -mtls -nle > /tmp/producer.log 2>&1 &
    
  • The producer will start sending messages to the new ExampleTopic in the destination MSK cluster. The consumer, which is already consuming from msksource.ExampleTopic, will recognize the new topic when it is created, go through a rebalance, and start reading from it. One important thing to note: the consumer does not wait to finish reading all the events in the replication flow before it starts reading from the new topic, so events could be read out of order. If ordering is important, check the Kafka Connect consumer lag for the MirrorSourceConnector and wait to start the producer on the destination until the lag is zero. Go back to the Grafana monitoring dashboard to view the consumer lag, as explained in the Monitoring section.
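The “wait until the connector’s consumer lag reaches zero” idea can be sketched as a simple poll loop. Here get_lag is a placeholder for however you obtain the MirrorSourceConnector consumer lag (for example, from your monitoring stack); it is not a real API:

```python
import time

def wait_for_zero_lag(get_lag, poll_interval_s: float = 5.0,
                      timeout_s: float = 300.0) -> bool:
    """Poll the replication lag and return True once it reaches zero,
    or False if the timeout expires first."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if get_lag() <= 0:
            return True
        time.sleep(poll_interval_s)
    return False

# Example with a stubbed lag source that drains over three polls.
readings = iter([120, 30, 0])
print(wait_for_zero_lag(lambda: next(readings), poll_interval_s=0.0))  # True
```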

  • Tail the producer log file to check that it started properly and is sending events. Ctrl-C to exit.

    tail -f /tmp/producer.log
    
  • Run the following command to check if the consumer picked up the new partitions for the newly created ExampleTopic in the destination Amazon MSK cluster.

    grep "Consumer assignment" /tmp/consumer.log
    
  • Check the consumer.log file to see if the consumer is reading messages from the newly created ExampleTopic in the destination Amazon MSK cluster.

    tail -f /tmp/consumer.log
    

At this point the producer is migrated to the destination Amazon MSK cluster.


Stop MM2 and Kafka Connect

  • Go to the terminal on the left (you should be in KafkaClientInstance1). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Run the following commands.

    sudo systemctl stop kafka-connect
    sudo systemctl status kafka-connect
    
  • Go to the terminal on the right (you should be in KafkaClientInstance2). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Run the following commands.

    sudo systemctl stop kafka-connect
    sudo systemctl status kafka-connect