Run Producer

Start producing clickstream data into the Amazon MSK cluster.

  • Go to the CloudFormation console, click on the stack you just created, and then click on the Outputs tab. Scroll down and copy the value for the key KafkaClientEC2InstanceSsh.

  • SSH into the KafkaClientEC2Instance instance. Open a terminal on Mac or a PuTTY session on Windows and paste the command you copied from the Outputs section above.
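    The copied value will look something like the following (the key file name and host below are placeholders; use the exact command from the Outputs tab):

    ssh -i <your-keypair>.pem ec2-user@<KafkaClientEC2Instance public DNS>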

  • Go to the /tmp/kafka directory and update the producer.properties_msk and schema-registry.properties files.

    • producer.properties_msk:

      • BOOTSTRAP_SERVERS_CONFIG = the bootstrap servers value that you copied in the Setup section
      • SCHEMA_REGISTRY_URL_CONFIG = the value of the key SchemaRegistryPrivateDNS from the CloudFormation Outputs section
      cd /tmp/kafka
      nano producer.properties_msk
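      After the edit, the two lines should look similar to this sketch (the endpoints below are placeholders, assuming the default Schema Registry port 8081; use the values you copied):

      BOOTSTRAP_SERVERS_CONFIG=b-1.mskcluster.abc123.kafka.us-east-1.amazonaws.com:9092,b-2.mskcluster.abc123.kafka.us-east-1.amazonaws.com:9092
      SCHEMA_REGISTRY_URL_CONFIG=http://ip-10-0-0-123.ec2.internal:8081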
      
    • schema-registry.properties:

      • kafkastore.bootstrap.servers = the bootstrap servers value that you copied in the Setup section
      cd /tmp/kafka
      nano schema-registry.properties
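      After the edit, the line should look similar to this sketch (placeholder broker endpoints; the PLAINTEXT:// prefix assumes an unencrypted listener on port 9092):

      kafkastore.bootstrap.servers=PLAINTEXT://b-1.mskcluster.abc123.kafka.us-east-1.amazonaws.com:9092,PLAINTEXT://b-2.mskcluster.abc123.kafka.us-east-1.amazonaws.com:9092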
      
  • Start the Schema Registry service

    sudo systemctl start confluent-schema-registry
    sudo systemctl status confluent-schema-registry
    
  • If the service started successfully, the status command should report the confluent-schema-registry unit as active (running).
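    As an additional check, you can query the Schema Registry's REST API from the same instance (a sketch, assuming the default REST port 8081):

    curl -s http://localhost:8081/subjects

    An empty JSON array ([]) is the normal response before any schemas have been registered.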


  • Create the ExampleTopic topic in the MSK Kafka cluster.

    • Parameters:
      • --topic: The topic in the Amazon MSK cluster to produce messages to.
      • --zookeeper: The ZooKeeper connect string for the Amazon MSK cluster.
      • --replication-factor 3: The replication factor for the topic. Here we specify 3, so there will be a replica on each of the 3 broker nodes in the MSK cluster.
      • --partitions 3: The number of partitions for the topic. Here we specify 3, so there will be a partition on each of the 3 broker nodes in the MSK cluster.
    /home/ec2-user/kafka/bin/kafka-topics.sh --create --zookeeper <zookeeper connect string you copied in the Setup section> --replication-factor 3 --partitions 3 --topic ExampleTopic
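    To confirm the topic was created with the expected settings, you can describe it (substitute your ZooKeeper connect string as above):

    /home/ec2-user/kafka/bin/kafka-topics.sh --describe --zookeeper <zookeeper connect string you copied in the Setup section> --topic ExampleTopic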
    
  • Create the output topics in the MSK cluster that the Kinesis Data Analytics for Apache Flink application will send the clickstream analytics to.

    /home/ec2-user/kafka/bin/kafka-topics.sh --create --zookeeper <zookeeper connect string you copied in the Setup section> --replication-factor 3 --partitions 3 --topic Departments_Agg
    
    /home/ec2-user/kafka/bin/kafka-topics.sh --create --zookeeper <zookeeper connect string you copied in the Setup section> --replication-factor 3 --partitions 3 --topic ClickEvents_UserId_Agg_Result
    
    /home/ec2-user/kafka/bin/kafka-topics.sh --create --zookeeper <zookeeper connect string you copied in the Setup section> --replication-factor 3 --partitions 3 --topic User_Sessions_Aggregates_With_Order_Checkout
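    Equivalently, the three commands above can be collapsed into a single shell loop (a sketch; substitute your ZooKeeper connect string):

    for topic in Departments_Agg ClickEvents_UserId_Agg_Result User_Sessions_Aggregates_With_Order_Checkout; do
      /home/ec2-user/kafka/bin/kafka-topics.sh --create --zookeeper <zookeeper connect string you copied in the Setup section> --replication-factor 3 --partitions 3 --topic $topic
    done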
    
  • Run the KafkaClickstreamClient-1.0-SNAPSHOT.jar program. This is a mock clickstream producer. Please read more about it on the Producer page.

    Parameters

    • -t: The topic to produce messages to
    • -rf: Run time in seconds
    • -pfp: Location of the properties file that has the URLs of the Kafka brokers and the Schema Registry
    • -nt: Number of producer threads to run
    cd /tmp/kafka
    java -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/producer.properties_msk -nt 8 -rf 1800 
    

    This command runs the producer for 30 minutes (-rf 1800 seconds) using 8 threads (-nt 8).
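    If you want the producer to keep running after you close your SSH session, one option (a standard shell technique, not part of the workshop scripts) is to launch it with nohup and write its output to a log file:

    cd /tmp/kafka
    nohup java -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/producer.properties_msk -nt 8 -rf 1800 > producer.log 2>&1 &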