Source Connector Setup

Next, you create a custom plugin for MSK Connect. A custom plugin is a set of JAR files that contain the implementation of one or more connectors, transforms, or converters. Amazon MSK will install the plugin on the workers of the connect cluster where the connector is running.

Run the commands below to download the Debezium MySQL connector plugin (version 1.7.0.Final). Because MSK Connect accepts custom plugins in ZIP or JAR format, we repackage the downloaded archive as a ZIP file, keeping the JAR files at the top level of the archive:

sudo -u ec2-user -i
mkdir debezium && cd debezium
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.7.0.Final/debezium-connector-mysql-1.7.0.Final-plugin.tar.gz
tar xzf debezium-connector-mysql-1.7.0.Final-plugin.tar.gz
cd debezium-connector-mysql
zip -9 ../debezium-connector-mysql-1.7.0.Final-plugin.zip *
cd ..
aws s3 cp ./debezium-connector-mysql-1.7.0.Final-plugin.zip s3://msk-lab-${ACCOUNT_ID}-plugins-bucket/debezium/
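
The repackaging step keeps the JAR files at the top level of the ZIP. The following is a minimal sketch for checking that before or after the upload. It assumes `unzip` is installed; `check_flat_jars` is a hypothetical helper name, not part of the lab.

```shell
# Hypothetical helper: reads archive entry names (one per line) on stdin and
# succeeds only if at least one .jar entry exists and no entry sits inside a
# subdirectory.
check_flat_jars() (
  jars=0
  nested=0
  while IFS= read -r entry; do
    case "$entry" in
      */*)   nested=$((nested + 1)) ;;   # entry lives in a subdirectory
      *.jar) jars=$((jars + 1)) ;;       # JAR at the top level
    esac
  done
  echo "top-level jars: $jars, nested entries: $nested"
  [ "$jars" -gt 0 ] && [ "$nested" -eq 0 ]
)

# Usage (not run here; `unzip -Z1` lists entry names only):
#   unzip -Z1 debezium-connector-mysql-1.7.0.Final-plugin.zip | check_flat_jars
```

Other top-level files such as a README are ignored by the check; only nested entries cause it to fail.
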
  • On the Amazon MSK console, go to the MSK Connect section. Choose Create connector, then create a custom plugin: browse the msk-lab-${ACCOUNT_ID}-plugins-bucket S3 bucket and select the custom plugin ZIP file you just uploaded.

choose debezium plugin from the s3 bucket

  • Enter debezium-connector-mysql-1-7-0-Final-plugin for the plugin name. Optionally, enter a description and click on Create Custom Plugin.

create custom plugin and click create

After a few seconds you should see the plugin is created and the status is Active.

Shows when debezium plugin is created
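
The console steps above can also be scripted. The following is a rough sketch, not part of the lab. It assumes an AWS CLI version that includes the `kafkaconnect` commands, configured credentials, and the standard `aws` partition in the bucket ARN. `plugin_location_json` is a hypothetical helper name.

```shell
# Builds the JSON value for the --location parameter of
# `aws kafkaconnect create-custom-plugin`.
# $1 = S3 bucket name, $2 = object key of the uploaded ZIP.
# Assumption: the bucket is in the standard "aws" partition.
plugin_location_json() {
  printf '{"s3Location":{"bucketArn":"arn:aws:s3:::%s","fileKey":"%s"}}\n' "$1" "$2"
}

# Usage (not run here; needs AWS credentials and the lab's ACCOUNT_ID variable):
#   aws kafkaconnect create-custom-plugin \
#     --name debezium-connector-mysql-1-7-0-Final-plugin \
#     --content-type ZIP \
#     --location "$(plugin_location_json "msk-lab-${ACCOUNT_ID}-plugins-bucket" \
#       "debezium/debezium-connector-mysql-1.7.0.Final-plugin.zip")"
#
#   # List plugins with their state; the new plugin should eventually be ACTIVE:
#   aws kafkaconnect list-custom-plugins \
#     --query 'customPlugins[].[name,customPluginState]' --output text
```
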

Now that the custom plugin is set up, we can move on to creating the connector. Before we do, we should create a custom worker configuration.

The default MSK Connect worker configuration uses org.apache.kafka.connect.storage.StringConverter as the value converter. Because we intend to publish Debezium change events in JSON format, we have to override that default. In addition, the JSON converter embeds the schema in every value object by default, which makes the messages more verbose. In this lab we work with schemaless objects, so we specify that in a custom worker configuration:

  • Select Worker configurations from the MSK section.

  • Click on the Create worker configuration button.

  • Enter SourceAuroraDebeziumConnector in the Worker Configuration Name textbox. The Description is optional.

  • Copy the following configuration parameters, and paste them in the Worker configuration textbox:

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

Create custom msk connect worker configuration

  • Click on Create worker configuration.
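
The same worker configuration could be created from the command line. The following is a hedged sketch, not part of the lab. It assumes AWS CLI v2 (which expects blob parameters such as `--properties-file-content` as Base64 by default) and a `base64` utility. `worker_properties` and `encode_properties` are hypothetical helper names.

```shell
# Prints the four worker properties used in this lab.
worker_properties() {
  cat <<'EOF'
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
EOF
}

# Base64-encodes stdin onto a single line (tr removes any line wrapping).
encode_properties() {
  base64 | tr -d '\n'
}

# Usage (not run here; needs AWS credentials):
#   aws kafkaconnect create-worker-configuration \
#     --name SourceAuroraDebeziumConnector \
#     --properties-file-content "$(worker_properties | encode_properties)"
```
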

Now we are ready to create our source connector.

  • From the MSK section choose Connectors, then click Create connector.

  • Choose debezium-connector-mysql-1-7-0-Final-plugin from the list of Custom Plugins, then click Next.

Choose Debezium from the list of existing plugins

  • Enter source-debezium-mysql-salesdb-all-tables in the Name textbox, and a description of your choice for the connector.

Enter name, description for your connector

  • Select MSKCluster-msk-connect-lab from the listed MSK clusters. From the Authentication dropdown, choose IAM.

Choose MSK Cluster and IAM from the dropdown

  • Copy the following configuration, and paste it in the connector configuration textbox.

connector.class=io.debezium.connector.mysql.MySqlConnector
database.user=master
database.server.id=123456
tasks.max=1
database.history.kafka.topic=dbhistory.salesdb
database.history.kafka.bootstrap.servers=<MSK Bootstrap Server Address>
database.server.name=salesdb
database.port=3306
include.schema.changes=true
database.hostname=<Aurora RDS MySQL Endpoint>
database.password=S3cretPwd99
database.include.list=salesdb
value.converter.schemas.enable=false
key.converter.schemas.enable=false
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

  • Replace <MSK Bootstrap Server Address> and <Aurora RDS MySQL Endpoint> with the addresses you pasted in your notepad application.
replace the bootstrap, aurora db addresses from your notepad application
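
If you keep the connector configuration in a local file, the placeholder substitution can be sketched as follows. This is an illustration, not part of the lab: `fill_placeholders` and `no_placeholders_left` are hypothetical helper names, and the sketch assumes the two addresses do not contain `|`, `&`, or `\`, which are special in the `sed` expressions used here.

```shell
# Hypothetical helper: reads a connector configuration on stdin and replaces
# the two placeholders.
# $1 = MSK bootstrap server address, $2 = Aurora MySQL endpoint.
# Assumes neither value contains '|', '&' or '\'.
fill_placeholders() {
  sed -e "s|<MSK Bootstrap Server Address>|$1|" \
      -e "s|<Aurora RDS MySQL Endpoint>|$2|"
}

# Succeeds only if no property value on stdin still looks like <placeholder>.
no_placeholders_left() {
  ! grep -q '=<.*>'
}

# Usage (not run here; file and variable names are hypothetical):
#   fill_placeholders "$BOOTSTRAP" "$AURORA_ENDPOINT" \
#     < connector.properties > connector-filled.properties
#   no_placeholders_left < connector-filled.properties && echo "ready to paste"
```
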

The total capacity of a connector depends on the number of workers that the connector has, as well as on the number of MSK Connect Units (MCUs) per worker. Each MCU represents 1 vCPU of compute and 4 GiB of memory. To create a connector, you must choose one of two capacity modes: Provisioned or Auto scaled (default). See Connector capacity for more information.
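
As a worked example of the capacity arithmetic, the sketch below multiplies workers by MCUs per worker and applies the figures stated above (1 vCPU and 4 GiB per MCU). `connector_capacity` is a hypothetical helper name, and the worker and MCU counts are illustrative values, not the lab's settings.

```shell
# Prints the total vCPUs and memory for a connector.
# $1 = number of workers, $2 = MCUs per worker.
# Uses the figures from the text: 1 MCU = 1 vCPU and 4 GiB of memory.
connector_capacity() (
  workers=$1
  mcus_per_worker=$2
  total_mcus=$((workers * mcus_per_worker))
  echo "${total_mcus} vCPU, $((total_mcus * 4)) GiB"
)

connector_capacity 2 2   # prints "4 vCPU, 16 GiB"
```
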

  • For this lab, leave the default Connector capacity settings.

  • Select Use a custom configuration and choose SourceAuroraDebeziumConnector from the Worker Configuration Dropdown.

choose custom worker configuration for source connector

  • From the Identity and Access Management (IAM) role dropdown, choose the option that starts with msk-connect-lab-AuroraConnectorIAMRole-*.

choose IAM role for source connector

  • Click the Next button and leave all default Security settings.

  • Click Next.

  • Check the Deliver to Amazon CloudWatch Logs checkbox.

  • Click the Browse button, then search for and select the /msk-lab-source-aurora-connector log group. Click the Choose button.

Search and choose aurora connector cw group

  • Click Next.

  • Review all configurations, and click Create connector.

It may take up to 15 minutes for the connector to be created and for its status to change to Running.

Source connector is running
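
Instead of refreshing the console, the wait could be scripted. The following is a minimal polling sketch, not part of the lab. It assumes the connector state is reported as `CREATING`, `RUNNING`, or `FAILED`, as returned by `aws kafkaconnect describe-connector`. `wait_for_running` is a hypothetical helper name and `CONNECTOR_ARN` a hypothetical variable.

```shell
# Polls a command that prints the connector state until it reports RUNNING.
# $1 = maximum number of polls, $2 = seconds to sleep between polls,
# remaining arguments = the command to run for each poll.
# Returns 0 on RUNNING, 1 on FAILED, 2 if the polling command itself fails,
# and 3 if the state is still something else after the last poll.
wait_for_running() {
  attempts=$1
  delay=$2
  shift 2
  i=1
  while [ "$i" -le "$attempts" ]; do
    state=$("$@") || return 2
    case "$state" in
      RUNNING) echo "RUNNING after $i poll(s)"; return 0 ;;
      FAILED)  echo "connector FAILED" >&2; return 1 ;;
    esac
    i=$((i + 1))
    sleep "$delay"
  done
  echo "timed out in state: $state" >&2
  return 3
}

# Usage (not run here; 90 polls x 10 s covers the 15 minutes mentioned above):
#   wait_for_running 90 10 aws kafkaconnect describe-connector \
#     --connector-arn "$CONNECTOR_ARN" --query connectorState --output text
```

Passing the polling command as arguments keeps the loop independent of the AWS CLI, so it can be tried out with a stub command first.
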