Start Producer

Steps we will perform in this section

Start a producer

In this section we start a clickstream producer. In the next section, a Lambda function consumes these events from the MSK cluster.

Start a producer against the Amazon MSK cluster

  • Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.

  • Click on Open IDE (or go back to the browser tab if already open).


  • Go to the terminal.

  • Go to the /home/ec2-user/kafka directory and create a topic.

    cd /home/ec2-user/kafka
    bin/kafka-topics.sh --create --zookeeper $zoo --replication-factor 3 --partitions 3 --topic ExampleTopic
    
  • Go to the /tmp/kafka directory and update the producer.properties_msk file to include the bootstrap brokers string and the Schema Registry URL.
    Note: Change the value of MSK_STACK if you changed it from the default.

    cd /tmp/kafka
    export MSK_STACK=MSK
    SCHEMA_REGISTRY_URL=$(aws2 cloudformation describe-stacks --stack-name $MSK_STACK --query 'Stacks[0].Outputs[?OutputKey==`SchemaRegistryUrl`].OutputValue' --output text)
    export SCHEMA_REGISTRY_URL=${SCHEMA_REGISTRY_URL////\\/}
    sed -i -e "s/BOOTSTRAP_SERVERS_CONFIG=/BOOTSTRAP_SERVERS_CONFIG=$brokers/g" producer.properties_msk
    sed -i -e "s/SCHEMA_REGISTRY_URL_CONFIG=/SCHEMA_REGISTRY_URL_CONFIG=$SCHEMA_REGISTRY_URL/g" producer.properties_msk
    
  • Run the Clickstream producer. To read more about the producer and the parameters and properties it supports, see Producer. The producer will run in the background for 3 hours.
    Note: To kill the producer before the 3 hours are up, note down the pid (process id) of the producer process. Use kill <pid> to kill the process.
    Note: To get the source code of the producer see github.

    java -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/producer.properties_msk -nt 8 -rf 10800 -nle > /tmp/producer.log 2>&1 &
    
    • Because the command runs in the background, the shell prints the job number and the pid of the producer process, in the form [1] <pid>.


  • Tail the producer log file to see the producer sending events. Press Ctrl-C to exit the tail command.

    tail -f /tmp/producer.log
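
The properties update earlier in this section passes the Schema Registry URL through ${SCHEMA_REGISTRY_URL////\\/} because the URL contains slashes, and an unescaped slash would end the sed s/// expression early. The following is a minimal sketch of the same idea as a reusable function. The name set_property and its arguments are hypothetical and not part of the workshop. It assumes GNU sed (the -i flag without a backup suffix), and unlike the commands above it replaces the whole KEY= line, so running it twice does not append the value a second time.

```shell
# Hypothetical helper (not part of the workshop): set KEY=value in a
# properties file when the value may contain slashes, such as a URL.
set_property() {
  local file="$1" key="$2" value="$3"
  # Turn every "/" into "\/" so sed does not read it as a delimiter.
  # Assumption: the value contains no "&" or "\" characters, which are
  # also special in a sed replacement and are not handled here.
  local escaped=${value//\//\\/}
  # Replace the whole line that starts with KEY=, whatever its old value.
  sed -i -e "s/^${key}=.*/${key}=${escaped}/" "$file"
}
```

For example, set_property producer.properties_msk SCHEMA_REGISTRY_URL_CONFIG "$url" would take the raw, unescaped URL. Another option is a different sed delimiter, such as s|...|...|, which avoids escaping slashes altogether.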
    

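If you did not note down the pid when the producer started, bash keeps the pid of the most recent background job in $!. The sketch below shows one way to stop a background process cleanly once you have its pid. The function name stop_job and the /tmp/producer.pid file in the comments are hypothetical and not part of the workshop.

```shell
# Hypothetical helper (not part of the workshop): stop a background
# process by pid and wait until it has exited.
stop_job() {
  local pid="$1"
  # kill -0 sends no signal; it only checks that the process exists.
  if kill -0 "$pid" 2>/dev/null; then
    kill "$pid"
    # Reap the process if it is a child of this shell; ignore the
    # non-zero status that wait reports for a killed process.
    wait "$pid" 2>/dev/null || true
  fi
  return 0
}

# Possible usage, run right after starting the producer with "&":
#   echo $! > /tmp/producer.pid
#   stop_job "$(cat /tmp/producer.pid)"
```

If the terminal session has been closed in the meantime, pgrep -f KafkaClickstreamClient is one way to find the pid again.
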
Check that the schema was registered in Schema Registry.

  • Run the following commands.

    export hostname=`hostname`
    curl -X GET http://${hostname}:8081/subjects/ExampleTopic-value/versions/1
    
    • The expected output is a JSON object that contains the subject (ExampleTopic-value), the version, the schema id, and the schema itself.
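
To check registration from a script rather than by eye, one option is to look at the HTTP status code instead of the response body. The sketch below assumes the registry listens on port 8081 and names the subject <topic>-value, as in the curl command above. The function names subject_url and schema_registered are hypothetical.

```shell
# Hypothetical helper: build the Schema Registry URL for the value
# schema of a topic. The version defaults to "latest".
subject_url() {
  local host="$1" topic="$2" version="${3:-latest}"
  printf 'http://%s:8081/subjects/%s-value/versions/%s\n' "$host" "$topic" "$version"
}

# Hypothetical helper: succeed only if the registry answers HTTP 200
# for that subject and version.
schema_registered() {
  local code
  # -s silences progress output, -o discards the body, and -w prints
  # only the HTTP status code.
  code=$(curl -s -o /dev/null -w '%{http_code}' "$(subject_url "$1" "$2" "${3:-latest}")")
  [ "$code" = "200" ]
}
```

For example, schema_registered "$(hostname)" ExampleTopic 1 && echo "schema registered" prints the message only when version 1 of the schema exists.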