Set up the pre-migration environment

Steps we will perform in this section

Simulate pre-migration producer and consumer

In this section of the lab, we will simulate a pre-migration environment with an existing Apache Kafka source cluster (in this lab, an Amazon MSK cluster, but it doesn’t have to be). This source cluster will have one topic (ExampleTopic), one producer producing to this topic, and a consumer reading from this topic.

Start a producer against the source Amazon MSK cluster

  • In the Cloud9 terminal on the left, let’s set up environment variables and connect to KafkaClientInstance1

    export MSK_STACK=MSKClients
    export ssh_cmd1=$(aws cloudformation describe-stacks --stack-name $MSK_STACK --query 'Stacks[0].Outputs[?OutputKey==`SSHKafkaClientEC2Instance1`].OutputValue' --output text)
    $ssh_cmd1
    

Answer yes to “Are you sure you want to continue connecting (yes/no)?” and you should be connected to KafkaClientInstance1.

  • Go to the /tmp/kafka directory and take a quick look at the configuration files and scripts that have already been downloaded to this EC2 instance.

    cd /tmp/kafka
    ls
    

    We will be using many of these files in the following steps of this lab.

  • Set your MSK_STACK variable

If you are running this lab in your own environment, MSK_STACK will be “MSKMM”; if you are running it via Event Engine, MSK_STACK will be your unique Stack name obtained previously in Required Event Engine Variables.

export MSK_STACK=MSKMM # IMPORTANT: If running in EventEngine, set MSK_STACK to your unique Event Engine Stack Name, such as mod-653f2c79d52f4b01 instead of MSKMM
  • Run Environment Variable setup script

We use various environment variables in this lab, so let’s run a Python script to make setting them up more convenient. The “setup-env.py” script generates a shell script, “setup_env”, which we then source to set the variables used throughout the rest of this lab. We will verify the variables are set correctly with the final “echo” statement.

    export region=$(curl http://169.254.169.254/latest/meta-data/placement/region)
    python3 setup-env.py --stackName $MSK_STACK --region $region
    . ./setup_env
    echo $brokersmsksource

You should see a value from the last echo command. Make sure you see a value for $brokersmsksource before proceeding.
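As a quick sanity check, a small guard like the one below can stop you from proceeding with an empty broker string. This is a sketch only: the broker value shown is an invented placeholder, while in the lab $brokersmsksource comes from sourcing setup_env.

```shell
# Sketch: fail fast if the broker string was not populated by setup_env.
# The value below is a made-up placeholder for illustration only.
brokersmsksource="b-1.example.kafka.us-east-1.amazonaws.com:9092"

if [ -z "$brokersmsksource" ]; then
  echo "brokersmsksource is empty - re-run setup-env.py and source setup_env" >&2
  exit 1
fi
echo "source brokers: $brokersmsksource"
```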

  • Go to the /home/ec2-user/kafka directory and create a topic.

    cd /home/ec2-user/kafka
    bin/kafka-topics.sh --create --zookeeper $zoomsksource --replication-factor 3 --partitions 3 --topic ExampleTopic
    
  • Set up the producer settings (including Glue Schema Registry and monitoring)

    cd /tmp/kafka
    cp producer.properties_msk producer.properties_msk_dest
    sed -i -e "s/BOOTSTRAP_SERVERS_CONFIG=/BOOTSTRAP_SERVERS_CONFIG=$brokersmsksource/g" producer.properties_msk
    
    export schema_compatibility=FULL_ALL
    export EXTRA_ARGS=-javaagent:/home/ec2-user/prometheus/jmx_prometheus_javaagent-0.13.0.jar=3800:/home/ec2-user/prometheus/kafka-producer-consumer.yml
    
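To see what the sed command above actually does, here is a self-contained illustration against a throwaway file. The file path and broker string are invented for the demo; in the lab, the real command edits /tmp/kafka/producer.properties_msk in place using your $brokersmsksource value.

```shell
# Demo of the in-place substitution: the broker list is appended to the
# empty BOOTSTRAP_SERVERS_CONFIG= line. All values are placeholders.
brokersmsksource="b-1.example.kafka.us-east-1.amazonaws.com:9092"
printf 'BOOTSTRAP_SERVERS_CONFIG=\nacks=all\n' > /tmp/demo.properties

sed -i -e "s/BOOTSTRAP_SERVERS_CONFIG=/BOOTSTRAP_SERVERS_CONFIG=$brokersmsksource/g" /tmp/demo.properties

# The line should now carry the broker list.
grep '^BOOTSTRAP_SERVERS_CONFIG=' /tmp/demo.properties
```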
  • Run the Producer and confirm the log output

    java $EXTRA_ARGS -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/producer.properties_msk -nt 8 -rf 10800 -nle -gsr -gsrr $region -gar -gcs $schema_compatibility > /tmp/producer.log 2>&1 &
    

    To read more about the producer and the parameters and properties it supports, see Producer. The producer will run in the background for 3 hours.
    Note: To kill the producer before the 3 hours are up, note down the pid (process id) of the producer process. Use kill <pid> to kill the process.
    Note: The javaagent is a Prometheus JMX exporter that will export the Apache Kafka producer metrics for Prometheus to scrape and store for queries and visualization.
    Note: To get the source code of the producer see github.

    • The output of the previous command should look like this.


  • Tail the producer log file to see the producer sending events. Ctrl-C to exit from the tail command.

    tail -f /tmp/producer.log
    
  • [Optional] Check to see if the schema got registered in Schema Registry.

    • In the AWS Glue Console, under Schema registries > Schemas, you should see the ExampleTopic registered as the Producer was run with Glue Schema Registry args: -gsr -gsrr $region -gar -gcs $schema_compatibility
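If you do need to stop the producer before the 3 hours elapse, the pattern below shows the pid bookkeeping from the note above, with a stand-in sleep process instead of the real jar so it is safe to try anywhere.

```shell
# Stand-in for the long-running background producer: a background sleep.
sleep 300 > /dev/null 2>&1 &
pid=$!                     # $! holds the pid of the last background job
echo "background pid: $pid"

kill "$pid"                # equivalent to: kill <pid> with the noted pid
wait "$pid" 2>/dev/null    # reap the job; wait returns non-zero when killed
echo "stopped"
```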

Run a Consumer

Now, let’s configure and run a Kafka Consumer application reading from the previously created ExampleTopic.

  • Set up the consumer environment settings

    cd /tmp/kafka
    cp consumer.properties consumer.properties_dest
    sed -i -e "s/BOOTSTRAP_SERVERS_CONFIG=/BOOTSTRAP_SERVERS_CONFIG=$brokersmsksource/g" consumer.properties
    export EXTRA_ARGS=-javaagent:/home/ec2-user/prometheus/jmx_prometheus_javaagent-0.13.0.jar=3900:/home/ec2-user/prometheus/kafka-producer-consumer.yml
    
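Before starting the consumer, it is worth confirming the substitution landed. The check below is shown against a throwaway copy with a placeholder broker string so it is runnable outside the lab; in the lab you would grep /tmp/kafka/consumer.properties instead.

```shell
# Placeholder recreation of consumer.properties after the sed step,
# so this check can run outside the lab environment.
brokersmsksource="b-1.example.kafka.us-east-1.amazonaws.com:9092"
printf 'BOOTSTRAP_SERVERS_CONFIG=%s\nauto.offset.reset=earliest\n' \
  "$brokersmsksource" > /tmp/consumer.properties.demo

# The populated line should print; an empty BOOTSTRAP_SERVERS_CONFIG=
# would mean the sed substitution did not run.
grep '^BOOTSTRAP_SERVERS_CONFIG=' /tmp/consumer.properties.demo
```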
  • Run the Clickstream consumer.

The consumer will run in the background for 3 hours.
Note: To kill the consumer before the 3 hours are up, note down the pid (process id) of the consumer process. Use kill <pid> to kill the process.
Note: The javaagent is a Prometheus JMX exporter that will export the Apache Kafka consumer metrics for Prometheus to scrape and store for queries and visualization.
Note: To get the source code of the consumer see github.

    java $EXTRA_ARGS -jar KafkaClickstreamConsumer-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/consumer.properties -nt 3 -rf 10800 -src msksource -gsr -gsrr $region > /tmp/consumer.log 2>&1 &
  • The output of the previous command should look like this.

  • Tail the consumer log file to see the consumer reading messages and making progress. Ctrl-C to exit from the tail command.

    tail -f /tmp/consumer.log