Overview

In this lab we illustrate how the integration between AWS Lambda and Amazon MSK works. To do that, we walk through a specific use case: how to propagate or back up messages from a topic in Amazon MSK to Amazon S3 using Amazon Kinesis Data Firehose. The code is on GitHub.

The architecture will look like the following:


Here we have a topic (ExampleTopic) in Amazon MSK, to which we send Avro-encoded messages from an Apache Kafka producer that generates mock clickstream data (if you want to learn more about the producer, see Producer; if you want to clone the producer code, see GitHub). These events need to be backed up to Amazon S3 for long-term storage. So, we create a Lambda function in Java. We then create a Lambda trigger, or EventSourceMapping, connecting the Lambda function to a stream source, in this case the specified Amazon MSK cluster (using the provided Amazon MSK cluster ARN), and point it at the ExampleTopic topic. With this mapping in place, the Lambda service starts polling the ExampleTopic topic for messages and, based on the BatchSize configuration (default 100, set to 1000 in this lab), retrieves messages from one or more partitions of the ExampleTopic topic and invokes the mapped Lambda function synchronously with the Base64-encoded payload.
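
If you want to create the same mapping programmatically, it would look roughly like the following with the AWS SDK for Java v2. This is a sketch only; the cluster ARN and function name are placeholders, not the lab's actual values:

import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.CreateEventSourceMappingRequest;
import software.amazon.awssdk.services.lambda.model.CreateEventSourceMappingResponse;
import software.amazon.awssdk.services.lambda.model.EventSourcePosition;

public class CreateMskTrigger {
    public static void main(String[] args) {
        try (LambdaClient lambda = LambdaClient.create()) {
            CreateEventSourceMappingResponse mapping = lambda.createEventSourceMapping(
                CreateEventSourceMappingRequest.builder()
                    // Placeholder cluster ARN and function name -- substitute your own
                    .eventSourceArn("arn:aws:kafka:us-east-1:123456789012:cluster/MSKCluster/abcd1234")
                    .functionName("MSKToS3LambdaFunction")
                    .topics("ExampleTopic")                       // topic the Lambda service polls
                    .batchSize(1000)                              // lab setting; the default is 100
                    .startingPosition(EventSourcePosition.LATEST)
                    .build());
            System.out.println("Created mapping: " + mapping.uuid());
        }
    }
}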

When the Lambda trigger is created for Amazon MSK, the Lambda service uses the specified Amazon MSK cluster ARN to retrieve the cluster information and uses the security group associated with the Amazon MSK cluster to retrieve events. Therefore, in order to connect to the Amazon MSK cluster, the security group attached to the cluster needs a self-referencing inbound rule allowing TCP traffic on port 9092 (plaintext) and port 9094 (TLS).
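
For reference, the self-referencing rule can be added with a call like the following (AWS SDK for Java v2); the security group ID is a placeholder:

import software.amazon.awssdk.services.ec2.Ec2Client;
import software.amazon.awssdk.services.ec2.model.AuthorizeSecurityGroupIngressRequest;
import software.amazon.awssdk.services.ec2.model.IpPermission;
import software.amazon.awssdk.services.ec2.model.UserIdGroupPair;

public class SelfReferencingRule {
    public static void main(String[] args) {
        String mskSecurityGroupId = "sg-0123456789abcdef0"; // placeholder: the MSK cluster's security group
        try (Ec2Client ec2 = Ec2Client.create()) {
            // The rule references the security group itself, so any resource in
            // this group (including the Lambda service's pollers) can reach the brokers
            UserIdGroupPair self = UserIdGroupPair.builder().groupId(mskSecurityGroupId).build();
            ec2.authorizeSecurityGroupIngress(AuthorizeSecurityGroupIngressRequest.builder()
                .groupId(mskSecurityGroupId)
                .ipPermissions(
                    IpPermission.builder().ipProtocol("tcp").fromPort(9092).toPort(9092)
                        .userIdGroupPairs(self).build(),   // plaintext listener
                    IpPermission.builder().ipProtocol("tcp").fromPort(9094).toPort(9094)
                        .userIdGroupPairs(self).build())   // TLS listener
                .build());
        }
    }
}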

Since the Lambda function is invoked synchronously, the Lambda service waits for the processing in the Lambda function to complete before it polls for the next batch of records. In addition, if the Lambda function returns an error (for example, if an exception goes unhandled in the Lambda code) or there is an error invoking the Lambda function, the Lambda service blocks and keeps invoking the Lambda function with the same payload to preserve the order in which records from Amazon MSK are processed. This also means it is important that the code executes efficiently, or processing may start lagging behind the produced records. In the sample code provided here, the Lambda function sends records to Amazon Kinesis Data Firehose asynchronously and uses a callback to orchestrate retries, so the Lambda service is not blocked from fetching subsequent batches of records from Amazon MSK. However, this does not guarantee ordered processing of records. The Lambda service also scales the number of concurrently running Lambda functions processing records based on internal metrics; this requires no intervention on the customer side.
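
For illustration, here is a minimal sketch of that pattern using the AWS SDK for Java v2, where the async Firehose client returns a CompletableFuture to which a completion handler can be attached. The stream name and the retry handling shown are assumptions, not the lab's exact code:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
import software.amazon.awssdk.services.firehose.model.Record;

public class FirehoseSender {
    private final FirehoseAsyncClient firehose = FirehoseAsyncClient.create();

    // Sends one batch without blocking; the completion handler can retry failed
    // records instead of holding up the next poll from Amazon MSK.
    CompletableFuture<PutRecordBatchResponse> sendBatch(String streamName, List<Record> records) {
        PutRecordBatchRequest request = PutRecordBatchRequest.builder()
            .deliveryStreamName(streamName)   // placeholder delivery stream
            .records(records)
            .build();
        return firehose.putRecordBatch(request).whenComplete((response, error) -> {
            if (error != null) {
                // Transport-level failure: log and retry the whole batch (not shown)
            } else if (response.failedPutCount() > 0) {
                // Partial failure: collect the failed entries from
                // response.requestResponses() and resend them (not shown)
            }
        });
    }
}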

The format of the events is as follows:

{
	"records": {
		"ExampleTopic-1": [{
				"topic": "ExampleTopic",
				"partition": 1,
				"offset": 99033,
				"timestamp": 1598453846552,
				"timestampType": "CREATE_TIME",
				"key": "AAAAAAEOMjk1MzI1NA\u003d\u003d",
				"value": "AAAAAAIUNjYuMjQ5LjEuNbDIsLSFXRBjb21wdXRlcgAKb3JkZXIAFGNkIHBsYXllcnPMwOgC4IMO3oMO"
			},
			{
				"topic": "ExampleTopic",
				"partition": 1,
				"offset": 99034,
				"timestamp": 1598453846552,
				"timestampType": "CREATE_TIME",
				"key": "AAAAAAEOMjk1MzI1NA\u003d\u003d",
				"value": "AAAAAAIUNjYuMjQ5LjEuNbDIsLSFXRBjb21wdXRlcgAecHJvZHVjdF9jYXRhbG9nAAZOL0HMwOgC4oMO4IMO"
			}
		],
		"ExampleTopic-2": [{
			"topic": "ExampleTopic",
			"partition": 2,
			"offset": 100035,
			"timestamp": 1598453848622,
			"timestampType": "CREATE_TIME",
			"key": "AAAAAAEOMzAwNTA3MA\u003d\u003d",
			"value": "AAAAAAIWNjYuMjQ5LjEuNjbc6LC0hV0MdGFibGV0AB5wcm9kdWN0X2NhdGFsb2cABk4vQZzq7gKkpg6ipg4\u003d"
		}]
	},
	"eventSource": "aws:kafka",
	"eventSourceARN": "arn:aws:kafka:u...",
	"awsRegion": "us-east-1"
}

The events are provided in the (KafkaEvent) object as a map keyed by Topic-Partition, with an array of records per key. If ordered processing of events is a requirement, the events need to be processed by Topic-Partition.

The code loops through all the messages, Base64 decodes the key and value, and then deserializes the Avro-encoded value using the Schema Registry used by the producer. Since the Schema Registry runs on an EC2 instance in the customer VPC, the Lambda function needs access to that VPC to reach the Schema Registry, so the VPC configuration for the Lambda function needs to provide that connectivity. In this lab, we have provided the same private subnets as the Amazon MSK cluster. Further, the security group associated with the Lambda function in the VPC config needs to be given access to the EC2 instance hosting the Schema Registry.
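
For reference, attaching a VPC configuration to an existing function can be done with a call like the following (AWS SDK for Java v2); the function name, subnet IDs, and security group ID are placeholders:

import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.UpdateFunctionConfigurationRequest;
import software.amazon.awssdk.services.lambda.model.VpcConfig;

public class AttachVpcConfig {
    public static void main(String[] args) {
        try (LambdaClient lambda = LambdaClient.create()) {
            lambda.updateFunctionConfiguration(UpdateFunctionConfigurationRequest.builder()
                .functionName("MSKToS3LambdaFunction")  // placeholder function name
                .vpcConfig(VpcConfig.builder()
                    // Placeholders: the MSK cluster's private subnets and a security
                    // group allowed to reach the Schema Registry instance
                    .subnetIds("subnet-0aaa1111bbb22222c", "subnet-0ddd3333eee44444f")
                    .securityGroupIds("sg-0abc1234def56789a")
                    .build())
                .build());
        }
    }
}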

Here is a snippet of the Java code that utilizes the Schema Registry and KafkaAvroDeserializer to deserialize the messages. The entire code is on GitHub.

import java.util.Base64;

import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;

class ProcessRecords {
    // Cache up to 10 schemas fetched from the Schema Registry
    private final SchemaRegistryClient schemaRegistryClient =
            new CachedSchemaRegistryClient(System.getenv("SCHEMA_REGISTRY_URL"), 10);
    private final KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);

    // The Lambda service Base64 encodes each record's key and value
    private byte[] base64Decode(KafkaEvent.KafkaEventRecord kafkaEventRecord) {
        return Base64.getDecoder().decode(kafkaEventRecord.getValue());
    }

    void processRecords(KafkaEvent kafkaEvent, String requestId) {
        // Records arrive grouped by Topic-Partition; each group is processed in order
        kafkaEvent.getRecords().forEach((topicPartition, records) -> records.forEach(record -> {
            GenericRecord rec = (GenericRecord) kafkaAvroDeserializer.deserialize(record.getTopic(), base64Decode(record));
            ClickEvent clickEvent = (ClickEvent) SpecificData.get().deepCopy(ClickEvent.SCHEMA$, rec);
            // clickEvent is then batched and sent to Kinesis Data Firehose (not shown)
        }));
    }
}

We create batches (not shown in the code snippet) of 500 records each, the maximum for a single Kinesis Data Firehose PutRecordBatch call. Since the BatchSize for the Lambda trigger is set to 1000, there will be two batches per invocation, or request. (However, the number of records received in the payload can be less than 1000, depending on the size of each record and other constraints such as the batching window and the maximum payload size of 6 MB.) The batches are sent asynchronously to Amazon Kinesis Data Firehose, which has a delivery stream set up to deliver the records to Amazon S3 after buffering for 120 seconds or 5 MB, whichever comes first.
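
For reference, a delivery stream with those buffering settings could be created along these lines with the AWS SDK for Java v2; the stream name, role ARN, and bucket ARN below are placeholders, not the lab's actual resources:

import software.amazon.awssdk.services.firehose.FirehoseClient;
import software.amazon.awssdk.services.firehose.model.BufferingHints;
import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
import software.amazon.awssdk.services.firehose.model.DeliveryStreamType;
import software.amazon.awssdk.services.firehose.model.ExtendedS3DestinationConfiguration;

public class CreateDeliveryStream {
    public static void main(String[] args) {
        try (FirehoseClient firehose = FirehoseClient.create()) {
            firehose.createDeliveryStream(CreateDeliveryStreamRequest.builder()
                .deliveryStreamName("MSKToS3DeliveryStream")        // placeholder name
                .deliveryStreamType(DeliveryStreamType.DIRECT_PUT)  // records arrive via PutRecordBatch
                .extendedS3DestinationConfiguration(ExtendedS3DestinationConfiguration.builder()
                    .roleARN("arn:aws:iam::123456789012:role/FirehoseDeliveryRole") // placeholder
                    .bucketARN("arn:aws:s3:::my-clickstream-backup")                // placeholder
                    .bufferingHints(BufferingHints.builder()
                        .intervalInSeconds(120) // flush after 120 seconds ...
                        .sizeInMBs(5)           // ... or 5 MB, whichever comes first
                        .build())
                    .build())
                .build());
        }
    }
}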