Overview

In this lab we illustrate how the integration between AWS Lambda and Amazon MSK works, using the AWS Glue Schema Registry to register schemas and Avro-encode messages. To do that, we walk through a specific use case: how to propagate or back up messages from a topic in Amazon MSK to Amazon S3 using Amazon Kinesis Data Firehose. The code is in GitHub.

The architecture will look like the following:


Here we have a topic (ExampleTopic) in Amazon MSK, to which an Apache Kafka producer sends Avro-encoded messages containing mock clickstream data (to learn more about the producer, see Producer; to clone the producer code, see GitHub). The producer uses the AWS Glue Schema Registry to serialize messages in Avro. Although the producer accepts parameters to use a specific registry and a pre-created schema, in this lab it uses the default schema registry (named default-registry) with auto registration of schemas. These events need to be backed up or stored in Amazon S3 for long-term storage, so we create a Lambda function in Java. We then create a Lambda trigger, or EventSourceMapping, connecting the Lambda function to a stream source, in this case the specified Amazon MSK cluster (using the Amazon MSK cluster ARN provided), and point it to the ExampleTopic topic. With this mapping in place, the Lambda service starts polling the ExampleTopic topic for messages and, based on the BatchSize configuration (default 100, set to 1000 in this lab), retrieves messages from one or more partitions of the ExampleTopic topic and invokes the mapped Lambda function synchronously with the payload, with the record keys and values Base64 encoded.
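
For reference, the same kind of Lambda trigger can also be created programmatically. The following is a minimal sketch using the AWS SDK for Java v2; the function name and cluster ARN are hypothetical placeholders, not values from this lab, and the starting position shown is just one option.

import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.CreateEventSourceMappingRequest;
import software.amazon.awssdk.services.lambda.model.CreateEventSourceMappingResponse;
import software.amazon.awssdk.services.lambda.model.EventSourcePosition;

public class CreateMskTrigger {
    public static void main(String[] args) {
        try (LambdaClient lambda = LambdaClient.create()) {
            CreateEventSourceMappingRequest request = CreateEventSourceMappingRequest.builder()
                    .functionName("msk-to-firehose-function")                            // hypothetical function name
                    .eventSourceArn("arn:aws:kafka:us-east-1:123456789012:cluster/...")  // hypothetical MSK cluster ARN
                    .topics("ExampleTopic")
                    .batchSize(1000)                                                     // matches the BatchSize used in this lab
                    .startingPosition(EventSourcePosition.TRIM_HORIZON)                  // TRIM_HORIZON or LATEST
                    .build();
            CreateEventSourceMappingResponse response = lambda.createEventSourceMapping(request);
            System.out.println("Event source mapping UUID: " + response.uuid());
        }
    }
}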

When the Lambda trigger is created for Amazon MSK, the Lambda service uses the specified Amazon MSK cluster ARN to retrieve the cluster information and uses the security group associated with the Amazon MSK cluster to retrieve events. Therefore, to be able to connect to the Amazon MSK cluster, the security group attached to the cluster needs a self-referencing inbound rule that allows TCP traffic on ports 9092 and 9094.
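
The self-referencing rules could be added with the console, the CLI, or the SDK. Below is a minimal sketch using the AWS SDK for Java v2; the security group ID is a hypothetical placeholder and this code is not part of the lab.

import software.amazon.awssdk.services.ec2.Ec2Client;
import software.amazon.awssdk.services.ec2.model.AuthorizeSecurityGroupIngressRequest;
import software.amazon.awssdk.services.ec2.model.IpPermission;
import software.amazon.awssdk.services.ec2.model.UserIdGroupPair;

public class AddSelfReferencingRules {
    public static void main(String[] args) {
        String mskSecurityGroupId = "sg-0123456789abcdef0"; // hypothetical MSK cluster security group ID

        try (Ec2Client ec2 = Ec2Client.create()) {
            // Self-referencing source: the rule allows traffic from the security group itself
            UserIdGroupPair self = UserIdGroupPair.builder().groupId(mskSecurityGroupId).build();

            IpPermission plaintextPort = IpPermission.builder()
                    .ipProtocol("tcp").fromPort(9092).toPort(9092)
                    .userIdGroupPairs(self)
                    .build();
            IpPermission tlsPort = IpPermission.builder()
                    .ipProtocol("tcp").fromPort(9094).toPort(9094)
                    .userIdGroupPairs(self)
                    .build();

            ec2.authorizeSecurityGroupIngress(AuthorizeSecurityGroupIngressRequest.builder()
                    .groupId(mskSecurityGroupId)
                    .ipPermissions(plaintextPort, tlsPort)
                    .build());
        }
    }
}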

Since the Lambda function is invoked synchronously, the Lambda service waits for processing in the function to complete before it polls for the next batch of records. In addition, if the Lambda function returns an error (for example, if exceptions are not handled in the Lambda code) or there is an error invoking the function, the Lambda service blocks and keeps invoking the function with the same payload to preserve the order of records from Amazon MSK. This also means it is important to make sure the code executes efficiently, or processing can start lagging behind the produced records. In the sample code provided here, the Lambda function sends records to Amazon Kinesis Data Firehose asynchronously and uses a callback to orchestrate retries, so it does not block the service from retrieving subsequent batches of records from Amazon MSK. However, this does not guarantee ordered processing of records. The Lambda service also scales the number of concurrently running Lambda functions processing records based on internal metrics, which requires no intervention on the customer side.
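
To illustrate the point about unhandled exceptions, the sketch below shows one way a handler could catch per-record failures so that a single bad record does not cause the Lambda service to re-deliver the entire batch. The class, the process placeholder, and the decision to skip failed records are hypothetical; this is not the handler used in the lab.

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;

public class SafeMskHandler implements RequestHandler<KafkaEvent, String> {
    @Override
    public String handleRequest(KafkaEvent kafkaEvent, Context context) {
        kafkaEvent.getRecords().forEach((topicPartition, records) ->
            records.forEach(rec -> {
                try {
                    process(rec); // placeholder for the real deserialization and delivery logic
                } catch (Exception e) {
                    // Catching the exception means the Lambda service does not retry the batch;
                    // a failed record could instead be sent to a dead-letter destination
                    context.getLogger().log("Failed to process offset " + rec.getOffset() + ": " + e);
                }
            }));
        return "OK";
    }

    private void process(KafkaEvent.KafkaEventRecord rec) {
        // placeholder
    }
}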

The format of the events is as follows:

{
	"records": {
		"ExampleTopic-1": [{
				"topic": "ExampleTopic",
				"partition": 1,
				"offset": 99033,
				"timestamp": 1598453846552,
				"timestampType": "CREATE_TIME",
				"key": "AAAAAAEOMjk1MzI1NA\u003d\u003d",
				"value": "AAAAAAIUNjYuMjQ5LjEuNbDIsLSFXRBjb21wdXRlcgAKb3JkZXIAFGNkIHBsYXllcnPMwOgC4IMO3oMO"
			},
			{
				"topic": "ExampleTopic",
				"partition": 1,
				"offset": 99034,
				"timestamp": 1598453846552,
				"timestampType": "CREATE_TIME",
				"key": "AAAAAAEOMjk1MzI1NA\u003d\u003d",
				"value": "AAAAAAIUNjYuMjQ5LjEuNbDIsLSFXRBjb21wdXRlcgAecHJvZHVjdF9jYXRhbG9nAAZOL0HMwOgC4oMO4IMO"
			}
		],
		"ExampleTopic-2": [{
			"topic": "ExampleTopic",
			"partition": 2,
			"offset": 100035,
			"timestamp": 1598453848622,
			"timestampType": "CREATE_TIME",
			"key": "AAAAAAEOMzAwNTA3MA\u003d\u003d",
			"value": "AAAAAAIWNjYuMjQ5LjEuNjbc6LC0hV0MdGFibGV0AB5wcm9kdWN0X2NhdGFsb2cABk4vQZzq7gKkpg6ipg4\u003d"
		}]
	},
	"eventSource": "aws:kafka",
	"eventSourceARN": "arn:aws:kafka:u...",
	"awsRegion": "us-east-1"
}

The events are provided in the KafkaEvent object as arrays of records keyed by Topic-Partition. If ordered processing of events is a requirement, the events need to be processed by Topic-Partition.
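
If ordering matters, a simple pattern is to iterate the records map entry by entry and handle each partition's list sequentially, since records within each list arrive in offset order. The sketch below assumes a hypothetical handleRecord method and is not part of the lab code.

// Process records partition by partition to preserve per-partition ordering.
// handleRecord is a hypothetical placeholder for the real per-record logic.
private void processInOrder(KafkaEvent kafkaEvent) {
    kafkaEvent.getRecords().forEach((topicPartition, records) -> {
        for (KafkaEvent.KafkaEventRecord rec : records) {
            handleRecord(topicPartition, rec); // sequential calls keep ordering within the partition
        }
    });
}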

The code loops through all the messages, Base64 decodes the key and value, and then deserializes the Avro-encoded value using the same AWS Glue Schema Registry used by the producer. As mentioned, since the producer uses the default registry in this lab, the Lambda function also uses the default registry, with the AWSKafkaAvroDeserializer library, to deserialize the messages. One important thing to call out here is the use of a secondary deserializer with the AWSKafkaAvroDeserializer. The secondary deserializer allows the AWSKafkaAvroDeserializer, which integrates with the AWS Glue Schema Registry, to fall back to a specified deserializer that points to a third-party schema registry. This enables the AWSKafkaAvroDeserializer to deserialize records that were not serialized using the AWS Glue Schema Registry, which is primarily useful when migrating from a third-party schema registry to the AWS Glue Schema Registry. With the secondary deserializer specified, the Lambda function can seamlessly deserialize records serialized with either a third-party schema registry or the AWS Glue Schema Registry. To use it, the properties specific to the third-party deserializer need to be specified as well.

Here is a snippet of the Java code that uses the AWS Glue Schema Registry and the AWSKafkaAvroDeserializer to deserialize the messages. The entire code is in GitHub.

// Base64 decode the record value before handing it to the Avro deserializer
private byte[] base64Decode(KafkaEvent.KafkaEventRecord kafkaEventRecord) {
    return Base64.getDecoder().decode(kafkaEventRecord.getValue().getBytes());
}

// Build the configuration for the AWSKafkaAvroDeserializer, including the
// optional secondary deserializer for a 3rd party Schema Registry
private Map<String, Object> getGSRConfigs() {
    Map<String, Object> gsrConfigs = new HashMap<>();
    gsrConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    gsrConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
    gsrConfigs.put(AWSSchemaRegistryConstants.AWS_REGION, System.getenv("AWS_REGION"));
    gsrConfigs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
    // If SECONDARY_DESERIALIZER is set to true, fall back to a 3rd party Schema Registry
    // for records that were not serialized with the AWS Glue Schema Registry
    if (System.getenv("SECONDARY_DESERIALIZER") != null) {
        if (Boolean.parseBoolean(System.getenv("SECONDARY_DESERIALIZER"))) {
            gsrConfigs.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName());
            gsrConfigs.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, System.getenv("SCHEMA_REGISTRY_URL"));
            gsrConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        }
    }
    return gsrConfigs;
}

// Loop through the records of each topic-partition, Base64 decode and Avro-deserialize
// each value, and add it to the Kinesis Data Firehose batch
private void deserializeAddToFirehoseBatch(Deserializer deserializer, KafkaEvent kafkaEvent, String requestId, SendKinesisDataFirehose sendKinesisDataFirehose) {
    kafkaEvent.getRecords().forEach((key, value) -> value.forEach(v -> {
        ClickEvent clickEvent = (ClickEvent) deserializer.deserialize(v.getTopic(), base64Decode(v));
        sendKinesisDataFirehose.addFirehoseRecordToBatch(clickEvent.toString(), requestId);
    }));
}

// Entry point called by the Lambda handler for each KafkaEvent payload
void processRecords(KafkaEvent kafkaEvent, String requestId) {
    SendKinesisDataFirehose sendKinesisDataFirehose = new SendKinesisDataFirehose();
    Deserializer deserializer = new AWSKafkaAvroDeserializer(getGSRConfigs());
    deserializeAddToFirehoseBatch(deserializer, kafkaEvent, requestId, sendKinesisDataFirehose);
}

We create batches (not shown in the code snippet) of 500 records each; since the BatchSize for the Lambda trigger is set to 1000, there will be up to 2 batches per invocation, or Request. However, the number of records received in the payload may be less than 1000, depending on the size of each record and other constraints such as the batching window and the maximum payload size of 6 MB. The batches are sent asynchronously to Amazon Kinesis Data Firehose. Kinesis Data Firehose has a delivery stream set up to deliver the records to Amazon S3 after buffering for 120 seconds or 5 MB, whichever comes first.
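
To make the batching idea concrete, here is a minimal sketch, using the AWS SDK for Java v2 async Firehose client, of how records could be sent in chunks of up to 500 per PutRecordBatch call. The class, the delivery stream name, and the appended newline are assumptions for illustration; this is not the lab's SendKinesisDataFirehose implementation.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
import software.amazon.awssdk.services.firehose.model.Record;

public class FirehoseBatchSender {
    private static final int MAX_BATCH_SIZE = 500; // PutRecordBatch accepts at most 500 records per call

    private final FirehoseAsyncClient firehose = FirehoseAsyncClient.create();
    private final String deliveryStreamName = "clickstream-delivery-stream"; // hypothetical stream name

    // Send the events in chunks of up to 500 records, asynchronously
    public void send(List<String> events) {
        for (int start = 0; start < events.size(); start += MAX_BATCH_SIZE) {
            List<Record> batch = events.subList(start, Math.min(start + MAX_BATCH_SIZE, events.size()))
                    .stream()
                    .map(e -> Record.builder().data(SdkBytes.fromUtf8String(e + "\n")).build()) // newline separates records in S3 (assumption)
                    .collect(Collectors.toList());

            CompletableFuture<PutRecordBatchResponse> future = firehose.putRecordBatch(
                    PutRecordBatchRequest.builder()
                            .deliveryStreamName(deliveryStreamName)
                            .records(batch)
                            .build());

            // PutRecordBatch can partially fail; a real implementation would retry
            // only the failed records, as the lab code does via its callback
            future.whenComplete((response, error) -> {
                if (error != null) {
                    System.err.println("Batch failed: " + error.getMessage());
                } else if (response.failedPutCount() > 0) {
                    System.err.println(response.failedPutCount() + " records need to be retried");
                }
            });
        }
    }
}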