In this section, we start a clickstream producer. In the next section, a Lambda function will consume these events from the MSK cluster.
Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.
Click on Open IDE (or go back to the browser tab if already open).
Go to the terminal.
Go to the /home/ec2-user/kafka directory and create a topic.
cd /home/ec2-user/kafka
bin/kafka-topics.sh --create --zookeeper $zoo --replication-factor 3 --partitions 3 --topic ExampleTopic
Go to the /tmp/kafka directory and update the producer.properties_msk file to include the bootstrap brokers string and the schema registry URL.
Note: Change the value of MSK_STACK if you changed it from the default.
cd /tmp/kafka
export MSK_STACK=MSK
sed -i -e "s/BOOTSTRAP_SERVERS_CONFIG=/BOOTSTRAP_SERVERS_CONFIG=$brokers/g" producer.properties_msk
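To see what the sed substitution above does before touching the real file, it can be tried on a scratch copy. The following is a minimal sketch, not part of the lab: the broker string and the second property line are hypothetical placeholders (in the lab, $brokers is expected to be set in the environment already), and the scratch file stands in for producer.properties_msk.

```shell
# Hypothetical broker string, used only for this illustration.
brokers="b-1.example.com:9092,b-2.example.com:9092"

# Scratch file standing in for producer.properties_msk.
props_file=$(mktemp)
printf 'BOOTSTRAP_SERVERS_CONFIG=\nOTHER_SETTING=value\n' > "$props_file"

# Same substitution as in the lab step: append the broker list after the key.
sed -i -e "s/BOOTSTRAP_SERVERS_CONFIG=/BOOTSTRAP_SERVERS_CONFIG=$brokers/g" "$props_file"

# Show the updated line to confirm the value was filled in.
grep '^BOOTSTRAP_SERVERS_CONFIG=' "$props_file"
```

Running the same grep against producer.properties_msk after the lab step is a quick way to confirm the broker string was written. The `/` delimiter works here because a host:port broker list contains no slashes.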
Run the Clickstream producer. The producer uses the AWS Glue Schema Registry to serialize messages in Avro. Although parameters can be specified for the producer to use a specific registry and pre-created schema, in this lab, the producer is using the default schema registry (named default-registry) with auto registration of schemas. To read more about the producer and the parameters and properties it supports, see Producer. The producer will run in the background for 3 hours.
Note: To kill the producer before the 3 hours are up, note down the pid (process id) of the producer process. Use kill <pid> to kill the process.
Note: To get the source code of the producer, see GitHub.
export region=$(curl http://169.254.169.254/latest/meta-data/placement/region)
export schema_compatibility=FULL_ALL
java -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/producer.properties_msk -nt 8 -rf 10800 -nle -gsr -gsrr $region -gar -gcs $schema_compatibility > /tmp/producer.log 2>&1 &
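Because the producer is started in the background with &, the shell exposes its pid in $! immediately afterwards. One way to follow the note about kill <pid> is to save that value at launch time. The sketch below is an illustration only: sleep 300 stands in for the java -jar command, and the pid file is a hypothetical scratch file, not something the lab creates.

```shell
# Stand-in for the long-running producer (the lab uses the java -jar command here).
sleep 300 > /dev/null 2>&1 &

# $! holds the pid of the most recently started background job.
producer_pid=$!

# Save the pid so it can be read back later, even from another terminal.
pid_file=$(mktemp)
echo "$producer_pid" > "$pid_file"

# Later: stop the process early using the saved pid.
kill "$(cat "$pid_file")"

# Reap the job; wait returns non-zero for a killed job, which is expected here.
wait "$producer_pid" 2>/dev/null || true

# kill -0 sends no signal; it only checks whether the pid still exists.
if kill -0 "$producer_pid" 2>/dev/null; then
  echo "still running"
else
  echo "stopped"
fi
```

If the pid was not captured at launch, tools such as ps or pgrep can be used to look it up by the jar name instead.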
Tail the producer log file to see the producer sending events. Press Ctrl-C to exit the tail command.
tail -f /tmp/producer.log
Go to the AWS Glue console.
In the left pane, click on Schema registries.
Click on default-registry.
You can see the schema registered. By default, when no schema name is provided, it is inferred as the topic name. In this case, it is ExampleTopic.
Click on the schema name ExampleTopic. You can see the schema details and the auto-registered version.
Click on the schema version. You can see the schema version details.