Start Producer

Steps we will perform in this section

Start a producer

We will start a clickstream producer here. In the next section, a Lambda function will consume these events from the MSK cluster.

Start a producer against the Amazon MSK cluster

  • Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.

  • Click on Open IDE (or go back to the browser tab if already open).

  • Go to the terminal.

  • Go to the /home/ec2-user/kafka directory and create a topic.

    cd /home/ec2-user/kafka
    bin/ --create --zookeeper $zoo --replication-factor 3 --partitions 3 --topic ExampleTopic
  • Go to the /tmp/kafka directory and update the producer.properties_msk file to include the bootstrap brokers string and the Schema Registry URL.
    Note: Change the value of MSK_STACK if you changed the stack name from the default.

    cd /tmp/kafka
    export MSK_STACK=MSK
    SCHEMA_REGISTRY_URL=$(aws2 cloudformation describe-stacks --stack-name $MSK_STACK --query 'Stacks[0].Outputs[?OutputKey==`SchemaRegistryUrl`].OutputValue' --output text)
    sed -i -e "s/BOOTSTRAP_SERVERS_CONFIG=/BOOTSTRAP_SERVERS_CONFIG=$brokers/g" producer.properties_msk
  • Run the Clickstream producer. To read more about the producer and the parameters and properties it supports, see Producer. The producer will run in the background for 3 hours.
    Note: To stop the producer before the 3 hours are up, note the PID (process ID) of the producer process and run kill <pid>.
    Note: To get the source code of the producer, see GitHub.

    java -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/producer.properties_msk -nt 8 -rf 10800 -nle > /tmp/producer.log 2>&1 &
    • The output of the previous command should look like this.

  • Tail the producer log file to see the producer sending events. Press Ctrl-C to exit the tail command.

    tail -f /tmp/producer.log
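The note above about stopping the producer early relies on remembering the PID by hand. As a minimal sketch, the PID can be saved to a file when the producer is started and read back later. The helper names save_producer_pid and stop_producer and the /tmp/producer.pid path are hypothetical and are not part of the workshop material.

```shell
# Hypothetical helpers; the PID file location is an assumption, not part of the lab.
PID_FILE=${PID_FILE:-/tmp/producer.pid}

# Save a PID (typically $!, the PID of the most recent background job).
save_producer_pid() {
    echo "$1" > "$PID_FILE"
}

# Stop the recorded process if it is still running, then remove the PID file.
stop_producer() {
    [ -f "$PID_FILE" ] || { echo "no PID file at $PID_FILE"; return 1; }
    pid=$(cat "$PID_FILE")
    if kill -0 "$pid" 2>/dev/null; then
        kill "$pid"
        echo "stopped producer (pid $pid)"
    else
        echo "process $pid is not running"
    fi
    rm -f "$PID_FILE"
}
```

To use it, run save_producer_pid $! immediately after the java -jar command that starts the producer in the background, and run stop_producer when you want to end it early.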

Check whether the schema was registered in Schema Registry.

  • Run the following commands.

    export hostname=`hostname`
    curl -X GET http://${hostname}:8081/subjects/ExampleTopic-value/versions/1
    • This is the expected output from running this command.
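If you only want a yes/no answer rather than the full schema, the same check can be scripted against the HTTP status code. The sketch below assumes Schema Registry listens on port 8081, as in the curl command above. The function names schema_version_url and schema_http_status are hypothetical. The version defaults to latest, which the Schema Registry REST API accepts in place of a version number.

```shell
# Build the Schema Registry URL for one version of a subject.
# Arguments: host, subject, optional version (defaults to "latest").
schema_version_url() {
    host=$1
    subject=$2
    version=${3:-latest}
    echo "http://${host}:8081/subjects/${subject}/versions/${version}"
}

# Print only the HTTP status code for a URL.
# 200 means the schema version exists; 404 means the subject or version was not found.
schema_http_status() {
    curl -s -o /dev/null -w '%{http_code}' "$1"
}

# Example usage (requires network access to the registry host):
#   url=$(schema_version_url "$(hostname)" ExampleTopic-value 1)
#   [ "$(schema_http_status "$url")" = "200" ] && echo "schema registered"
```

A 404 shortly after starting the producer may just mean that no event has been sent yet, so check the producer log before treating it as a failure.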