Set Up Lambda

Steps we will perform in this section

  • Set up a Lambda function, a Kinesis Data Firehose delivery stream, and a Lambda trigger (event source mapping) between the Amazon MSK Apache Kafka topic and the Lambda function.

Use CloudFormation and SAM (Serverless Application Model) to deploy resources

  • Go back to the AWS Cloud9 console. Click on MSKClient-Cloud9EC2Bastion.

  • Click on Open IDE (or go back to the browser tab if already open).


  • You should be connected to the KafkaClientEC2Instance instance. If not, SSH to it by pasting the ssh command you copied in the last section.

  • Go to the /tmp/kafka directory and clone the integration-sample-lambda-msk Git repository.
    Note: Take a look at the code on GitHub. For a summary of what the code does, see Overview.

    cd /tmp/kafka
    git clone https://github.com/aws-samples/integration-sample-lambda-msk.git
    cd integration-sample-lambda-msk
    
  • Run the deploy.sh script. The script does the following (you can cat the script to inspect its contents):

    1. It builds a jar file for the Lambda function.
    2. It creates an Amazon S3 bucket, with a random prefix in its name, to hold the uploaded artifacts.
    3. It uses CloudFormation to package the Lambda function and its resources and upload them to the S3 bucket.
    4. It deploys the SAM template, creating a CloudFormation stack with multiple resources. The resources include:
      • An IAM policy to be used by the Lambda function.
      • An IAM role with the policy to be used by the Lambda function.
      • An IAM role to be used by Amazon Kinesis Data Firehose.
      • A Kinesis Data Firehose delivery stream.
      • The Lambda function, which processes records from a topic in Amazon MSK and sends them to Kinesis Data Firehose.
      • An EventSourceMapping mapping the Lambda function to the Amazon MSK Apache Kafka topic.

    Note: Change the values of MSK_STACK and KAFKA_CLIENT_STACK below if you used non-default stack names.
    Note: This Lambda function also works with the AWS Glue Schema Registry. The CSR parameter indicates use of a third-party Schema Registry and is set to true here.
    The SECONDARY_DESERIALIZER parameter is not relevant when a third-party Schema Registry is used; it applies only when CSR is set to false and the AWS Glue Schema Registry is used. It allows the AWSKafkaAvroDeserializer, which integrates with the AWS Glue Schema Registry, to fall back to a specified secondary deserializer that points to a third-party Schema Registry. This lets the AWSKafkaAvroDeserializer deserialize records that were not serialized using the AWS Glue Schema Registry, which is primarily useful when migrating from a third-party Schema Registry to the AWS Glue Schema Registry. With SECONDARY_DESERIALIZER set to true, the Lambda function can seamlessly deserialize records using both a third-party Schema Registry and the AWS Glue Schema Registry. In this lab, however, we specifically use a third-party Schema Registry.

    export MSK_STACK=MSK
    export KAFKA_CLIENT_STACK=MSKClient
    export VPC_STACK=$(aws cloudformation describe-stacks --stack-name $MSK_STACK --query 'Stacks[0].Outputs[?OutputKey==`VPCStackName`].OutputValue' --output text)
    export SECONDARY_DESERIALIZER=false
    export CSR=true
    ./deploy.sh $VPC_STACK $KAFKA_CLIENT_STACK $MSK_STACK $SECONDARY_DESERIALIZER $CSR
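    For reference, the steps the script performs can be sketched as below. This is an illustrative outline, not the actual script: the build command, template file names, bucket naming, and stack name are assumptions based on the step list above, and the real deploy.sh in the repository may differ.

    ```shell
    # Illustrative sketch of deploy.sh; names and paths are assumptions.
    set -euo pipefail

    # 1. Build the jar file for the Lambda function (assuming a Maven project).
    mvn clean package

    # 2. Create an S3 bucket for artifacts, with a random prefix in its name.
    RANDOM_PREFIX=$(head -c 4 /dev/urandom | od -An -tx1 | tr -d ' \n')
    BUCKET="${RANDOM_PREFIX}-lambda-msk-artifacts"
    aws s3 mb "s3://${BUCKET}"

    # 3. Package the Lambda function and its resources, uploading to the bucket.
    aws cloudformation package \
        --template-file template.yaml \
        --s3-bucket "${BUCKET}" \
        --output-template-file packaged.yaml

    # 4. Deploy the packaged SAM template as a CloudFormation stack, passing
    #    the script's positional arguments through as template parameters
    #    (parameter names here are hypothetical).
    aws cloudformation deploy \
        --template-file packaged.yaml \
        --stack-name LambdaMSK \
        --capabilities CAPABILITY_IAM \
        --parameter-overrides VPCStack="$1" KafkaClientStack="$2" \
            MSKStack="$3" SecondaryDeserializer="$4" CSR="$5"
    ```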
    
  • At this point, everything is set up and the Lambda function is already processing records from Amazon MSK. We will look at the environment in the next section.
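
    If you want to confirm the deployment right away, you can inspect the stack and the event source mapping with the AWS CLI. The stack name and function name below are placeholders; substitute the values from your deploy.sh output:

    ```shell
    # Find the Lambda function the stack created (replace <stack-name>).
    aws cloudformation describe-stack-resources \
        --stack-name <stack-name> \
        --query "StackResources[?ResourceType=='AWS::Lambda::Function'].PhysicalResourceId" \
        --output text

    # Check that the MSK event source mapping is Enabled (replace <function-name>).
    aws lambda list-event-source-mappings \
        --function-name <function-name> \
        --query 'EventSourceMappings[].{State:State,Topics:Topics}'

    # Tail the function's CloudWatch logs to watch records being processed
    # (requires AWS CLI v2).
    aws logs tail /aws/lambda/<function-name> --follow
    ```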