Create the S3 Sink Connector

The Confluent.io Amazon S3 Connector exports data from a Kafka cluster to Amazon S3. It guarantees exactly-once delivery semantics and is available only as a sink connector. It supports creating S3 objects in JSON or Avro format.

The sink connector periodically polls data from Kafka and uploads it to Amazon S3 objects. Partitioning support helps you segment your data in Amazon S3 by a single property or a combination of properties.
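
As an illustration (an assumption based on the connector's default settings and the topic used later in this section, not output you will see verbatim), objects land under a per-topic, per-partition prefix, and each object covers one flushed batch of records:

topics/salesdb.salesdb.CUSTOMER/partition=0/salesdb.salesdb.CUSTOMER+0+0000000000.json
topics/salesdb.salesdb.CUSTOMER/partition=0/salesdb.salesdb.CUSTOMER+0+0000000200.json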

  • Connect to the KafkaClientEC2Instance instance via Session Manager. Open a terminal and run the following commands to download the connector archive and upload it to the Amazon S3 plugins bucket:
sudo -u ec2-user -i
# download the Confluent S3 sink connector archive
mkdir kafka-connect-s3 && cd kafka-connect-s3
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.0.3/confluentinc-kafka-connect-s3-10.0.3.zip
# upload the archive to the plugins bucket (ACCOUNT_ID must be set to your AWS account ID)
aws s3 cp ./confluentinc-kafka-connect-s3-10.0.3.zip s3://msk-lab-${ACCOUNT_ID}-plugins-bucket/kafka-connect-s3/
cd ..
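
Before moving on, you can optionally confirm the upload. This is a simple check with the standard aws s3 ls command, assuming ACCOUNT_ID is still set in your shell:

aws s3 ls s3://msk-lab-${ACCOUNT_ID}-plugins-bucket/kafka-connect-s3/
# expected output: one entry for confluentinc-kafka-connect-s3-10.0.3.zip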

  • Navigate to the Amazon MSK Custom plugins page in the console.

  • Click the Create custom plugin button.

  • Browse to the msk-lab-<ACCOUNT_ID>-plugins-bucket/kafka-connect-s3/ bucket, select the confluentinc-kafka-connect-s3-10.0.3.zip object, and click the Choose button.

(Screenshot: choosing the S3 sink plugin from the plugins S3 bucket)
  • Enter confluentinc-kafka-connect-s3-10-0-3 in the “Custom plugin name” textbox and provide an optional description.

  • Click the Create custom plugin button. Wait a few seconds until the new custom plugin is created and its status becomes Active (you can also confirm this from the CLI, as sketched after the screenshot below).

(Screenshot: the Amazon S3 sink plugin created from the plugins S3 bucket)
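
If you prefer the CLI, a quick way to confirm the plugin state is aws kafkaconnect list-custom-plugins; the query below assumes the plugin name you entered above:

aws kafkaconnect list-custom-plugins \
  --query "customPlugins[?name=='confluentinc-kafka-connect-s3-10-0-3'].customPluginState" \
  --output text
# prints ACTIVE once the plugin is ready
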
  • Continue by clicking the Create connector button

  • confluentinc-kafka-connect-s3-10-0-3 is selected by default in the list; click Next

  • Enter confluentinc-kafka-connect-s3 in the Connector name textbox, and provide an optional description

  • Choose MSK cluster as the cluster type and select MSKCluster-msk-connect-lab from the list of available Amazon MSK clusters

  • Choose IAM as the authentication mode from the dropdown

  • Copy the configuration block below and paste it into the Connector configuration text area:

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=<YOUR AWS REGION>
flush.size=200
schema.compatibility=NONE
tasks.max=1
topics=salesdb.salesdb.CUSTOMER
value.converter.schemas.enable=false
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=msk-lab-<ACCOUNT_ID>-target-bucket
  • Replace <YOUR AWS REGION> with the Region in which you launched the stack

  • Replace <ACCOUNT_ID> with your AWS Account ID
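
If you would rather not edit the placeholders by hand, the following sketch fills them in automatically; it assumes you saved the configuration block to a local file named s3-sink-connector.properties (a hypothetical name) and that your default CLI Region matches the stack's Region:

REGION=$(aws configure get region)
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
# replace both placeholders in the saved configuration file
sed -i -e "s/<YOUR AWS REGION>/${REGION}/" -e "s/<ACCOUNT_ID>/${ACCOUNT_ID}/" s3-sink-connector.properties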

(Screenshot: entering the S3 sink connector configuration)
  • Leave the Connector capacity settings at their defaults

  • In the Worker configuration section, choose the Use a custom configuration option and select SourceAuroraDebeziumConnectorConfig from the dropdown. This ensures we use the same key/value converter formats for the consumers
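
For reference, a worker configuration that pins the converter formats typically contains properties along these lines; this is only a sketch of what SourceAuroraDebeziumConnectorConfig (created in the earlier source-connector section) might contain, not its exact contents:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false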

  • In the Access permissions section, choose the IAM role that begins with msk-connect-lab-S3ConnectorIAMRole-* from the available options in the dropdown

(Screenshot: choosing the S3 sink connector IAM role)
  • Click Next

  • Leave the Security settings at their defaults

  • Check Deliver to Amazon CloudWatch Logs under the Log delivery options

  • Browse, find, and select the /msk-lab-target-s3sink-connector log group from the list

  • Review the settings, and click Create connector

  • Wait until the connector status changes from Creating to Running
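
Once the connector is running, you can verify end-to-end delivery from the CLI. This is a minimal sketch assuming ACCOUNT_ID is set in your shell; note that objects appear only after the connector has flushed a batch (flush.size=200 records per partition), and the log group name matches the one selected above:

# list objects written by the sink (keys follow the default partitioner layout shown earlier)
aws s3 ls s3://msk-lab-${ACCOUNT_ID}-target-bucket/topics/salesdb.salesdb.CUSTOMER/ --recursive
# optionally tail the connector logs (requires AWS CLI v2)
aws logs tail /msk-lab-target-s3sink-connector --follow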