AWS Lambda processing

In this section we will look at how the Lambda function is set up and how records are processed and sent to Amazon Kinesis Data Firehose.

Lambda Function

  • Go to the AWS Lambda console.

  • Click on the function just created. (It should have a name like MSKToS3-ProcessMSKfunction-<random string>.)

  • Scroll down to look at how it is set up. It should have 5 environment variables defined.

    • DELIVERY_STREAM_NAME: This is the Kinesis Data Firehose delivery stream that the events read from Amazon MSK Apache Kafka topics will be sent to.
    • RETRIES: This is the number of times the code in the Lambda function attempts to send events to Kinesis Data Firehose if a retriable error occurs.
    • SCHEMA_REGISTRY_URL: The events sent to the Amazon MSK Apache Kafka topic are Avro encoded and use a Schema Registry installed on an EC2 instance in the same VPC as Amazon MSK. The Lambda function needs access to the Schema Registry to be able to decode the Avro encoded events before it sends them to Kinesis Data Firehose.
    • CSR: Indicates whether to use the 3rd party Schema Registry pointed to by the SCHEMA_REGISTRY_URL environment variable. By default, the function uses the AWS Glue Schema Registry.
    • SECONDARY_DESERIALIZER: Relevant only when CSR is false and the Lambda function is using the AWS Glue Schema Registry to deserialize records. It allows the KafkaAvroDeserializer used with the AWS Glue Schema Registry to fall back to a 3rd party deserializer when it encounters records that were not serialized using the AWS Glue Schema Registry. This is primarily useful when migrating from a 3rd party Schema Registry to the AWS Glue Schema Registry.

  • Next, look at the Tags. These tags are created by CloudFormation.

  • Next, look at the VPC settings. This Lambda function is attached to the VPC that has a Schema Registry installed and running on an EC2 instance in the same VPC as Amazon MSK. If the Lambda function did not need access to the Schema Registry, it would not need to be attached to the VPC. Here it is attached to the same subnets as the Amazon MSK cluster and has the same security group attached as the EC2 instance running the Schema Registry.

  • Next, scroll up and look at the MSK trigger. This is an event source mapping that maps the ARN of the Amazon MSK cluster to the Lambda function. When this trigger is created, the Lambda service retrieves events from the specified topic in Amazon MSK and invokes this Lambda function with the event payload (KafkaEvent). The code in the Lambda function then acts on the payload. The Lambda service retrieves the payload using the ByteArrayDeserializer, which means that the payload received by the Lambda function is in binary, maintaining the format of the event as sent to Amazon MSK. Each data record is also Base64 encoded. Since, in this case, the records were Avro encoded, the code in the Lambda function needs to use the specified Schema Registry URL to decode the Avro messages after Base64 decoding them.

  • Click on MSK.

  • Click on the arrow next to Details.

  • Notice that the Batch Size is set to 1000 by the CloudFormation template. (The default is 100.) It also shows the Topic name (ExampleTopic) and that the Last processing result is OK. (It could take up to 15 minutes after the trigger is enabled for it to change from No Records processed to the actual processing result.) You can Disable, Enable or Delete the trigger from the console, but other changes need to be made using the AWS CLI or SDK.

  • Scroll up and click on the Monitoring tab. You can see a number of graphs. Take a look at the Error count and success rate (%) graph. Make sure that there are no errors in processing.

  • Click on View logs in CloudWatch. This will take you to the CloudWatch Logs console.

  • Click on Log stream. This will take you to the logs.

  • Look at a section of the records starting with START RequestId and ending with END RequestId. You can see that the function logs the environment variables, context and the KafkaEvent it received. It also splits the 1000 records into two batches of 500 records each and sends them to Kinesis Data Firehose (the maximum batch size for the Kinesis Data Firehose PutRecordBatch API call is 500 records or 4 MB).

  • Scroll up and in the Filter events box, type in Successfully and press Enter. You can see all the successfully sent batches for each Request ID. You can also type in Retrying to see if the PutRecordBatch call was throttled by Kinesis Data Firehose for any batches. If throttling occurs, you can see a number of Retrying records as the Lambda function asynchronously retries sending the failed records to Kinesis Data Firehose, up to the number of retries specified in the RETRIES environment variable.
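
The record handling described in the steps above (Base64 decoding the record values, splitting them into batches of at most 500, and resending only the records that failed) can be sketched in Python. This is a minimal illustration under stated assumptions, not the code of the lab's Lambda function: the helper names decode_values, chunk and send_with_retries are hypothetical, and put_batch is a hypothetical callable standing in for a wrapper around the Kinesis Data Firehose PutRecordBatch call that returns its response. The sketch assumes the documented Lambda event shape for Amazon MSK (a "records" map keyed by topic-partition, each record carrying a Base64 encoded "value"). Avro decoding and the 4 MB size limit are left out, and the retry loop is synchronous for simplicity.

```python
import base64
from typing import Callable, Iterator, List

MAX_BATCH_RECORDS = 500  # PutRecordBatch record limit mentioned in this section


def decode_values(event: dict) -> List[bytes]:
    """Base64-decode the value of every record in an MSK-style Lambda event."""
    values = []
    # "records" maps a topic-partition key (e.g. "ExampleTopic-0") to a list of records.
    for records in event.get("records", {}).values():
        for record in records:
            values.append(base64.b64decode(record["value"]))
    return values


def chunk(items: List[bytes], size: int = MAX_BATCH_RECORDS) -> Iterator[List[bytes]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def send_with_retries(
    batch: List[bytes],
    put_batch: Callable[[List[bytes]], dict],  # hypothetical PutRecordBatch wrapper
    retries: int,
) -> List[bytes]:
    """Send a batch, resending only failed records up to `retries` more times.

    Returns the records that still failed after the last attempt.
    """
    pending = batch
    for _ in range(retries + 1):
        response = put_batch(pending)
        if response["FailedPutCount"] == 0:
            return []
        # Each entry in RequestResponses lines up with the record sent at the
        # same index; entries for failed records carry an "ErrorCode".
        pending = [
            rec
            for rec, result in zip(pending, response["RequestResponses"])
            if "ErrorCode" in result
        ]
    return pending
```

With the trigger's batch size of 1000, chunk(decode_values(event)) would produce two lists of 500 records, which matches the two batches visible in the log output.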

Kinesis Data Firehose

  • Go to the Kinesis Data Firehose console.

  • Click on the delivery stream for this lab. It should have a name like MSKToS3-deliverystream-<random suffix>.

  • Click on the Monitoring tab. Look at each of the graphs there. In particular, look at the Delivery to Amazon S3 success graph and make sure it is at or near 100%. Also, note the red line on the Incoming bytes per second, Incoming put requests per second and Incoming records per second graphs. The red lines represent the values at which Kinesis Data Firehose will throttle records. If the actual values are below the red lines, no throttling should occur.

  • Scroll up and go back to the Details tab.

  • Scroll down and click on the S3 bucket under Amazon S3 destination. This will take you to the S3 bucket that Kinesis Data Firehose is sending the records to. You can see a number of folders (prefixes) underneath. See Amazon S3 Object Name Format for how the folders (prefixes) are organized. Click through until you get to the S3 objects. Then, click on any of the files.

  • Click on Object actions and then click on Query with S3 Select.

  • Select JSON for format and Document for JSON content type. Then, scroll down and click on Run SQL query.

  • Scroll down and you can see the actual records retrieved from the Amazon MSK ExampleTopic topic.
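
The S3 Select steps above can also be run from code. The following is a rough Python sketch, assuming a boto3 S3 client is passed in (for example one created with boto3.client("s3")); the function name query_json_object and the default SQL expression are illustrative choices, not something this lab defines. The input serialization mirrors the console selections (JSON format, Document content type).

```python
def query_json_object(s3_client, bucket, key, sql="SELECT * FROM s3object s LIMIT 5"):
    """Run an S3 Select query against a JSON object and return the result text.

    s3_client is assumed to be a boto3 S3 client; bucket and key identify one
    of the objects written by the delivery stream.
    """
    response = s3_client.select_object_content(
        Bucket=bucket,
        Key=key,
        ExpressionType="SQL",
        Expression=sql,
        InputSerialization={"JSON": {"Type": "DOCUMENT"}},
        OutputSerialization={"JSON": {}},
    )
    chunks = []
    # The response payload is an event stream; only "Records" events carry data.
    for event in response["Payload"]:
        if "Records" in event:
            chunks.append(event["Records"]["Payload"])
    return b"".join(chunks).decode("utf-8")
```

The bucket name and object key are left as parameters because they depend on the resources created for your lab.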

You have successfully completed this lab. Please clean up the lab resources.