Producers, Consumers and Schema Registry

Apache Kafka console producer and consumer

  • Go to the /tmp directory, create a client.properties file and put the following in it.
    Note: Here I have used nancy as an example, but change it based on the secret that you created.

    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="nancy" \
      password="nancy-secret";
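    The file can also be created in one step with a quoted heredoc; this is a sketch of the same contents (nancy is the example user from above — substitute the user from your own secret):

```shell
# Create /tmp/client.properties in one step. The quoted 'EOF' delimiter
# keeps the backslashes and double quotes exactly as written.
cat > /tmp/client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="nancy" \
  password="nancy-secret";
EOF

# Sanity-check the contents
grep sasl.mechanism /tmp/client.properties
```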
    
  • Go to the /home/ec2-user/kafka directory and run the following command to start a console producer that uses TLS for encryption in transit and SASL/SCRAM for authentication. When you get the > prompt, type in some messages, pressing Enter between messages. Press Ctrl-C to exit.

    cd /home/ec2-user/kafka
    bin/kafka-console-producer.sh --broker-list $brokerssaslscram --topic test --producer.config /tmp/client.properties
    
  • The output should look like this.


  • Try doing the same against the testaclfail topic. Recall that the SASL/SCRAM user was not given write access to the testaclfail topic, so the producer should be unable to send messages. Press Ctrl-C to exit.
    Note: You will still get the initial > prompt. Type in a message and press Enter; the error should then appear.

    cd /home/ec2-user/kafka
    bin/kafka-console-producer.sh --broker-list $brokerssaslscram --topic testaclfail --producer.config /tmp/client.properties
    
  • The output should look like this.


  • Run the following command to start a console consumer that uses TLS for encryption in transit and SASL/SCRAM for authentication. Press Ctrl-C to exit.

    bin/kafka-console-consumer.sh --bootstrap-server $brokerssaslscram --topic test --consumer.config /tmp/client.properties --from-beginning
    
  • The output should look like this.



Using a Java Producer

  • Go to the /home/ec2-user/kafka directory and create a topic.

    cd /home/ec2-user/kafka
    bin/kafka-topics.sh --create --zookeeper $zoo --replication-factor 3 --partitions 1 --topic ExampleTopic
    
  • Go to the /tmp/kafka directory and update the producer.properties_msk file.

    cd /tmp/kafka
    export MSK_STACK=MSK
    SCHEMA_REGISTRY_URL=$(aws2 cloudformation describe-stacks --stack-name $MSK_STACK --query 'Stacks[0].Outputs[?OutputKey==`SchemaRegistryUrl`].OutputValue' --output text)
    export SCHEMA_REGISTRY_URL=${SCHEMA_REGISTRY_URL////\\/}
    sed -i -e "s/BOOTSTRAP_SERVERS_CONFIG=/BOOTSTRAP_SERVERS_CONFIG=$brokerssaslscram/g" producer.properties_msk
    sed -i -e "s/SCHEMA_REGISTRY_URL_CONFIG=/SCHEMA_REGISTRY_URL_CONFIG=$SCHEMA_REGISTRY_URL/g" producer.properties_msk
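    The ${SCHEMA_REGISTRY_URL////\\/} expansion above escapes every / in the URL as \/ so it can be used safely inside the sed replacement. A self-contained sketch of the same pattern, using a made-up URL and a scratch file in place of the real producer.properties_msk:

```shell
# Stand-in URL (the real one comes from the CloudFormation output).
URL="https://registry.example.com:8081"

# ${URL////\\/} replaces every "/" with "\/" so sed's s/// delimiter
# is not confused by slashes in the replacement text.
ESCAPED=${URL////\\/}
echo "$ESCAPED"    # https:\/\/registry.example.com:8081

# Apply the same sed substitution to a scratch copy of the property.
WORK=$(mktemp)
echo "SCHEMA_REGISTRY_URL_CONFIG=" > "$WORK"
sed -i -e "s/SCHEMA_REGISTRY_URL_CONFIG=/SCHEMA_REGISTRY_URL_CONFIG=$ESCAPED/g" "$WORK"
cat "$WORK"        # SCHEMA_REGISTRY_URL_CONFIG=https://registry.example.com:8081
rm -f "$WORK"
```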
    
  • Go to the /tmp/kafka directory and update the schema-registry.properties file by running the following commands.

    cd /tmp/kafka
    sed -i -e "s/kafkastore.bootstrap.servers=/kafkastore.bootstrap.servers=$brokerssaslscram/g" schema-registry.properties
    echo -n "kafkastore.security.protocol=SASL_SSL
    kafkastore.sasl.mechanism=SCRAM-SHA-512
    kafkastore.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\
      username=\"alice\" \\
      password=\"alice-secret\";
    " >> schema-registry.properties
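    A common pitfall here is the nested quoting: inside the outer echo "..." string, the double quotes around alice must be escaped as \" or the shell strips them and the JAAS line becomes invalid. A sketch that reproduces the append against a scratch file and verifies the quotes survived:

```shell
# Append the JAAS lines to a scratch file (stand-in for
# schema-registry.properties) with the inner quotes escaped as \".
SCRATCH=$(mktemp)
echo -n "kafkastore.security.protocol=SASL_SSL
kafkastore.sasl.mechanism=SCRAM-SHA-512
kafkastore.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\
  username=\"alice\" \\
  password=\"alice-secret\";
" >> "$SCRATCH"

# The username line should still carry its double quotes.
grep 'username="alice"' "$SCRATCH"
rm -f "$SCRATCH"
```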
    
  • Start the Schema Registry service. It will use the parameters from the /tmp/kafka/schema-registry.properties file and connect to the Amazon MSK cluster using SASL/SCRAM authentication over TLS. The Schema Registry is needed because the Java producer uses it to send Avro-encoded messages to Amazon MSK.

    sudo systemctl start confluent-schema-registry
    sudo systemctl status confluent-schema-registry
    
    • This is the expected output from running these commands.


    • Check to make sure the Schema Registry started successfully.

      grep "Server started" /tmp/kafka/schema-registry.log
      
    • This is the expected output from running this command.


  • Run the Clickstream producer. To read more about the producer and the parameters and properties it supports, see Producer. The producer will run for 60 seconds.

    java -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar -t ExampleTopic -pfp /tmp/kafka/producer.properties_msk -nt 8 -rf 60 -nle -mtls -sse -ssu nancy
    
  • Check to see if the schema got registered in Schema Registry.

    export hostname=$(hostname)
    curl -X GET http://${hostname}:8081/subjects/ExampleTopic-value/versions/1
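    Beyond a single version, the registry's REST API can also list every registered subject; a sketch, assuming the registry is listening on port 8081 as configured above (curl is guarded with || true so the snippet does not abort a script when the registry is unreachable):

```shell
# List all subjects registered in the Schema Registry via its REST API.
# -s silences the progress meter; "|| true" keeps a failed connection
# from aborting a script run with "set -e".
hostname=$(hostname)
SUBJECTS_URL="http://${hostname}:8081/subjects"
echo "Querying ${SUBJECTS_URL}"
curl -s -X GET "${SUBJECTS_URL}" || true
```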
    
    • This is the expected output from running this command.