Producers, Consumers and Schema Registry

Apache Kafka console producer and consumer

  • Go to the /tmp directory, create a client.properties_tls file and put the following in it.

    security.protocol=SSL
    ssl.truststore.location=/tmp/kafka.client.truststore.jks
    ssl.keystore.location=/tmp/kafka.client.keystore.jks
    ssl.keystore.password=<password that you used for the keystore when running the authorization jar file>
    ssl.key.password=<password that you used for the keystore when running the authorization jar file>
    ssl.truststore.password=<truststore password, default = changeit>
    
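The file can also be created non-interactively with a heredoc. This is a sketch: KEYSTORE_PASS and TRUSTSTORE_PASS are stand-in variables, not real values — substitute the keystore password you chose when running the authorization jar file and your truststore password (default: changeit).

```shell
# Sketch: create /tmp/client.properties_tls non-interactively.
# KEYSTORE_PASS and TRUSTSTORE_PASS are placeholders -- replace them
# with the keystore password from the authorization jar step and
# your truststore password (default: changeit).
KEYSTORE_PASS="changeme"
TRUSTSTORE_PASS="changeit"

cat > /tmp/client.properties_tls <<EOF
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
ssl.keystore.location=/tmp/kafka.client.keystore.jks
ssl.keystore.password=${KEYSTORE_PASS}
ssl.key.password=${KEYSTORE_PASS}
ssl.truststore.password=${TRUSTSTORE_PASS}
EOF

echo "wrote $(wc -l < /tmp/client.properties_tls) lines"
```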
  • Go to the /home/ec2-user/kafka dir and run the following command to start a console producer with TLS for encryption in-transit and TLS mutual authentication. When you get the > prompt, type in some messages, pressing Enter between messages. Press Ctrl-C to exit.

    cd /home/ec2-user/kafka
    bin/kafka-console-producer.sh --broker-list $brokerstls --topic test --producer.config /tmp/client.properties_tls
    
  • The output should look like this.


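As an aside, messages can also be piped into the console producer instead of typed at the > prompt; the producer then exits on its own once stdin is exhausted, with no Ctrl-C needed. A sketch (the actual producer invocation, which requires the cluster, is shown commented out):

```shell
# Sketch: produce from a file instead of typing at the > prompt.
printf 'message 1\nmessage 2\nmessage 3\n' > /tmp/messages.txt

# With a reachable cluster ($brokerstls set as in the steps above), you would run:
# bin/kafka-console-producer.sh --broker-list $brokerstls --topic test \
#     --producer.config /tmp/client.properties_tls < /tmp/messages.txt

wc -l < /tmp/messages.txt
```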
  • Try doing the same against the testaclfail topic. Recall that the Distinguished Name of the client certificate installed in the keystore was not granted write access to the testaclfail topic, so the producer should be unable to send messages. Press Ctrl-C to exit.
    Note: You will still get the initial > prompt. Type in a message and press Enter; the error should then appear.

    cd /home/ec2-user/kafka
    bin/kafka-console-producer.sh --broker-list $brokerstls --topic testaclfail --producer.config /tmp/client.properties_tls
    
  • The output should look like this.


  • Run the following command to start a console consumer with TLS for encryption in-transit and TLS mutual authentication. Press Ctrl-C to exit.

    bin/kafka-console-consumer.sh --bootstrap-server $brokerstls --topic test --consumer.config /tmp/client.properties_tls --from-beginning
    
  • The output should look like this.



Using a Java Producer

  • Go to the /home/ec2-user/kafka dir and create a topic.

    cd /home/ec2-user/kafka
    bin/kafka-topics.sh --create --zookeeper $zoo --replication-factor 3 --partitions 1 --topic ExampleTopic
    
  • Go to /tmp/kafka directory and update the properties files.

    cd /tmp/kafka
    
    • Edit the producer.properties_msk file and make sure it has the following properties. Note: To get the SchemaRegistryUrl, go to the AWS CloudFormation Console, click on the stack you created in the Setup section (not the Nested stacks). Go to the Outputs tab and copy the Value next to the key SchemaRegistryUrl.

      BOOTSTRAP_SERVERS_CONFIG=<BootstrapBroker-String(TLS)>
      SCHEMA_REGISTRY_URL_CONFIG=<SchemaRegistryUrl>
      SSL_KEYSTORE_PASSWORD_CONFIG=<password that you used for the keystore when running the authorization jar file>
      SSL_KEY_PASSWORD_CONFIG=<password that you used for the keystore when running the authorization jar file>
      
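After editing, a quick check that no angle-bracket placeholders were left behind can save a failed run. This is a sketch against a demo file with made-up values; in the lab, point PROPS at /tmp/kafka/producer.properties_msk instead.

```shell
# Sketch: fail fast if any <placeholder> values remain unreplaced.
# Demo file with made-up values -- use /tmp/kafka/producer.properties_msk in the lab.
PROPS=/tmp/demo_producer.properties
cat > "$PROPS" <<'EOF'
BOOTSTRAP_SERVERS_CONFIG=b-1.example.kafka.us-east-1.amazonaws.com:9094
SCHEMA_REGISTRY_URL_CONFIG=http://ip-10-0-0-1:8081
SSL_KEYSTORE_PASSWORD_CONFIG=changeme
SSL_KEY_PASSWORD_CONFIG=changeme
EOF

if grep -q '<' "$PROPS"; then
  echo "unresolved placeholders remain:"
  grep '<' "$PROPS"
else
  echo "no placeholders left in $PROPS"
fi
```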
    • Edit the schema-registry.properties file and make sure it has the following properties. If any of the kafkastore.* lines below are commented out, uncomment them (remove the leading "#").
      Note: To get the hostname of the EC2 instance, run the command hostname at the Linux prompt.

      host.name=<hostname of the EC2 instance>
      kafkastore.bootstrap.servers=<BootstrapBroker-String(TLS)>
      kafkastore.security.protocol=SSL
      kafkastore.ssl.truststore.location=/tmp/kafka.client.truststore.jks
      kafkastore.ssl.keystore.location=/tmp/kafka.client.keystore.jks
      kafkastore.ssl.keystore.password=<password that you used for the keystore when running the authorization jar file>
      kafkastore.ssl.key.password=<password that you used for the keystore when running the authorization jar file>
      kafkastore.ssl.truststore.password=<truststore password, default = changeit>
      
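Removing the leading "#" from those kafkastore.* lines can also be scripted with sed. This sketch runs against a small demo file; the same sed command applies to the real /tmp/kafka/schema-registry.properties.

```shell
# Sketch: uncomment kafkastore.* lines in one pass (demo file shown;
# run the same sed against /tmp/kafka/schema-registry.properties).
CONF=/tmp/demo_schema-registry.properties
cat > "$CONF" <<'EOF'
host.name=ip-10-0-0-1
#kafkastore.security.protocol=SSL
#kafkastore.ssl.truststore.location=/tmp/kafka.client.truststore.jks
#kafkastore.ssl.keystore.location=/tmp/kafka.client.keystore.jks
EOF

# Strip the "#" only where it prefixes a kafkastore. property.
sed -i 's/^#\(kafkastore\.\)/\1/' "$CONF"
cat "$CONF"
```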
    • Start the Schema Registry service. It will use the SSL parameters from the /tmp/kafka/schema-registry.properties file and connect to the Amazon MSK cluster using TLS mutual authentication. The Schema Registry is needed because the Java producer uses it to send Avro-encoded messages to Amazon MSK.

      sudo systemctl start confluent-schema-registry
      sudo systemctl status confluent-schema-registry
      
    • This is the expected output from running these commands.


    • Check to make sure the Schema Registry started successfully.

      cat /tmp/kafka/schema-registry.log | grep "Server started"
      
    • This is the expected output from running this command.


  • Run the Clickstream producer. To read more about the producer and the parameters and properties it supports, see Producer. The producer will run for 60 seconds.

    java -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/producer.properties_msk -nt 8 -rf 60 -mtls
    
  • Check to see if the schema got registered in Schema Registry.

    export hostname=`hostname`
    curl -X GET http://${hostname}:8081/subjects/ExampleTopic-value/versions/1
    
    • This is the expected output from running this command.