Apache Kafka has a pluggable authorizer and ships with an out-of-the-box authorizer implementation that uses ZooKeeper to store all ACLs. To learn more, refer to the Apache Kafka documentation on Authorization and ACLs. In Amazon MSK, this authorizer is enabled in the configuration file on the brokers.

Kafka ACLs are defined in the general format “Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP”. By default, if no ResourcePatterns match a specific Resource R, then R has no associated ACLs, and therefore no one other than super users is allowed to access R. This behavior can be changed by setting a broker property. In Amazon MSK, that property is enabled by default, which means that if no ACLs are set on a resource, it is accessible by all principals. However, once ACLs are set on a resource, only the authorized principals are allowed to access it. To restrict access to a topic and authorize a client that uses SASL/SCRAM authentication, add ACLs using the Kafka authorizer CLI.
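
The default-access rule described above can be sketched as a small decision function. This is only an illustration of the rule as stated in the text, not the authorizer's actual code: acl_decision and its four arguments are hypothetical, and the sketch ignores explicit Deny entries and host matching.

```shell
# Hypothetical model of the access rule described above (not Kafka's code).
# $1 = true if the principal is a super user
# $2 = true if at least one ACL is set on the resource
# $3 = true if a matching ACL allows this principal
# $4 = true if the broker allows everyone when no ACL is found
acl_decision() {
  is_super_user=$1
  resource_has_acls=$2
  principal_allowed=$3
  allow_if_no_acl=$4

  if [ "$is_super_user" = "true" ]; then
    echo ALLOW                      # super users are always allowed
  elif [ "$resource_has_acls" = "false" ]; then
    # No ACLs on the resource: the broker-level default decides
    if [ "$allow_if_no_acl" = "true" ]; then echo ALLOW; else echo DENY; fi
  elif [ "$principal_allowed" = "true" ]; then
    echo ALLOW                      # ACLs exist and one authorizes the principal
  else
    echo DENY                       # ACLs exist but none authorizes the principal
  fi
}

acl_decision false false false true    # no ACLs, permissive default: ALLOW
acl_decision false false false false   # no ACLs, strict default: DENY
acl_decision false true  false true    # ACLs set, principal not listed: DENY
acl_decision false true  true  true    # ACLs set, principal listed: ALLOW
```

The third call is the case that matters for the rest of this section: as soon as any ACL is set on a topic, principals that are not listed lose access, even with the permissive default.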

As mentioned before, user alice is the administrator: she can create or delete topics and create or delete topic ACLs (that is, give permissions to other users), but she has no permission to read from or write to any topic. Because alice can create topic ACLs, she can still grant herself that permission. User nancy can read from and write to topic test but has no read or write permissions on topic testaclfail.
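
The grants described above can be written out as flag lists before they are passed to the ACL CLI. The sketch below only prints the flags; acl_flags is a hypothetical helper, and it assumes the CLI is Kafka's kafka-acls.sh script, whose name does not appear in the text. The flags themselves are the ones used in the commands of this section.

```shell
# Hypothetical helper: prints the ACL flags for one grant.
# $1 = user name, $2 = resource flag (--cluster or --topic),
# $3 = resource name (empty for --cluster), remaining args = operations
acl_flags() {
  principal=$1
  resource_flag=$2
  resource=$3
  shift 3

  flags="--add --allow-principal User:${principal}"
  for op in "$@"; do
    flags="${flags} --operation ${op}"
  done

  if [ -n "$resource" ]; then
    # Single-quote the name so a wildcard is not expanded by the shell
    flags="${flags} ${resource_flag} '${resource}'"
  else
    flags="${flags} ${resource_flag}"
  fi
  printf '%s\n' "$flags"
}

acl_flags alice --cluster "" Create Alter      # manage topics and ACLs
acl_flags alice --topic "*" Create Delete      # create or delete any topic
acl_flags nancy --topic test Read Write        # read and write topic test only
```

Note that no grant is printed for nancy on testaclfail. Leaving it out is what denies her access once ACLs exist on that topic.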

  • Set up client properties files for users alice and nancy

    echo -n "security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512 required \\
      username=\"alice\" \\
    " > /tmp/client.properties_alice
    echo -n "security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512 required \\
      username=\"nancy\" \\
    " > /tmp/client.properties_nancy
  • Go to the /home/ec2-user/kafka directory and create two topics. The topic test is used to verify successful access to a topic with the right ACL authorization, while the topic testaclfail is used to verify that access fails without ACL authorization.

    cd /home/ec2-user/kafka
    bin/ --create --bootstrap-server $brokerssaslscram --replication-factor 3 --partitions 1 --topic test --command-config /tmp/client.properties_alice
    bin/ --create --bootstrap-server $brokerssaslscram --replication-factor 3 --partitions 1 --topic testaclfail --command-config /tmp/client.properties_alice
  • Set up user alice with permissions to create or delete topics and to create or delete topic ACLs

    cd /home/ec2-user/kafka
    export dn="User:alice"
    bin/ --bootstrap-server $brokerssaslscram --add --allow-principal $dn --operation Create --operation Alter --cluster --command-config /tmp/client.properties_alice
    bin/ --bootstrap-server $brokerssaslscram --add --allow-principal $dn --operation Create --operation Delete --topic '*' --command-config /tmp/client.properties_alice

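  • Give user nancy permission to read from and write to topic test.

    cd /home/ec2-user/kafka
    export dn="User:nancy"
    bin/ --bootstrap-server $brokerssaslscram --add --allow-principal $dn --operation Read --topic test --group '*' --command-config /tmp/client.properties_alice
    bin/ --bootstrap-server $brokerssaslscram --add --allow-principal $dn --operation Write --topic test --command-config /tmp/client.properties_alice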
  • Grant user alice permission to create or delete topics and to create or delete topic ACLs.
    Note: Because a resource with no ACLs is accessible by all principals by default, user alice can initially connect and give herself the requisite permissions.

  • Grant read and write access to the test topic (but not to the testaclfail topic) to the client whose certificate was installed in the keystore in the Setup section.

    • You can run the following command to get the Distinguished-Name. This is for information only; the same lookup is included in the Read and Write access commands below.

      keytool --list -v -keystore /tmp/kafka.client.keystore.jks|grep ip-
    • The output includes the Distinguished-Name of the client certificate.

    • Run the following commands to give Read and Write access to the test topic.
      Note: When running the first command, you will be prompted for a password. Provide the password you entered for the keystore when running the AuthMSK-1.0-SNAPSHOT.jar file in the Setup section.

      cn=$(keytool --list -v -keystore /tmp/kafka.client.keystore.jks | grep ip- | cut -d " " -f 2)
      export dn="User:${cn}"
      bin/ --authorizer-properties zookeeper.connect=$zoo --add --allow-principal $dn --operation Read --group='*' --topic test
      bin/ --authorizer-properties zookeeper.connect=$zoo --add --allow-principal $dn --operation Write --topic test
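
A complete SASL/SCRAM client properties file normally also carries a sasl.jaas.config entry with the user name and password. The sketch below shows one way to generate such a file per user. It rests on assumptions that are not in the text above: the sasl.jaas.config key and the ScramLoginModule class are the standard Kafka client settings for SCRAM, write_client_properties is a hypothetical helper, and "example-password" is a placeholder for whatever password was set for the user.

```shell
# Hypothetical helper: writes client.properties_<user> into a directory.
# $1 = user name, $2 = password (placeholder here), $3 = output directory
write_client_properties() {
  user=$1
  password=$2
  target_dir=$3

  # printf avoids the nested-quote problems of a multi-line echo string
  printf '%s\n' \
    "security.protocol=SASL_SSL" \
    "sasl.mechanism=SCRAM-SHA-512" \
    "sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=\"${user}\" password=\"${password}\";" \
    > "${target_dir}/client.properties_${user}"
}

# Demo in a scratch directory; the walkthrough above writes to /tmp instead
demo_dir=$(mktemp -d)
write_client_properties alice "example-password" "$demo_dir"
cat "${demo_dir}/client.properties_alice"
```

The resulting file can be passed to the Kafka CLI tools with --command-config, in the same way as /tmp/client.properties_alice in the steps above.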