MirrorMaker 2 on Kafka Connect

Steps we will perform in this section

Set up and start MirrorMaker 2 on Kafka Connect

Set up the Kafka Connect cluster

Kafka Client Instance2
  • Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.

  • Click on Open IDE (or go back to the browser tab if already open).


  • Go to the terminal on the right (you should be in KafkaClientInstance2). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Go to the /tmp/kafka directory and edit the following properties in the connect-distributed.properties file.
    Note: To get the bootstrap servers, run the command echo $brokerstlsmskdest. We’re pointing to the destination Amazon MSK cluster as that is the cluster we will be migrating to.

    bootstrap.servers=<BootstrapBroker-String(TLS)>
    
    • Under Authentication settings for Connect workers
      Note: If you changed the locations of the keystore or truststore files or changed the password for the truststore file, you might have to change those in this file in addition to the properties below.

      ssl.keystore.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
      ssl.key.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
      
    • Under Authentication settings for Connect producers used with source connectors
      Note: If you changed the locations of the keystore or truststore files or changed the password for the truststore file, you might have to change those in this file in addition to the properties below.

      producer.ssl.keystore.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
      producer.ssl.key.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
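The property edits above can also be applied non-interactively. A minimal sketch, assuming the destination TLS bootstrap string is in $brokerstlsmskdest as elsewhere in this workshop; the set_bootstrap helper name is ours, not part of the workshop files:

```shell
# Hypothetical helper (not part of the workshop files): point
# bootstrap.servers in a properties file at a given broker string.
set_bootstrap() {
  # $1 = properties file, $2 = bootstrap broker string
  sed -i "s|^bootstrap.servers=.*|bootstrap.servers=$2|" "$1"
}

# Workshop usage (destination MSK cluster's TLS bootstrap string):
# set_bootstrap /tmp/kafka/connect-distributed.properties "$brokerstlsmskdest"
```

The ssl.*.password properties can be updated the same way with additional sed expressions.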
      
  • Start the Kafka Connect service. It will use the SSL parameters from the /tmp/kafka/connect-distributed.properties file and connect to the Amazon MSK cluster using TLS mutual authentication.

    sudo systemctl start kafka-connect.service
    sudo systemctl status kafka-connect.service
    
    • This is the expected output from running these commands.


  • Make sure the Kafka Connect service started properly.

    cat /tmp/kafka/kafka-connect.log | grep Herder
    
    • You should see an output similar to this.
      Note: It might take a few seconds for it to start up. If you get an output which does not look like the picture below, give it a few seconds and retry the command.
      Note: The distributed herder coordinates with other workers to spread work across multiple processes. In the output you can see that the herder started successfully, connected to the Amazon MSK cluster using TLS mutual authentication, and read the internal Kafka Connect topics.
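Rather than re-running the grep by hand, a small polling loop can wait for the herder message to appear. A sketch; the wait_for_log helper is ours, and the exact message text matched is an assumption, so check your log for the actual wording:

```shell
# Hypothetical helper: poll a log file until a pattern appears, or fail
# once the timeout (in seconds) elapses.
wait_for_log() {
  local file=$1 pattern=$2 timeout=$3
  local deadline=$(( $(date +%s) + timeout ))
  until grep -q "$pattern" "$file" 2>/dev/null; do
    if [ "$(date +%s)" -ge "$deadline" ]; then
      return 1  # timed out
    fi
    sleep 1
  done
}

# Workshop usage:
# wait_for_log /tmp/kafka/kafka-connect.log 'Herder started' 60
```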


  • Kafka Connect exposes a REST API for managing connectors. The default port for the service is 8083, and we did not change it in the connect-distributed.properties file, so on each of the Kafka Client Instances the service is available at http://localhost:8083. Run the following command.

    curl -s http://localhost:8083 | jq .
    
    • You should get an output like the following:
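For reference, the root endpoint of the Kafka Connect REST API returns the worker’s version information. The response has roughly this shape; the values below are placeholders and will differ in your environment:

```json
{
  "version": "2.4.0",
  "commit": "77a89fcf8d7fa018",
  "kafka_cluster_id": "HVUc3NTPRkeQJmYgPuY2Dg"
}
```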

Kafka Client Instance1
  • Go to the terminal on the left (you should be in KafkaClientInstance1). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Go to the /tmp/kafka directory and edit the following properties in the connect-distributed.properties file.
    Note: To get the bootstrap servers, run the command echo $brokerstlsmskdest. We’re pointing to the destination Amazon MSK cluster as that is the cluster we will be migrating to.

    bootstrap.servers=<BootstrapBroker-String(TLS)>
    
    • Under Authentication settings for Connect workers
      Note: If you changed the locations of the keystore or truststore files or changed the password for the truststore file, you might have to change those in this file in addition to the properties below.

      ssl.keystore.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
      ssl.key.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
      
    • Under Authentication settings for Connect producers used with source connectors
      Note: If you changed the locations of the keystore or truststore files or changed the password for the truststore file, you might have to change those in this file in addition to the properties below.

      producer.ssl.keystore.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
      producer.ssl.key.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
      
  • Start the Kafka Connect service. It will use the SSL parameters from the /tmp/kafka/connect-distributed.properties file and connect to the Amazon MSK cluster using TLS mutual authentication.

    sudo systemctl start kafka-connect.service
    sudo systemctl status kafka-connect.service
    
    • This is the expected output from running these commands.


  • Make sure the Kafka Connect service started properly.

    cat /tmp/kafka/kafka-connect.log | grep Herder
    
    • You should see an output similar to this.
      Note: It might take a few seconds for it to start up. If you get an output which does not look like the picture below, give it a few seconds and retry the command.
      Note: The distributed herder coordinates with other workers to spread work across multiple processes. In the output you can see that the herder started successfully, connected to the Amazon MSK cluster using TLS mutual authentication, and read the internal Kafka Connect topics.


  • Kafka Connect exposes a REST API for managing connectors. The default port for the service is 8083, and we did not change it in the connect-distributed.properties file, so on each of the Kafka Client Instances the service is available at http://localhost:8083. Run the following command.

    curl -s http://localhost:8083 | jq .
    
    • You should get an output like the following:

  • Observe the topics currently in the source and destination Amazon MSK clusters. Run the following commands.

    cd /home/ec2-user/kafka
    bin/kafka-topics.sh --list --zookeeper $zoomsksource
    

    Note: The source cluster has the ExampleTopic topic that the producer is sending records to, in addition to the internal __consumer_offsets topic.

    bin/kafka-topics.sh --list --zookeeper $zoomskdest
    

    Note: The destination cluster has the 3 Kafka Connect internal topics as well as the Schema Registry internal topic _schemas (since we pointed both Kafka Connect and the Schema Registry at the destination Amazon MSK cluster), in addition to __consumer_offsets.
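To see at a glance which source topics have not yet appeared on the destination, the two lists can be diffed. A sketch; the topics_only_in_first helper is ours and assumes bash (for process substitution):

```shell
# Hypothetical helper: print lines present in the first file but not the
# second (e.g., source topics that have not been replicated yet).
topics_only_in_first() {
  comm -23 <(sort "$1") <(sort "$2")
}

# Workshop usage: redirect the two --list commands above to files first, e.g.
# bin/kafka-topics.sh --list --zookeeper $zoomsksource > /tmp/src-topics.txt
# bin/kafka-topics.sh --list --zookeeper $zoomskdest > /tmp/dest-topics.txt
# topics_only_in_first /tmp/src-topics.txt /tmp/dest-topics.txt
```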

You now have a Kafka Connect cluster running.


Configure and start MirrorMaker 2 connectors

Using a custom ReplicationPolicy
Using the DefaultReplicationPolicy

Using a custom ReplicationPolicy
  • To read more about the connectors and what they do, see the Connectors section in KIP-382.

  • Go to the terminal on the right (you should be in KafkaClientInstance2). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Go to the /tmp/kafka directory and edit the connectors’ properties files to update the bootstrap servers. To download the properties files, click on mm2-msc-cust-repl-policy, mm2-cpc-cust-repl-policy, mm2-hbc.

    • Edit the mm2-msc-cust-repl-policy.json file for the MirrorSourceConnector.
      Note: The custom ReplicationPolicy jar file (CustomMM2ReplicationPolicy-1.0-SNAPSHOT.jar) was copied to the /home/ec2-user/confluent/share/java/kafka directory by the CloudFormation template.
      Note: To get the source code of the custom ReplicationPolicy, see GitHub.

      cd /tmp/kafka
      vim mm2-msc-cust-repl-policy.json
      
      • Update the following properties.

        • target.cluster.bootstrap.servers: To get the value, run the command echo $brokerstlsmskdest
        • source.cluster.bootstrap.servers: To get the value, run the command echo $brokerstlsmsksource
        • source.cluster.ssl.keystore.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • source.cluster.ssl.key.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • source.cluster.ssl.truststore.password: if you changed it
        • source.cluster.ssl.truststore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • source.cluster.ssl.keystore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • target.cluster.ssl.keystore.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • target.cluster.ssl.key.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • target.cluster.ssl.truststore.password: if you changed it
        • target.cluster.ssl.truststore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • target.cluster.ssl.keystore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)

        Note: The configuration "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy" specifies the custom replication policy.
        Note: If you want to specify any other consumer or producer configurations, prefix them with consumer. and producer. respectively. For example, consumer.auto.offset.reset.
        Note: The topics, topic configs, topic ACLs and consumer groups refresh intervals are set to 20 seconds. If you wish to change those, you can update those settings.
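For orientation, a MirrorSourceConnector configuration of this kind has roughly the skeleton below. This is an illustrative sketch only; the mm2-msc-cust-repl-policy.json file in /tmp/kafka is authoritative, and the values here are placeholders:

```json
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "source.cluster.alias": "msksource",
  "target.cluster.alias": "mskdest",
  "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
  "tasks.max": "4",
  "topics": ".*",
  "refresh.topics.interval.seconds": "20",
  "refresh.groups.interval.seconds": "20",
  "sync.topic.configs.interval.seconds": "20",
  "sync.topic.acls.interval.seconds": "20",
  "source.cluster.bootstrap.servers": "<source TLS bootstrap string>",
  "target.cluster.bootstrap.servers": "<target TLS bootstrap string>",
  "source.cluster.security.protocol": "SSL",
  "target.cluster.security.protocol": "SSL"
}
```

The keystore/truststore locations and passwords from the bullet list above are supplied through the corresponding source.cluster.ssl.* and target.cluster.ssl.* keys.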

    • Edit the mm2-cpc-cust-repl-policy.json file for the MirrorCheckpointConnector.
      Note: The custom ReplicationPolicy jar file (CustomMM2ReplicationPolicy-1.0-SNAPSHOT.jar) was copied to the /home/ec2-user/confluent/share/java/kafka directory by the CloudFormation template.

      cd /tmp/kafka
      vim mm2-cpc-cust-repl-policy.json
      
      • Update the following properties.

        • target.cluster.bootstrap.servers: To get the value, run the command echo $brokerstlsmskdest
        • source.cluster.bootstrap.servers: To get the value, run the command echo $brokerstlsmsksource
        • source.cluster.ssl.keystore.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • source.cluster.ssl.key.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • source.cluster.ssl.truststore.password: if you changed it
        • source.cluster.ssl.truststore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • source.cluster.ssl.keystore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • target.cluster.ssl.keystore.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • target.cluster.ssl.key.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • target.cluster.ssl.truststore.password: if you changed it
        • target.cluster.ssl.truststore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • target.cluster.ssl.keystore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)

        Note: The configuration "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy" specifies the custom replication policy.
        Note: The checkpoint interval is set to 20 seconds. If you wish to change that, you can update that setting.

    • Edit the mm2-hbc.json file for the MirrorHeartbeatConnector.

      cd /tmp/kafka
      vim mm2-hbc.json
      
      • Update the following properties.

        • target.cluster.bootstrap.servers: To get the value, run the command echo $brokerstlsmskdest
        • source.cluster.bootstrap.servers: To get the value, run the command echo $brokerstlsmsksource
        • source.cluster.ssl.keystore.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • source.cluster.ssl.key.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • source.cluster.ssl.truststore.password: if you changed it
        • source.cluster.ssl.truststore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • source.cluster.ssl.keystore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • target.cluster.ssl.keystore.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • target.cluster.ssl.key.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • target.cluster.ssl.truststore.password: if you changed it
        • target.cluster.ssl.truststore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • target.cluster.ssl.keystore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)

        Note: The heartbeat interval is set to 20 seconds. If you wish to change that, you can update that setting.

MirrorSourceConnector
  • Create a new MirrorSourceConnector connector by submitting a request to the REST interface.

    cd /tmp/kafka
    curl -X PUT -H "Content-Type: application/json" --data @mm2-msc-cust-repl-policy.json http://localhost:8083/connectors/mm2-msc/config
    
    • The expected output of the command is:

    • Check the status of the connector.

      curl -s localhost:8083/connectors/mm2-msc/status | jq .
      
      • The expected output of the command is:

      Note: Even though the tasks.max configuration is set to 4, only 3 tasks are started because the topic has 3 partitions.
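For reference, the status response follows the standard Kafka Connect shape, roughly as below; the worker IDs shown are placeholders and will differ in your environment:

```json
{
  "name": "mm2-msc",
  "connector": { "state": "RUNNING", "worker_id": "10.0.1.10:8083" },
  "tasks": [
    { "id": 0, "state": "RUNNING", "worker_id": "10.0.1.10:8083" },
    { "id": 1, "state": "RUNNING", "worker_id": "10.0.2.11:8083" },
    { "id": 2, "state": "RUNNING", "worker_id": "10.0.1.10:8083" }
  ],
  "type": "source"
}
```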

MirrorCheckpointConnector
  • Create a new MirrorCheckpointConnector connector by submitting a request to the REST interface.

    cd /tmp/kafka
    curl -X PUT -H "Content-Type: application/json" --data @mm2-cpc-cust-repl-policy.json http://localhost:8083/connectors/mm2-cpc/config
    
    • The expected output of the command is:

    • Check the status of the connector.

      curl -s localhost:8083/connectors/mm2-cpc/status | jq .
      
      • The expected output of the command is:

MirrorHeartbeatConnector
  • Create a new MirrorHeartbeatConnector connector by submitting a request to the REST interface.

    cd /tmp/kafka
    curl -X PUT -H "Content-Type: application/json" --data @mm2-hbc.json http://localhost:8083/connectors/mm2-hbc/config
    
    • The expected output of the command is:

    • Check the status of the connector.

      curl -s localhost:8083/connectors/mm2-hbc/status | jq .
      
      • The expected output of the command is:

  • Observe the topics now in the source and destination Amazon MSK clusters. Run the following commands.

    cd /home/ec2-user/kafka
    bin/kafka-topics.sh --list --zookeeper $zoomsksource
    

    Note: The source cluster has an additional topic called mm2-offset-syncs.mskdest.internal, which was created by the MirrorSourceConnector and is used to sync offsets between the source and destination Amazon MSK clusters, as described in KIP-382.

    bin/kafka-topics.sh --list --zookeeper $zoomskdest
    

    Note: The destination cluster now has 3 additional topics: ExampleTopic, created by the MirrorSourceConnector, which is the replicated user topic; and the heartbeats and msksource.checkpoints.internal topics, created by the MirrorHeartbeatConnector and the MirrorCheckpointConnector respectively, which are MirrorMaker 2 internal topics as described in KIP-382. We will use the msksource.checkpoints.internal topic and the corresponding classes (Checkpoint, RemoteClusterUtils and MirrorClient) to fail over our consumer to the destination MSK cluster and seek to a mapped offset so that the consumer can resume after failover.

  • Go back to the browser window with the Grafana dashboard from the Monitoring section and hit refresh in the top right corner. You will now see data in the dashboard, and in the graph in the top right you can watch the consumer lag start high and drop to zero as the MM2 consumers catch up.

    You now have a MirrorMaker 2 cluster running on Kafka Connect that is replicating topics, topic configurations, topic ACLs, consumer groups and topic messages to the destination Amazon MSK cluster.


Using the DefaultReplicationPolicy
  • To read more about the connectors and what they do, see the Connectors section in KIP-382.

  • Go to the terminal on the right (you should be in KafkaClientInstance2). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Go to the /tmp/kafka directory and edit the connectors’ properties files to update the bootstrap servers. To download the properties files, click on mm2-msc, mm2-cpc, mm2-hbc.

    • Edit the mm2-msc.json file for the MirrorSourceConnector.

      cd /tmp/kafka
      vim mm2-msc.json
      
      • Update the following properties.

        • target.cluster.bootstrap.servers: To get the value, run the command echo $brokerstlsmskdest
        • source.cluster.bootstrap.servers: To get the value, run the command echo $brokerstlsmsksource
        • source.cluster.ssl.keystore.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • source.cluster.ssl.key.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • source.cluster.ssl.truststore.password: if you changed it
        • source.cluster.ssl.truststore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • source.cluster.ssl.keystore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • target.cluster.ssl.keystore.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • target.cluster.ssl.key.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • target.cluster.ssl.truststore.password: if you changed it
        • target.cluster.ssl.truststore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • target.cluster.ssl.keystore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)

        Note: If you want to specify any other consumer or producer configurations, prefix them with consumer. and producer. respectively. For example, consumer.auto.offset.reset.
        Note: The topics, topic configs, topic ACLs and consumer groups refresh intervals are set to 20 seconds. If you wish to change those, you can update those settings.

    • Edit the mm2-cpc.json file for the MirrorCheckpointConnector.

      cd /tmp/kafka
      vim mm2-cpc.json
      
      • Update the following properties.

        • target.cluster.bootstrap.servers: To get the value, run the command echo $brokerstlsmskdest
        • source.cluster.bootstrap.servers: To get the value, run the command echo $brokerstlsmsksource
        • source.cluster.ssl.keystore.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • source.cluster.ssl.key.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • source.cluster.ssl.truststore.password: if you changed it
        • source.cluster.ssl.truststore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • source.cluster.ssl.keystore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • target.cluster.ssl.keystore.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • target.cluster.ssl.key.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • target.cluster.ssl.truststore.password: if you changed it
        • target.cluster.ssl.truststore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • target.cluster.ssl.keystore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)

        Note: The checkpoint interval is set to 20 seconds. If you wish to change that, you can update that setting.

    • Edit the mm2-hbc.json file for the MirrorHeartbeatConnector.

      cd /tmp/kafka
      vim mm2-hbc.json
      
      • Update the following properties.

        • target.cluster.bootstrap.servers: To get the value, run the command echo $brokerstlsmskdest
        • source.cluster.bootstrap.servers: To get the value, run the command echo $brokerstlsmsksource
        • source.cluster.ssl.keystore.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • source.cluster.ssl.key.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • source.cluster.ssl.truststore.password: if you changed it
        • source.cluster.ssl.truststore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • source.cluster.ssl.keystore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • target.cluster.ssl.keystore.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • target.cluster.ssl.key.password: password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar
        • target.cluster.ssl.truststore.password: if you changed it
        • target.cluster.ssl.truststore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)
        • target.cluster.ssl.keystore.location: if you changed it. (It is set to the default for AuthMSK-1.0-SNAPSHOT.jar)

        Note: The heartbeat interval is set to 20 seconds. If you wish to change that, you can update that setting.

MirrorSourceConnector
  • Create a new MirrorSourceConnector connector by submitting a request to the REST interface.

    cd /tmp/kafka
    curl -X PUT -H "Content-Type: application/json" --data @mm2-msc.json http://localhost:8083/connectors/mm2-msc/config
    
    • The expected output of the command is:

    • Check the status of the connector.

      curl -s localhost:8083/connectors/mm2-msc/status | jq .
      
      • The expected output of the command is:

      Note: Even though the tasks.max configuration is set to 4, only 3 tasks are started because the topic has 3 partitions.

MirrorCheckpointConnector
  • Create a new MirrorCheckpointConnector connector by submitting a request to the REST interface.

    cd /tmp/kafka
    curl -X PUT -H "Content-Type: application/json" --data @mm2-cpc.json http://localhost:8083/connectors/mm2-cpc/config
    
    • The expected output of the command is:

    • Check the status of the connector.

      curl -s localhost:8083/connectors/mm2-cpc/status | jq .
      
      • The expected output of the command is:

MirrorHeartbeatConnector
  • Create a new MirrorHeartbeatConnector connector by submitting a request to the REST interface.

    cd /tmp/kafka
    curl -X PUT -H "Content-Type: application/json" --data @mm2-hbc.json http://localhost:8083/connectors/mm2-hbc/config
    
    • The expected output of the command is:

    • Check the status of the connector.

      curl -s localhost:8083/connectors/mm2-hbc/status | jq .
      
      • The expected output of the command is:

  • Observe the topics now in the source and destination Amazon MSK clusters. Run the following commands.

    cd /home/ec2-user/kafka
    bin/kafka-topics.sh --list --zookeeper $zoomsksource
    

    Note: The source cluster has an additional topic called mm2-offset-syncs.mskdest.internal, which was created by the MirrorSourceConnector and is used to sync offsets between the source and destination Amazon MSK clusters, as described in KIP-382.

    bin/kafka-topics.sh --list --zookeeper $zoomskdest
    

    Note: The destination cluster now has 3 additional topics: msksource.ExampleTopic, created by the MirrorSourceConnector, which is the replicated user topic (note the msksource. prefix added by the DefaultReplicationPolicy); and the heartbeats and msksource.checkpoints.internal topics, created by the MirrorHeartbeatConnector and the MirrorCheckpointConnector respectively, which are MirrorMaker 2 internal topics as described in KIP-382. We will use the msksource.checkpoints.internal topic and the corresponding classes (Checkpoint, RemoteClusterUtils and MirrorClient) to fail over our consumer to the destination MSK cluster and seek to a mapped offset so that the consumer can resume after failover.

  • Go back to the browser window with the Grafana dashboard from the Monitoring section and hit refresh in the top right corner. You will now see data in the dashboard, and in the graph in the top right you can watch the consumer lag start high and drop to zero as the MM2 consumers catch up.

    You now have a MirrorMaker 2 cluster running on Kafka Connect that is replicating topics, topic configurations, topic ACLs, consumer groups and topic messages to the destination Amazon MSK cluster.


Start MM2 Consumer Group Offset Sync Application

Note: Do this step only if you’re using the consumer migration methodology with a background process to sync MM2 checkpointed offsets to the __consumer_offsets internal topic at the destination.
Note: To get the source code of the MM2 Consumer Group Offset Sync Application, see GitHub.

Using a custom ReplicationPolicy
Using the DefaultReplicationPolicy

Using a custom ReplicationPolicy
  • Go to the terminal on the left (you should be in KafkaClientInstance1). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Go to the /tmp/kafka directory.

  • Make a copy of the /tmp/kafka/consumer.properties file and update BOOTSTRAP_SERVERS_CONFIG to point to the destination Amazon MSK cluster.
    Note: To get the destination Amazon MSK cluster’s bootstrap servers, run the command echo $brokerstlsmskdest

    cd /tmp/kafka
    cp /tmp/kafka/consumer.properties /tmp/kafka/consumer.properties_sync_dest
    vim /tmp/kafka/consumer.properties_sync_dest
    
  • Run the MM2 Consumer Group Offset Sync Application.
    Note: To kill the application process, note down the pid (process id) of the application process. Use kill <pid> to kill the process.

    java -jar /tmp/kafka/MM2GroupOffsetSync-1.0-SNAPSHOT.jar -cgi mm2TestConsumer1 -src msksource -pfp /tmp/kafka/consumer.properties_sync_dest -mtls -rpc com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy 2>&1 > /dev/null &
    
  • Tail the log file to check the sync process.

    tail -f /tmp/MM2GroupOffsetSync.log
    

Using the DefaultReplicationPolicy
  • Go to the terminal on the left (you should be in KafkaClientInstance1). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Go to the /tmp/kafka directory.

  • Make a copy of the /tmp/kafka/consumer.properties file and update BOOTSTRAP_SERVERS_CONFIG to point to the destination Amazon MSK cluster.
    Note: To get the destination Amazon MSK cluster’s bootstrap servers, run the command echo $brokerstlsmskdest

    cd /tmp/kafka
    cp /tmp/kafka/consumer.properties /tmp/kafka/consumer.properties_sync_dest
    vim /tmp/kafka/consumer.properties_sync_dest
    
  • Run the MM2 Consumer Group Offset Sync Application.
    Note: To kill the application process, note down the pid (process id) of the application process. Use kill <pid> to kill the process.

    java -jar /tmp/kafka/MM2GroupOffsetSync-1.0-SNAPSHOT.jar -cgi mm2TestConsumer1 -src msksource -pfp /tmp/kafka/consumer.properties_sync_dest -mtls 2>&1 > /dev/null &
    
  • Tail the log file to check the sync process.

    tail -f /tmp/MM2GroupOffsetSync.log