Set up the pre-migration environment

Steps we will perform in this section

Simulate pre-migration producer and consumer

We will simulate a pre-migration environment with an existing source Amazon MSK cluster, one topic (ExampleTopic), one producer writing to the topic, and one consumer reading from it.

Start a producer against the source Amazon MSK cluster

  • Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.

  • Click on Open IDE (or go back to the browser tab if already open).


  • Go to the terminal on the left (you should be on KafkaClientInstance1). The Linux prompt includes the instance name; check it to make sure you're on the correct instance.

  • Go to the /home/ec2-user/kafka directory and create the topic.

    cd /home/ec2-user/kafka
    bin/kafka-topics.sh --create --zookeeper $zoomsksource --replication-factor 3 --partitions 3 --topic ExampleTopic
    
  • Go to the /tmp/kafka directory and update the producer.properties_msk file.

    cd /tmp/kafka
    
    • Edit the producer.properties_msk file and make sure it has the following properties.
      Note: To get the SchemaRegistryUrl, go to the AWS CloudFormation Console, click on the stack you created in the Setup section (not the Nested stacks). Go to the Outputs tab and copy the Value next to the key SchemaRegistryKafkaClientEC2Instance1Url.
      To get the bootstrap servers, run the command echo $brokerstlsmsksource. We’re pointing to the source Amazon MSK cluster as we’re simulating a pre-migration environment.

      BOOTSTRAP_SERVERS_CONFIG=<BootstrapBroker-String(TLS)>
      SCHEMA_REGISTRY_URL_CONFIG=<SchemaRegistryUrl>
      SSL_KEYSTORE_PASSWORD_CONFIG=<password that you used for the keystore when running the authorization jar file>
      SSL_KEY_PASSWORD_CONFIG=<password that you used for the keystore when running the authorization jar file>
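
Before starting the producer, it can help to confirm the properties file actually contains all four required keys. A minimal sketch; the sample file below is hypothetical, and on the bastion you would point the loop at /tmp/kafka/producer.properties_msk instead:

```shell
# Check that a client properties file defines the four required keys.
# The temp file and its values here are hypothetical placeholders.
f=$(mktemp)
cat > "$f" <<'EOF'
BOOTSTRAP_SERVERS_CONFIG=b-1.example.amazonaws.com:9094
SCHEMA_REGISTRY_URL_CONFIG=http://ip-10-0-0-1:8081
SSL_KEYSTORE_PASSWORD_CONFIG=changeit
SSL_KEY_PASSWORD_CONFIG=changeit
EOF
for key in BOOTSTRAP_SERVERS_CONFIG SCHEMA_REGISTRY_URL_CONFIG \
           SSL_KEYSTORE_PASSWORD_CONFIG SSL_KEY_PASSWORD_CONFIG; do
  if grep -q "^${key}=" "$f"; then
    echo "ok: $key"
  else
    echo "MISSING: $key"
  fi
done
rm -f "$f"
```

The same check applies to the consumer.properties file later in this section, since it uses the same four keys.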
      
  • Run the Clickstream producer. To read more about the producer and the parameters and properties it supports, see Producer. The producer will run in the background for 3 hours.
    Note: To kill the producer before the 3 hours are up, note down the pid (process id) of the producer process. Use kill <pid> to kill the process.
    Note: The javaagent is a Prometheus JMX exporter that exports the Apache Kafka producer metrics for Prometheus to scrape and store for queries and visualization.
    Note: To get the source code of the producer, see GitHub.

    export EXTRA_ARGS=-javaagent:/home/ec2-user/prometheus/jmx_prometheus_javaagent-0.13.0.jar=3800:/home/ec2-user/prometheus/kafka-producer-consumer.yml
    java $EXTRA_ARGS -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/producer.properties_msk -nt 8 -rf 10800 -mtls -nle > /tmp/producer.log 2>&1 &
    
    • The output of the previous command should look like this.


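The note above about stopping the producer early can also be scripted, rather than noting down the pid by hand. A sketch, assuming pgrep is available on the instance; it is harmless to run when no producer is up:

```shell
# Stop the background Clickstream producer, if it is running.
# pgrep -f matches the pattern against the full java command line.
pid=$(pgrep -f KafkaClickstreamClient-1.0-SNAPSHOT.jar || true)
if [ -n "$pid" ]; then
  kill $pid
  echo "stopped producer: $pid"
else
  echo "no producer process found"
fi
```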
  • Tail the producer log file to see the producer sending events. Ctrl-C to exit from the tail command.

    tail -f /tmp/producer.log
    
  • Check to see if the schema got registered in Schema Registry.

    export hostname=`hostname`
    curl -X GET http://${hostname}:8081/subjects/ExampleTopic-value/versions/1
    
    • This is the expected output from running this command.

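The curl call above follows the Schema Registry REST API convention: schemas are registered per subject, and the subject for a topic's value schema is named <topic>-value. A sketch building the same URL from its parts (port 8081 is the Schema Registry port used in this workshop):

```shell
# Build the Schema Registry URL for version 1 of ExampleTopic's value schema.
host=$(hostname)
url="http://${host}:8081/subjects/ExampleTopic-value/versions/1"
echo "$url"
# On the bastion:  curl -s "$url"  returns the registered schema as JSON.
```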

Start a consumer against the source Amazon MSK cluster

  • Go to the /tmp/kafka directory and update the consumer.properties file.

    cd /tmp/kafka
    
    • Edit the consumer.properties file and make sure it has the following properties.
      Note: To get the SchemaRegistryUrl, go to the AWS CloudFormation Console, click on the stack you created in the Setup section (not the Nested stacks). Go to the Outputs tab and copy the Value next to the key SchemaRegistryKafkaClientEC2Instance1Url.
      To get the bootstrap servers, run the command echo $brokerstlsmsksource. We’re pointing to the source Amazon MSK cluster as we’re simulating a pre-migration environment.

      BOOTSTRAP_SERVERS_CONFIG=<BootstrapBroker-String(TLS)>
      SCHEMA_REGISTRY_URL_CONFIG=<SchemaRegistryUrl>
      SSL_KEYSTORE_PASSWORD_CONFIG=<password that you used for the keystore when running the authorization jar file>
      SSL_KEY_PASSWORD_CONFIG=<password that you used for the keystore when running the authorization jar file>
      
  • Run the Clickstream consumer. The consumer will run in the background for 3 hours.
    Note: To kill the consumer before the 3 hours are up, note down the pid (process id) of the consumer process. Use kill <pid> to kill the process.
    Note: The javaagent is a Prometheus JMX exporter that exports the Apache Kafka consumer metrics for Prometheus to scrape and store for queries and visualization.
    Note: To get the source code of the consumer, see GitHub.

    export EXTRA_ARGS=-javaagent:/home/ec2-user/prometheus/jmx_prometheus_javaagent-0.13.0.jar=3900:/home/ec2-user/prometheus/kafka-producer-consumer.yml
    java $EXTRA_ARGS -jar KafkaClickstreamConsumer-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/consumer.properties -nt 3 -rf 10800 -mtls -src msksource > /tmp/consumer.log 2>&1 &
    
    • The output of the previous command should look like this.


  • Tail the consumer log file to see the consumer reading messages and making progress. Ctrl-C to exit from the tail command.

    tail -f /tmp/consumer.log
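
With both clients now running in the background, a quick status check can be scripted instead of tailing each log. A sketch assuming pgrep is available; the jar names match the commands used earlier in this section:

```shell
# Report whether the background producer and consumer are still running.
for app in KafkaClickstreamClient-1.0-SNAPSHOT.jar \
           KafkaClickstreamConsumer-1.0-SNAPSHOT.jar; do
  if pgrep -f "$app" > /dev/null; then
    echo "$app: running"
  else
    echo "$app: not running"
  fi
done
```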