Lab Setup

Steps we will perform in this section:

Regions

Please pick a region from the following when you run the CloudFormation template:

#   Region Name             Code
1   US East (N. Virginia)   us-east-1
2   US West (Oregon)        us-west-2
3   EU (Ireland)            eu-west-1

Run the CloudFormation template to create the VPC, the Cloud9 Bastion environment and the Apache Kafka Client EC2 instances

  • Make sure you have created an EC2 KeyPair as shown in the Prerequisites section.
    Note: Create a .pem file regardless of whether you are using macOS or Windows.

  • Right click on Launch Stack and open it in a new tab to execute the CloudFormation template. You can download the CloudFormation template here.




  • Choose the EC2 KeyPair that you created in the Prerequisites step.

  • Click Next.

  • Click Next on the next page.

  • Scroll down to the Capabilities section, select the checkboxes next to I acknowledge that AWS CloudFormation might create IAM resources with custom names and I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND, and then click on Create stack.


    The stack creates:

    1. A VPC with 1 public subnet, 3 private subnets, and the required networking, including a NAT Gateway.
    2. A Cloud9 environment that can be used as a jump box.
    3. 2 Apache Kafka client EC2 instances.

Set up SSH keys in the Cloud9 environment

  • Go to the AWS Cloud9 console.

  • Click on MSKClient-Cloud9EC2Bastion and then click Open IDE.


  • In the Getting started section, click on Upload Files…


  • Click on Select files. Pick the EC2 .pem file that you created in the Prerequisites section. Click Open. The file will be copied to the /home/ec2-user/environment directory and will also be visible in the left pane.


  • Go to the bash pane at the bottom and type the following commands to set up the SSH environment so that you can access the Kafka client EC2 instances.

    chmod 600 <pem file>
    eval `ssh-agent`
    ssh-add -k <pem file>
    
  • Right click in the space between the bash pane and the menu bar and click on Split pane in Two Columns.


  • In the new pane created on the right, click on the small green circle with a + and click on New Terminal.


  • You now have two terminals. The terminal on the left will be used to connect to Kafka EC2 Instance 1 and the source Amazon MSK cluster; the terminal on the right will be used to connect to Kafka EC2 Instance 2 and the destination Amazon MSK cluster.


  • Go to the bash terminal on the right and type the following commands.

    eval `ssh-agent`
    ssh-add -k <pem file>
    

Set up AWS Certificate Manager (ACM) Private Certificate Authority (PCA)

  • Create the ACM PCA. Execute the following commands at the bash prompt in the Cloud9 environment.

    aws acm-pca create-certificate-authority --certificate-authority-configuration '{"KeyAlgorithm":"RSA_2048","SigningAlgorithm":"SHA256WITHRSA","Subject":{"Country":"US","Organization":"Amazon","OrganizationalUnit":"AWS","State":"New York","CommonName":"MyMSKPCA","Locality":"New York City"}}' --revocation-configuration '{"CrlConfiguration":{"Enabled":false}}' --certificate-authority-type "ROOT" --idempotency-token 12345
    
  • The expected output is a JSON document containing the CertificateAuthorityArn of the newly created PCA.

  • Copy the ARN (Amazon Resource Name) of the PCA you just created to a notepad application.
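
If you are scripting these steps instead of copying from the console, the ARN can be pulled out of the command's JSON output. A minimal Python sketch (the ARN shown is an illustrative placeholder, not a real resource):

```python
import json

# Sample output from `aws acm-pca create-certificate-authority`
# (the ARN below is illustrative, not a real resource)
raw_output = '''
{
    "CertificateAuthorityArn": "arn:aws:acm-pca:us-east-1:123456789012:certificate-authority/11111111-2222-3333-4444-555555555555"
}
'''

pca_arn = json.loads(raw_output)["CertificateAuthorityArn"]
print(pca_arn)
```

Equivalently, appending --query CertificateAuthorityArn --output text to the aws acm-pca command prints just the ARN with no JSON wrapping.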

  • Install a self-signed certificate in the ACM PCA you just created. A certificate needs to be installed in the ACM PCA before the PCA can issue and sign end-entity certificates.

    • Go to the AWS ACM Console.

    • Click on Private CAs in the left pane, or click on Get started if this is your first time.

    • Select the PCA you just created. It should display a message saying Before issuing certificates with your CA, you need to import a CA certificate.


    • Click on the link Install a CA certificate to activate your CA. Accept the defaults and click on Next.
      Note: The default validity is 10 years. You can change the validity here but do not set it to a very low value.


    • Click on Confirm and install


    • Go to the Permissions tab. We will authorize ACM to renew certificates issued by this CA. Click on Edit.
      Note: ACM can renew the certificates issued by the PCA before they expire, but the PCA has to authorize ACM to do so. If allowed, ACM renews the certificates, and clients can re-download the issued certificates to their keystores before they expire in order to continue to authenticate successfully.


    • Click on Authorize ACM to use this CA for renewals. Click on Save.




Run the CloudFormation template to create the MSK clusters

  • Right click on the following link and open it in a new tab to execute the CloudFormation template. You can download the CloudFormation template here.
    Warning: Ensure you create the CloudFormation stack in the same region as the ACM PCA that you created earlier.




  • Click Next.

  • For BastionStack, specify the name of the Cloud9 Bastion CloudFormation stack that you created earlier.

  • For MSKDestinationKafkaVersion, choose 2.4.1.1.

  • For MSKSourceKafkaVersion, choose 2.3.1.

  • For PCAARNDestinationMSKCluster and PCAARNSourceMSKCluster, specify the ARN of the PCA that you created and copied earlier. Here we use the same PCA for both, but you could specify different ones.

  • For TLSMutualAuthenticationDestinationMSKCluster and TLSMutualAuthenticationSourceMSKCluster select true.

  • For VPCStack, go to the CloudFormation console, click on the CloudFormation stack that you created earlier (default MSKClient), go to the Outputs tab, and copy the Value for the key VPCStackName.
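
The same output value can also be read without the console. The sketch below filters a response shaped like what aws cloudformation describe-stacks returns; the stack and output values shown are illustrative placeholders:

```python
# Shape of an `aws cloudformation describe-stacks --stack-name MSKClient`
# response (values are illustrative placeholders)
response = {
    "Stacks": [{
        "StackName": "MSKClient",
        "Outputs": [
            {"OutputKey": "VPCStackName",
             "OutputValue": "MSKClient-VPCStack-ABC123"},
            {"OutputKey": "SSHKafkaClientEC2Instance1",
             "OutputValue": "ssh -A ec2-user@ip-10-0-1-10.ec2.internal"},
        ],
    }]
}

def stack_output(response, key):
    """Return the OutputValue for the given OutputKey, or None if absent."""
    for output in response["Stacks"][0]["Outputs"]:
        if output["OutputKey"] == key:
            return output["OutputValue"]
    return None

print(stack_output(response, "VPCStackName"))
```

The AWS CLI can do the same filtering server-side with a JMESPath query, e.g. aws cloudformation describe-stacks --stack-name MSKClient --query "Stacks[0].Outputs[?OutputKey=='VPCStackName'].OutputValue" --output text.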


  • Click Next

  • Click Next on the next page.

  • Scroll down and click on Create stack.

    It can take up to 15 minutes for the stack to run. Once the status of the stack changes to CREATE_COMPLETE, the stack has finished creating. Wait for the stack to complete before proceeding.

    The stack creates:

    1. 2 Amazon MSK clusters that allow both TLS and PLAINTEXT client connections as well as TLS mutual authentication. The Amazon MSK cluster running Apache Kafka version 2.3.1 will be the source cluster, and the cluster running version 2.4.1.1 will be the destination cluster.

Get the cluster information for the Amazon MSK clusters

  • Go to the Amazon MSK console. Click on the first Amazon MSK cluster that was created by CloudFormation.

  • Click on View client information on the top right side of the page under Cluster summary.


  • Click on the Copy icon under Bootstrap servers for both TLS and Plaintext and paste it in a notepad application. Make sure you identify Apache Kafka version 2.3.1 as the source cluster and Apache Kafka version 2.4.1.1 as the destination cluster.

  • Click on the Copy icon under Zookeeper connect and paste it in a notepad application. Click on Done.

  • Go back to the Amazon MSK console. Click on the second Amazon MSK cluster that was created by CloudFormation.

  • Click on View client information on the top right side of the page under Cluster summary.

  • As before, click on the Copy icon under Bootstrap servers for both TLS and Plaintext and paste it in a notepad application. Make sure you identify Apache Kafka version 2.3.1 as the source cluster and Apache Kafka version 2.4.1.1 as the destination cluster.

  • Click on the Copy icon under Zookeeper connect and paste it in a notepad application. Click on Done.
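
A bootstrap servers string is a comma-separated list of host:port pairs; for Amazon MSK, TLS listeners use port 9094 and plaintext listeners use port 9092, which is an easy way to double-check which string you copied. A small Python sketch with illustrative broker hostnames:

```python
def parse_bootstrap_servers(bootstrap):
    """Split a comma-separated bootstrap string into (host, port) pairs."""
    pairs = []
    for entry in bootstrap.split(","):
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs

# Illustrative TLS bootstrap string; the real MSK hostnames come from
# the "View client information" dialog
tls_bootstrap = (
    "b-1.mskcluster.example.kafka.us-east-1.amazonaws.com:9094,"
    "b-2.mskcluster.example.kafka.us-east-1.amazonaws.com:9094"
)

brokers = parse_bootstrap_servers(tls_bootstrap)
# MSK convention: port 9094 = TLS listener, port 9092 = plaintext listener
assert all(port == 9094 for _, port in brokers)
print(brokers)
```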


Set up Keystore and Truststore in the Apache Kafka client EC2 instance 1

  • Go to the CloudFormation console.

  • Click on the stack you created in the previous section for the MSK Cluster. Go to the Outputs tab and copy the Value next to the key SSHKafkaClientEC2Instance1 and paste it in a notepad application.

  • Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.

  • Click on Open IDE.


  • Go to the bash pane at the bottom and ssh to the KafkaClientEC2Instance1 created by the Cloud9 Bastion CloudFormation stack. Paste the copied ssh command for SSHKafkaClientEC2Instance1.
    Note: If you get a message saying Are you sure you want to continue connecting (yes/no)?, type yes.

    ssh -A ec2-user@<internal-dns-name>
    
  • Enter the following commands to set up the Amazon MSK environment variables.

    cd /tmp/kafka
    python3 setup-env.py --stackName <CloudFormation stack name of MSK clusters> --region <AWS Region of the CloudFormation stack>
    . ./setup_env
    
  • Go to the /tmp/kafka directory and run the AuthMSK-1.0-SNAPSHOT.jar file. The sample code is available on GitHub.

    Parameters:

    • --help (or -h): display the list of parameters.
    • -caa (or --certificateAuthorityArn) (mandatory): the ARN of the Private Certificate Authority in ACM that issues the end-client certificates. Use the ARN of the PCA that you copied in the Setup section.
    • -ksp (or --keystorePassword) (mandatory): the keystore password.
    • -ksa (or --alias) (default msk): the alias of the key entry in the keystore.
    • -pem (or --createPEMFiles): optional flag to create PEM files for the private key and the issued client certificate, for use by clients in Python, Node.js, etc.

    Note: The default region for the ACM PCA is us-east-1. If you created the ACM PCA in a different region, add -reg <region-name> to the command below.

    cd /tmp/kafka
    java -jar AuthMSK-1.0-SNAPSHOT.jar -caa <arn of the ACM PCA that you copied before> -ksp <specify a keystore password> -ksa <specify an alias> -pem
    

    This will do the following:

    • Generate a private key.
    • Create a Java keystore with the password provided for the keystore.
    • Store the private key in the keystore with the password provided for the keystore.
    • Convert the private key to PEM and store it in a pem file at /tmp/private_key.pem.
    • Generate a CSR (Certificate Signing Request) for the private key.
    • Connect to the provided ACM PCA and get a certificate issued from the CSR.
    • Connect to the provided ACM PCA and retrieve the issued certificate.
    • Store the issued certificate in the Java keystore.
    • Convert the issued certificate to PEM and store it in a pem file at /tmp/client_cert.pem.

Set up Keystore and Truststore in the Apache Kafka client EC2 instance 2

  • Go to the CloudFormation console.

  • Click on the stack you created for the MSK Cluster. Go to the Outputs tab and copy the Value next to the key SSHKafkaClientEC2Instance2 and paste it in a notepad application.

  • Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.

  • Click on Open IDE (or go back to the browser tab if already open).


  • Go to the terminal on the right and ssh to the KafkaClientEC2Instance2 created by the Cloud9 Bastion CloudFormation stack. Paste the copied ssh command for SSHKafkaClientEC2Instance2.
    Note: If you get a message saying Are you sure you want to continue connecting (yes/no)?, type yes.

    ssh -A ec2-user@<internal-dns-name>
    
  • Enter the following commands to set up the Amazon MSK environment variables.

    cd /tmp/kafka
    python3 setup-env.py --stackName <CloudFormation stack name of MSK clusters> --region <AWS Region of the CloudFormation stack>
    . ./setup_env
    
  • Go to the /tmp/kafka directory and run the AuthMSK-1.0-SNAPSHOT.jar file as before. Parameters:

    • --help (or -h): display the list of parameters.
    • -caa (or --certificateAuthorityArn) (mandatory): the ARN of the Private Certificate Authority in ACM that issues the end-client certificates. Use the ARN of the PCA that you copied in the Setup section.
    • -ksp (or --keystorePassword) (mandatory): the keystore password.
    • -ksa (or --alias) (default msk): the alias of the key entry in the keystore.
    • -pem (or --createPEMFiles): optional flag to create PEM files for the private key and the issued client certificate, for use by clients in Python, Node.js, etc.

    Note: The default region for the ACM PCA in the AuthMSK-1.0-SNAPSHOT.jar application is us-east-1. If you created the ACM PCA in a different region, add -reg <region-name> to the command below.

    cd /tmp/kafka
    java -jar AuthMSK-1.0-SNAPSHOT.jar -caa <arn of the ACM PCA that you copied before> -ksp <specify a keystore password> -ksa <specify an alias> -pem
    

Set up a Schema Registry cluster

Kafka Client Instance 2

For this lab we will be using a Java Apache Kafka producer and consumer. The producer will send Avro-encoded messages to Amazon MSK, and the consumer will correspondingly read Avro-encoded messages. Both need a Schema Registry for storing and retrieving Avro schemas, so we will set up a Schema Registry cluster.

  • Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.

  • Click on Open IDE (or go back to the browser tab if already open).

  • Go to the terminal on the right (you should be in KafkaClientInstance2). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Go to the /tmp/kafka directory and edit the schema-registry.properties file to make sure it includes the following.
    Note: To get the hostname of the EC2 instance, run the command hostname at the Linux prompt.
    To get the bootstrap servers, run the command echo $brokerstlsmskdest. We’re pointing to the destination Amazon MSK cluster because that is the cluster we will be migrating to.

    host.name=<hostname of the EC2 instance. This is important to identify the schema registry instance in a cluster>
    kafkastore.bootstrap.servers=<BootstrapBroker-String(TLS)>
    kafkastore.security.protocol=SSL (make sure it is uncommented - remove the "#")
    kafkastore.ssl.truststore.location=/tmp/kafka.client.truststore.jks (make sure it is uncommented - remove the "#")
    kafkastore.ssl.keystore.location=/tmp/kafka.client.keystore.jks (make sure it is uncommented - remove the "#")
    kafkastore.ssl.keystore.password=<password that you used for the keystore when running the authorization jar file> (make sure it is uncommented - remove the "#")
    kafkastore.ssl.key.password=<password that you used for the keystore when running the authorization jar file> (make sure it is uncommented - remove the "#")
    kafkastore.ssl.truststore.password=<truststore password, default = changeit>(make sure it is uncommented - remove the "#")
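
If you prefer to generate the file rather than edit it by hand, the keys above can be rendered programmatically. A hedged Python sketch (the helper name, bootstrap string, and passwords are placeholders for illustration; substitute the values you actually used):

```python
import socket

def render_schema_registry_props(bootstrap_tls, keystore_password,
                                 truststore_password="changeit"):
    """Render the schema-registry.properties keys listed above.

    bootstrap_tls: the TLS bootstrap string (echo $brokerstlsmskdest).
    Passwords are placeholders; substitute the ones you actually used.
    """
    props = {
        "host.name": socket.gethostname(),
        "kafkastore.bootstrap.servers": bootstrap_tls,
        "kafkastore.security.protocol": "SSL",
        "kafkastore.ssl.truststore.location": "/tmp/kafka.client.truststore.jks",
        "kafkastore.ssl.keystore.location": "/tmp/kafka.client.keystore.jks",
        "kafkastore.ssl.keystore.password": keystore_password,
        "kafkastore.ssl.key.password": keystore_password,
        "kafkastore.ssl.truststore.password": truststore_password,
    }
    return "\n".join(f"{k}={v}" for k, v in props.items()) + "\n"

# Placeholder arguments for illustration only
content = render_schema_registry_props(
    "b-1.example:9094,b-2.example:9094", "MyKeystorePass")
print(content)
```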
    
  • Start the Schema Registry service. It will use the SSL parameters from the /tmp/kafka/schema-registry.properties file and connect to the Amazon MSK cluster using TLS mutual authentication.

    sudo systemctl start confluent-schema-registry
    sudo systemctl status confluent-schema-registry
    
    • This is the expected output from running these commands.


    • Check to make sure the Schema Registry started successfully

      cat /tmp/kafka/schema-registry.log|grep "Server started"
      
    • This is the expected output from running this command.


Kafka Client Instance 1

  • Go to the terminal on the left (you should be in KafkaClientInstance1). The Linux prompt should include the instance name; check to make sure you’re in the correct instance.

  • Go to the /tmp/kafka directory and edit the schema-registry.properties file to make sure it includes the following.
    Note: To get the hostname of the EC2 instance, run the command hostname at the Linux prompt.
    To get the bootstrap servers, run the command echo $brokerstlsmskdest. We’re pointing to the destination Amazon MSK cluster because that is the cluster we will be migrating to.

    host.name=<hostname of the EC2 instance. This is important to identify the schema registry instance in a cluster>
    kafkastore.bootstrap.servers=<BootstrapBroker-String(TLS)>
    kafkastore.security.protocol=SSL (make sure it is uncommented - remove the "#")
    kafkastore.ssl.truststore.location=/tmp/kafka.client.truststore.jks (make sure it is uncommented - remove the "#")
    kafkastore.ssl.keystore.location=/tmp/kafka.client.keystore.jks (make sure it is uncommented - remove the "#")
    kafkastore.ssl.keystore.password=<password that you used for the keystore when running the authorization jar file> (make sure it is uncommented - remove the "#")
    kafkastore.ssl.key.password=<password that you used for the keystore when running the authorization jar file> (make sure it is uncommented - remove the "#")
    kafkastore.ssl.truststore.password=<truststore password, default = changeit>(make sure it is uncommented - remove the "#")
    
  • Start the Schema Registry service. It will use the SSL parameters from the /tmp/kafka/schema-registry.properties file and connect to the Amazon MSK cluster using TLS mutual authentication.

    sudo systemctl start confluent-schema-registry
    sudo systemctl status confluent-schema-registry
    
    • This is the expected output from running these commands.


    • Check to make sure the Schema Registry started successfully

      cat /tmp/kafka/schema-registry.log|grep "Server started"
      
    • This is the expected output from running this command.


You now have a Schema Registry cluster running.