Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.
Click on Open IDE (or go back to the browser tab if already open).
Go to the terminal on the right (you should be in KafkaClientInstance2). The Linux prompt should include the instance name. Check to make sure you’re in the correct instance.
Go to the /tmp/kafka directory and open the connect-distributed.properties file. Edit the following properties.
Note: To get the bootstrap servers, run the command echo $brokerstlsmskdest. We’re pointing to the destination Amazon MSK cluster as that is the cluster we will be migrating to.
bootstrap.servers=<BootstrapBroker-String(TLS)>
Under Authentication settings for Connect workers
Note: If you changed the locations of the keystore or truststore files or changed the password for the truststore file, you might have to change those in this file in addition to the settings below.
ssl.keystore.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
ssl.key.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
Under Authentication settings for Connect producers used with source connectors
Note: If you changed the locations of the keystore or truststore files or changed the password for the truststore file, you might have to change those in this file in addition to the settings below.
producer.ssl.keystore.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
producer.ssl.key.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
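For reference, after editing, the relevant lines in connect-distributed.properties should look roughly like the following. The broker hostnames and passwords below are placeholders, not values from your environment; substitute the output of echo $brokerstlsmskdest and your own keystore password.
bootstrap.servers=b-1.mskdest.example.us-east-1.amazonaws.com:9094,b-2.mskdest.example.us-east-1.amazonaws.com:9094,b-3.mskdest.example.us-east-1.amazonaws.com:9094
ssl.keystore.password=<your keystore password>
ssl.key.password=<your keystore password>
producer.ssl.keystore.password=<your keystore password>
producer.ssl.key.password=<your keystore password>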
Start the Kafka Connect service. It will use the SSL parameters from the /tmp/kafka/connect-distributed.properties file and connect to the Amazon MSK cluster using TLS mutual authentication.
sudo systemctl start kafka-connect.service
sudo systemctl status kafka-connect.service
This is the expected output from running these commands.
Make sure the Kafka Connect service started properly.
cat /tmp/kafka/kafka-connect.log | grep Herder
You should see an output similar to this.
Note: It might take a few seconds for it to start up. If you get an output which does not look like the picture below, give it a few seconds and retry the command.
Note: A distributed herder coordinates with other workers to spread work across multiple processes. In the output you can see that the Herder started successfully, connected to the Amazon MSK cluster using TLS mutual authentication, and read the internal Kafka Connect topics.
Kafka Connect exposes a REST API for managing connectors. The default port for the service is 8083 and we did not change it in the connect-distributed.properties file, so on each of the Kafka client instances the service is available at http://localhost:8083. Run the following command.
curl -X GET http://localhost:8083 | jq .
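You can also use the REST API to list the connectors registered on this Kafka Connect cluster; at this point the list should come back empty:
curl -s http://localhost:8083/connectors | jq .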
Go to the terminal on the left (you should be in KafkaClientInstance1). The Linux prompt should include the instance name. Check to make sure you’re in the correct instance.
Go to the /tmp/kafka directory and open the connect-distributed.properties file. Edit the following properties.
Note: To get the bootstrap servers, run the command echo $brokerstlsmskdest. We’re pointing to the destination Amazon MSK cluster as that is the cluster we will be migrating to.
bootstrap.servers=<BootstrapBroker-String(TLS)>
Under Authentication settings for Connect workers
Note: If you changed the locations of the keystore or truststore files or changed the password for the truststore file, you might have to change those in this file in addition to the settings below.
ssl.keystore.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
ssl.key.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
Under Authentication settings for Connect producers used with source connectors
Note: If you changed the locations of the keystore or truststore files or changed the password for the truststore file, you might have to change those in this file in addition to the settings below.
producer.ssl.keystore.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
producer.ssl.key.password=<password that you used for the keystore when running the authorization jar file AuthMSK-1.0-SNAPSHOT.jar>
Start the Kafka Connect service. It will use the SSL parameters from the /tmp/kafka/connect-distributed.properties file and connect to the Amazon MSK cluster using TLS mutual authentication.
sudo systemctl start kafka-connect.service
sudo systemctl status kafka-connect.service
This is the expected output from running these commands.
Make sure the Kafka Connect service started properly.
cat /tmp/kafka/kafka-connect.log | grep Herder
You should see an output similar to this.
Note: It might take a few seconds for it to start up. If you get an output which does not look like the picture below, give it a few seconds and retry the command.
Note: A distributed herder coordinates with other workers to spread work across multiple processes. In the output you can see that the Herder started successfully, connected to the Amazon MSK cluster using TLS mutual authentication, and read the internal Kafka Connect topics.
Kafka Connect exposes a REST API for managing connectors. The default port for the service is 8083 and we did not change it in the connect-distributed.properties file, so on each of the Kafka client instances the service is available at http://localhost:8083. Run the following command.
curl -X GET http://localhost:8083 | jq .
Observe the topics currently in the source and destination Amazon MSK clusters. Run the following commands.
cd /home/ec2-user/kafka
bin/kafka-topics.sh --list --zookeeper $zoomsksource
Note: The source cluster has the ExampleTopic topic that the producer is sending records to, as well as __consumer_offsets.
bin/kafka-topics.sh --list --zookeeper $zoomskdest
Note: The destination cluster has the 3 internal Kafka Connect topics and the Schema Registry internal topic _schemas (since we pointed both to the destination Amazon MSK cluster), in addition to __consumer_offsets.
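If you want to take a closer look at any of these topics, you can describe them with the same tool, for example:
bin/kafka-topics.sh --describe --zookeeper $zoomskdest --topic _schemas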
You now have a Kafka Connect cluster running.
Using a custom ReplicationPolicy
Using the DefaultReplicationPolicy
To read more about the connectors and what they do, see the Connectors section in KIP-382.
Go to the terminal on the right (you should be in KafkaClientInstance2). The Linux prompt should include the instance name. Check to make sure you’re in the correct instance.
Go to the /tmp/kafka directory and edit the connectors’ properties files to update the bootstrap servers. To download the properties files, click on mm2-msc-cust-repl-policy, mm2-cpc-cust-repl-policy, mm2-hbc.
Edit the mm2-msc-cust-repl-policy.json file for the MirrorSourceConnector.
Note: The custom ReplicationPolicy jar file (CustomMM2ReplicationPolicy-1.0-SNAPSHOT.jar) was copied to the /home/ec2-user/confluent/share/java/kafka directory by the CloudFormation template.
Note: To get the source code of the custom ReplicationPolicy, see GitHub.
cd /tmp/kafka
vim mm2-msc-cust-repl-policy.json
Update the following properties. To get the destination and source cluster bootstrap servers, run these commands.
echo $brokerstlsmskdest
echo $brokerstlsmsksource
Note: The configuration "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy" specifies the custom Replication policy.
Note: If you want to specify any other consumer or producer configurations, prefix them with consumer. and producer. respectively. For example, consumer.auto.offset.reset.
Note: The topics, topic configs, topic ACLs, and consumer groups refresh intervals are set to 20 seconds. If you wish to change those, you can update those settings.
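As a rough sketch of what this connector configuration contains (not the exact contents of the workshop’s mm2-msc-cust-repl-policy.json, which also carries the TLS keystore/truststore settings for both clusters), the core MirrorSourceConnector settings look something like the following, with the bootstrap placeholders replaced by the output of the echo commands above:
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "source.cluster.alias": "msksource",
  "target.cluster.alias": "mskdest",
  "source.cluster.bootstrap.servers": "<source TLS bootstrap brokers>",
  "target.cluster.bootstrap.servers": "<destination TLS bootstrap brokers>",
  "topics": "ExampleTopic",
  "tasks.max": "4",
  "refresh.topics.interval.seconds": "20",
  "sync.topic.configs.interval.seconds": "20",
  "sync.topic.acls.interval.seconds": "20",
  "refresh.groups.interval.seconds": "20",
  "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy"
}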
Edit the mm2-cpc-cust-repl-policy.json file for the MirrorCheckpointConnector.
Note: The custom ReplicationPolicy jar file (CustomMM2ReplicationPolicy-1.0-SNAPSHOT.jar) was copied to the /home/ec2-user/confluent/share/java/kafka directory by the CloudFormation template.
cd /tmp/kafka
vim mm2-cpc-cust-repl-policy.json
Update the following properties. To get the destination and source cluster bootstrap servers, run these commands.
echo $brokerstlsmskdest
echo $brokerstlsmsksource
Note: The configuration "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy" specifies the custom Replication policy.
Note: The checkpoint interval is set to 20 seconds. If you wish to change that, you can update that setting.
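A similarly hedged sketch of the MirrorCheckpointConnector configuration (again, not the exact workshop file), where the 20-second interval corresponds to emit.checkpoints.interval.seconds:
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
  "source.cluster.alias": "msksource",
  "target.cluster.alias": "mskdest",
  "source.cluster.bootstrap.servers": "<source TLS bootstrap brokers>",
  "target.cluster.bootstrap.servers": "<destination TLS bootstrap brokers>",
  "emit.checkpoints.interval.seconds": "20",
  "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy"
}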
Edit the mm2-hbc.json file for the MirrorHeartbeatConnector.
cd /tmp/kafka
vim mm2-hbc.json
Update the following properties. To get the destination and source cluster bootstrap servers, run these commands.
echo $brokerstlsmskdest
echo $brokerstlsmsksource
Note: The heartbeat interval is set to 20 seconds. If you wish to change that, you can update that setting.
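And a minimal sketch of the MirrorHeartbeatConnector configuration, with the 20-second interval set through emit.heartbeats.interval.seconds (the workshop file will also include the TLS settings):
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
  "source.cluster.alias": "msksource",
  "target.cluster.alias": "mskdest",
  "source.cluster.bootstrap.servers": "<source TLS bootstrap brokers>",
  "target.cluster.bootstrap.servers": "<destination TLS bootstrap brokers>",
  "emit.heartbeats.interval.seconds": "20"
}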
Create a new MirrorSourceConnector connector by submitting a request to the REST interface.
cd /tmp/kafka
curl -X PUT -H "Content-Type: application/json" --data @mm2-msc-cust-repl-policy.json http://localhost:8083/connectors/mm2-msc/config
Check the status of the connector.
curl -s localhost:8083/connectors/mm2-msc/status | jq .
Note: Even though the tasks.max configuration is set to 4, 3 tasks are started as there are 3 partitions in the topic.
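The status response follows the standard Kafka Connect REST shape, roughly like the following (the worker IDs below are placeholders):
{
  "name": "mm2-msc",
  "connector": { "state": "RUNNING", "worker_id": "10.0.1.10:8083" },
  "tasks": [
    { "id": 0, "state": "RUNNING", "worker_id": "10.0.1.10:8083" },
    { "id": 1, "state": "RUNNING", "worker_id": "10.0.2.20:8083" },
    { "id": 2, "state": "RUNNING", "worker_id": "10.0.1.10:8083" }
  ],
  "type": "source"
}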
Create a new MirrorCheckpointConnector connector by submitting a request to the REST interface.
cd /tmp/kafka
curl -X PUT -H "Content-Type: application/json" --data @mm2-cpc-cust-repl-policy.json http://localhost:8083/connectors/mm2-cpc/config
Check the status of the connector.
curl -s localhost:8083/connectors/mm2-cpc/status | jq .
Create a new MirrorHeartbeatConnector connector by submitting a request to the REST interface.
cd /tmp/kafka
curl -X PUT -H "Content-Type: application/json" --data @mm2-hbc.json http://localhost:8083/connectors/mm2-hbc/config
Check the status of the connector.
curl -s localhost:8083/connectors/mm2-hbc/status | jq .
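At this point, listing the connectors on the Kafka Connect cluster should show all three:
curl -s http://localhost:8083/connectors | jq .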
Observe the topics now in the source and destination Amazon MSK clusters. Run the following commands.
cd /home/ec2-user/kafka
bin/kafka-topics.sh --list --zookeeper $zoomsksource
Note: The source cluster has an additional topic called mm2-offset-syncs.mskdest.internal which was created by the MirrorSourceConnector and is used to sync offsets between the source and destination Amazon MSK clusters as described in KIP-382.
bin/kafka-topics.sh --list --zookeeper $zoomskdest
Note: The destination cluster now has 3 additional topics: ExampleTopic, the replicated user topic created by the MirrorSourceConnector, and the heartbeats and msksource.checkpoints.internal topics, MirrorMaker 2 internal topics created by the MirrorHeartbeatConnector and the MirrorCheckpointConnector respectively, as described in KIP-382. We will use the msksource.checkpoints.internal topic and the corresponding classes (Checkpoint, RemoteClusterUtils, and MirrorClient) to fail over our consumer to the destination MSK cluster and seek to a mapped offset so the consumer can resume after failover.
Go back to the browser window with the Grafana dashboard from the Monitoring section and hit refresh in the top right corner. You will now see data in the dashboard; in the graph at the top right, you can see the consumer lag start high and then drop to zero as the MM2 consumers catch up.
You now have a MirrorMaker 2 cluster running on Kafka Connect that is replicating topics, topic configurations, topic ACLs, consumer groups, and topic messages to the destination Amazon MSK cluster.
To read more about the connectors and what they do, see the Connectors section in KIP-382.
Go to the terminal on the right (you should be in KafkaClientInstance2). The Linux prompt should include the instance name. Check to make sure you’re in the correct instance.
Go to the /tmp/kafka directory and edit the connectors’ properties files to update the bootstrap servers. To download the properties files, click on mm2-msc, mm2-cpc, mm2-hbc.
Edit the mm2-msc.json file for the MirrorSourceConnector.
cd /tmp/kafka
vim mm2-msc.json
Update the following properties. To get the destination and source cluster bootstrap servers, run these commands.
echo $brokerstlsmskdest
echo $brokerstlsmsksource
Note: If you want to specify any other consumer or producer configurations, prefix them with consumer. and producer. respectively. For example, consumer.auto.offset.reset.
Note: The topics, topic configs, topic ACLs, and consumer groups refresh intervals are set to 20 seconds. If you wish to change those, you can update those settings.
Edit the mm2-cpc.json file for the MirrorCheckpointConnector.
cd /tmp/kafka
vim mm2-cpc.json
Update the following properties. To get the destination and source cluster bootstrap servers, run these commands.
echo $brokerstlsmskdest
echo $brokerstlsmsksource
Note: The checkpoint interval is set to 20 seconds. If you wish to change that, you can update that setting.
Edit the mm2-hbc.json file for the MirrorHeartbeatConnector.
cd /tmp/kafka
vim mm2-hbc.json
Update the following properties. To get the destination and source cluster bootstrap servers, run these commands.
echo $brokerstlsmskdest
echo $brokerstlsmsksource
Note: The heartbeat interval is set to 20 seconds. If you wish to change that, you can update that setting.
Create a new MirrorSourceConnector connector by submitting a request to the REST interface.
cd /tmp/kafka
curl -X PUT -H "Content-Type: application/json" --data @mm2-msc.json http://localhost:8083/connectors/mm2-msc/config
Check the status of the connector.
curl -s localhost:8083/connectors/mm2-msc/status | jq .
Note: Even though the tasks.max configuration is set to 4, 3 tasks are started as there are 3 partitions in the topic.
Create a new MirrorCheckpointConnector connector by submitting a request to the REST interface.
cd /tmp/kafka
curl -X PUT -H "Content-Type: application/json" --data @mm2-cpc.json http://localhost:8083/connectors/mm2-cpc/config
Check the status of the connector.
curl -s localhost:8083/connectors/mm2-cpc/status | jq .
Create a new MirrorHeartbeatConnector connector by submitting a request to the REST interface.
cd /tmp/kafka
curl -X PUT -H "Content-Type: application/json" --data @mm2-hbc.json http://localhost:8083/connectors/mm2-hbc/config
Check the status of the connector.
curl -s localhost:8083/connectors/mm2-hbc/status | jq .
Observe the topics now in the source and destination Amazon MSK clusters. Run the following commands.
cd /home/ec2-user/kafka
bin/kafka-topics.sh --list --zookeeper $zoomsksource
Note: The source cluster has an additional topic called mm2-offset-syncs.mskdest.internal which was created by the MirrorSourceConnector and is used to sync offsets between the source and destination Amazon MSK clusters as described in KIP-382.
bin/kafka-topics.sh --list --zookeeper $zoomskdest
Note: The destination cluster now has 3 additional topics: msksource.ExampleTopic, the replicated user topic created by the MirrorSourceConnector, and the heartbeats and msksource.checkpoints.internal topics, MirrorMaker 2 internal topics created by the MirrorHeartbeatConnector and the MirrorCheckpointConnector respectively, as described in KIP-382. We will use the msksource.checkpoints.internal topic and the corresponding classes (Checkpoint, RemoteClusterUtils, and MirrorClient) to fail over our consumer to the destination MSK cluster and seek to a mapped offset so the consumer can resume after failover.
Go back to the browser window with the Grafana dashboard from the Monitoring section and hit refresh in the top right corner. You will now see data in the dashboard; in the graph at the top right, you can see the consumer lag start high and then drop to zero as the MM2 consumers catch up.
You now have a MirrorMaker 2 cluster running on Kafka Connect that is replicating topics, topic configurations, topic ACLs, consumer groups, and topic messages to the destination Amazon MSK cluster.
Note: Do this step only if you’re using the consumer migration methodology with a background process to sync MM2 checkpointed offsets to the __consumer_offsets internal topic at the destination.
Note: To get the source code of the MM2 Consumer Group Offset Sync Application, see GitHub.
Using a custom ReplicationPolicy
Using the DefaultReplicationPolicy
Go to the terminal on the left (you should be in KafkaClientInstance1). The Linux prompt should include the instance name. Check to make sure you’re in the correct instance.
Go to the /tmp/kafka directory.
Make a copy of the /tmp/kafka/consumer.properties file and update BOOTSTRAP_SERVERS_CONFIG to point to the destination Amazon MSK cluster.
Note: To get the destination Amazon MSK cluster’s bootstrap servers, run the command echo $brokerstlsmskdest.
cd /tmp/kafka
cp /tmp/kafka/consumer.properties /tmp/kafka/consumer.properties_sync_dest
vim /tmp/kafka/consumer.properties_sync_dest
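Assuming the file keeps its key=value layout, the updated line should look something like the following (the broker hostnames are placeholders; use the output of echo $brokerstlsmskdest):
BOOTSTRAP_SERVERS_CONFIG=b-1.mskdest.example.us-east-1.amazonaws.com:9094,b-2.mskdest.example.us-east-1.amazonaws.com:9094,b-3.mskdest.example.us-east-1.amazonaws.com:9094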
Run the MM2 Consumer Group Offset Sync Application.
Note: To kill the application process later, note down the pid (process id) of the application process and use kill <pid>.
java -jar /tmp/kafka/MM2GroupOffsetSync-1.0-SNAPSHOT.jar -cgi mm2TestConsumer1 -src msksource -pfp /tmp/kafka/consumer.properties_sync_dest -mtls -rpc com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy 2>&1 > /dev/null &
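If you did not note down the pid when the application started, you can find it later with a standard process listing, for example:
ps -ef | grep MM2GroupOffsetSync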
Tail the log file to check the sync process.
tail -f /tmp/MM2GroupOffsetSync.log
Go to the terminal on the left (you should be in KafkaClientInstance1). The Linux prompt should include the instance name. Check to make sure you’re in the correct instance.
Go to the /tmp/kafka directory.
Make a copy of the /tmp/kafka/consumer.properties file and update BOOTSTRAP_SERVERS_CONFIG to point to the destination Amazon MSK cluster.
Note: To get the destination Amazon MSK cluster’s bootstrap servers, run the command echo $brokerstlsmskdest.
cd /tmp/kafka
cp /tmp/kafka/consumer.properties /tmp/kafka/consumer.properties_sync_dest
vim /tmp/kafka/consumer.properties_sync_dest
Run the MM2 Consumer Group Offset Sync Application.
Note: To kill the application process later, note down the pid (process id) of the application process and use kill <pid>.
java -jar /tmp/kafka/MM2GroupOffsetSync-1.0-SNAPSHOT.jar -cgi mm2TestConsumer1 -src msksource -pfp /tmp/kafka/consumer.properties_sync_dest -mtls 2>&1 > /dev/null &
Tail the log file to check the sync process.
tail -f /tmp/MM2GroupOffsetSync.log