Verify the replication in the Kafka cluster

Verify that the topics are auto-created in the Kafka cluster

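  • Connect to the KafkaClientEC2Instance instance via Session Manager. Open a terminal and run the following commands. They download the Amazon MSK IAM authentication library, create a client configuration file for IAM authentication, add the library to the CLASSPATH, and list the topics in the cluster: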
sudo -u ec2-user -i
mkdir iam-auth && cd ./iam-auth
wget https://github.com/aws/aws-msk-iam-auth/releases/download/1.1.0/aws-msk-iam-auth-1.1.0-all.jar
cd ../

cat <<EOF > /home/ec2-user/kafka/config/client-config.properties
# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL

# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM

# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;

# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF

export CLASSPATH=/home/ec2-user/iam-auth/aws-msk-iam-auth-1.1.0-all.jar
echo "export CLASSPATH=${CLASSPATH}" | tee -a ~/.bash_profile

/home/ec2-user/kafka/bin/kafka-topics.sh --bootstrap-server $MSK_BOOTSTRAP_ADDRESS --command-config /home/ec2-user/kafka/config/client-config.properties --list
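If the kafka-topics.sh command above fails, for example with a class-loading or authentication error, it can help to check the prerequisites it relies on. The following is a minimal POSIX shell sketch. The function name check_msk_client_setup is hypothetical, and the sketch assumes the paths and the MSK_BOOTSTRAP_ADDRESS variable used above. It only checks that the pieces are in place, not that the IAM credentials are valid.

```shell
# Hypothetical helper (not part of the original walkthrough):
# check_msk_client_setup [CONFIG_FILE]
# Returns 0 if the prerequisites for the IAM-authenticated Kafka CLI
# commands above appear to be in place, 1 otherwise.
check_msk_client_setup() {
  local config="${1:-/home/ec2-user/kafka/config/client-config.properties}"
  local status=0 found=1 entry old_ifs

  # The bootstrap broker string must be set in the current shell.
  if [ -z "${MSK_BOOTSTRAP_ADDRESS:-}" ]; then
    echo "MSK_BOOTSTRAP_ADDRESS is not set" >&2
    status=1
  fi

  # At least one CLASSPATH entry must be an existing aws-msk-iam-auth JAR.
  old_ifs=$IFS
  IFS=:
  for entry in ${CLASSPATH:-}; do
    case "$entry" in
      *aws-msk-iam-auth*.jar) if [ -f "$entry" ]; then found=0; fi ;;
    esac
  done
  IFS=$old_ifs
  if [ "$found" -ne 0 ]; then
    echo "no aws-msk-iam-auth JAR found on CLASSPATH" >&2
    status=1
  fi

  # The client config must exist and select the AWS_MSK_IAM SASL mechanism.
  if ! grep -Eq '^sasl\.mechanism *= *AWS_MSK_IAM' "$config" 2>/dev/null; then
    echo "$config is missing or does not set sasl.mechanism = AWS_MSK_IAM" >&2
    status=1
  fi

  return "$status"
}

# Usage:
# check_msk_client_setup && echo "client setup looks OK"
```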

You should see several topics, similar to the following. Debezium takes an initial snapshot of the database and creates one topic per replicated table:

Figure: Listing all topics in the terminal window
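To make sense of a longer topic list, the names can be grouped by the Debezium MySQL naming convention this walkthrough relies on: the schema-change topic is named after the database server name (salesdb), and each table's data-change topic is named server.database.table. The sketch below assumes that convention, and classify_topics is a hypothetical helper. Anything else, such as Kafka-internal topics or the connector's database history topic (whose name is set in the connector configuration, not shown here), is labeled other.

```shell
# Hypothetical helper: classify_topics SERVER
# Reads topic names, one per line, on stdin and prints "<kind><TAB><topic>".
classify_topics() {
  local server="$1" topic
  while IFS= read -r topic; do
    [ -n "$topic" ] || continue   # skip blank lines
    case "$topic" in
      "$server")     printf '%s\t%s\n' schema-changes "$topic" ;;
      "$server".*.*) printf '%s\t%s\n' table-data "$topic" ;;
      *)             printf '%s\t%s\n' other "$topic" ;;
    esac
  done
}

# Usage:
# /home/ec2-user/kafka/bin/kafka-topics.sh --bootstrap-server $MSK_BOOTSTRAP_ADDRESS \
#   --command-config /home/ec2-user/kafka/config/client-config.properties --list \
#   | classify_topics salesdb
```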

Verify that database changes are published to the Kafka topics

  • Open three terminal connections to the instance.

  • In the first terminal connection, start a Kafka consumer for a topic with the same name as the database server (salesdb). This topic is used by Debezium to stream schema changes (for example, when a new table is created).

/home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server $MSK_BOOTSTRAP_ADDRESS --consumer.config /home/ec2-user/kafka/config/client-config.properties --from-beginning --topic salesdb | jq --color-output .

You should see one JSON object per schema object in the salesdb database, similar to the following:

{
  "source": {
    "version": "1.7.0.Final",
    "connector": "mysql",
    "name": "salesdb",
    "ts_ms": 1634482082070,
    "snapshot": "true",
    "db": "salesdb",
    "sequence": null,
    "table": "SUPPLIER",
    "server_id": 0,
    "gtid": null,
    "file": "mysql-bin-changelog.000003",
    "pos": 43273534,
    "row": 0,
    "thread": null,
    "query": null
  },
  "databaseName": "salesdb",
  "schemaName": null,
  "ddl": "CREATE TABLE `SUPPLIER` (\n  `SUPPLIER_ID` int(11) NOT NULL DEFAULT '0',\n  `NAME` varchar(25) NOT NULL,\n  `ADDRESS` varchar(25) NOT NULL,\n  `CITY` varchar(10) NOT NULL,\n  `STATE` varchar(12) NOT NULL,\n  `COUNTRY` varchar(12) NOT NULL,\n  `PHONE` varchar(15) NOT NULL,\n  PRIMARY KEY (`SUPPLIER_ID`)\n) ENGINE=InnoDB DEFAULT CHARSET=latin1",
  "tableChanges": [
    {
... }
  • In the second terminal connection, start another Kafka consumer, this time for the topic whose name joins the database server name (salesdb), the database name (salesdb), and the table name (CUSTOMER) with dots: salesdb.salesdb.CUSTOMER. Debezium uses this topic to stream data changes for the table (for example, when a new record is inserted):
/home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server $MSK_BOOTSTRAP_ADDRESS --consumer.config /home/ec2-user/kafka/config/client-config.properties --from-beginning --topic salesdb.salesdb.CUSTOMER | jq --color-output .
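The data-change topic name can be built the same way for any captured table. A minimal sketch, with table_topic as a hypothetical helper; in this walkthrough the table part matches the MySQL table name exactly (CUSTOMER, not customer), so keep the table's case when building the name.

```shell
# Hypothetical helper: table_topic SERVER DATABASE TABLE
# Joins the three parts with dots, e.g. salesdb.salesdb.CUSTOMER.
table_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Print the topic for each table seen in the snapshot output above.
for table in CUSTOMER SUPPLIER; do
  table_topic salesdb salesdb "$table"
done
```

For example, pass --topic "$(table_topic salesdb salesdb SUPPLIER)" to kafka-console-consumer.sh to follow the SUPPLIER table instead.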

You should see one JSON object per row of the CUSTOMER table in the salesdb database, similar to the following:

{
  "before": null,
  "after": {
    "CUST_ID": 2000,
    "NAME": "Customer Name 2000",
    "MKTSEGMENT": "Market Segment 7"
  },
  "source": {
    "version": "1.7.0.Final",
    "connector": "mysql",
    "name": "salesdb",
    "ts_ms": 1634482082974,
    "snapshot": "true",
    "db": "salesdb",
    "sequence": null,
    "table": "CUSTOMER",
    "server_id": 0,
    "gtid": null,
    "file": "mysql-bin-changelog.000003",
    "pos": 43273534,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "r",
  "ts_ms": 1634482082974,
  "transaction": null
}
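The op field tells you why an event was emitted. In Debezium change events, r marks a row read during the initial snapshot (which is why this event also carries "snapshot": "true"), c an insert, u an update, and d a delete; as the sample shows, before is null for a snapshot read. The sketch below, with describe_op as a hypothetical helper, covers only these four codes; other Debezium versions and connectors may emit additional ones.

```shell
# Hypothetical helper: describe_op OP
# Maps the "op" field of a Debezium change event to a short description.
# Covers only r, c, u and d; anything else is reported as unknown.
describe_op() {
  case "$1" in
    r) echo "read (row captured during the initial snapshot)" ;;
    c) echo "create (row inserted)" ;;
    u) echo "update (row modified)" ;;
    d) echo "delete (row removed)" ;;
    *) echo "unknown op: $1" >&2; return 1 ;;
  esac
}

# Usage: describe_op r
```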
  • In the third terminal connection, use the MySQL client to connect to the Aurora database:
mysql -f -u master -h $RDS_AURORA_ENDPOINT --password=S3cretPwd99
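The command above passes the password on the command line, which the MySQL client itself warns can be insecure; among other things, it ends up in your shell history. As an alternative, the MySQL client can read credentials from an option file passed with --defaults-extra-file, which must be the first option on the command line. A minimal sketch: write_mysql_option_file and the file path are hypothetical, and passwords containing characters such as # or quotes may need quoting inside the file.

```shell
# Hypothetical helper: write_mysql_option_file FILE USER HOST PASSWORD
# Writes a [client] option group; a newly created file is readable only by
# the current user (an existing file keeps its current permissions).
write_mysql_option_file() {
  (
    umask 077  # new files get mode 600 (owner read/write only)
    printf '[client]\nuser=%s\nhost=%s\npassword=%s\n' "$2" "$3" "$4" > "$1"
  )
}

# Usage on the client instance (values from the command above):
# write_mysql_option_file "$HOME/.aurora.cnf" master "$RDS_AURORA_ENDPOINT" 'S3cretPwd99'
# mysql --defaults-extra-file="$HOME/.aurora.cnf" -f
```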

  • From this connection, create the SALES_ORDER_VI table in the salesdb database:
USE salesdb;

CREATE TABLE SALES_ORDER_VI (
       order_id VARCHAR(255),
       customer_id VARCHAR(255),
       item_description VARCHAR(255),
       price DECIMAL(6,2),
       order_date DATETIME DEFAULT CURRENT_TIMESTAMP
);

These database changes are captured by the Debezium connector that MSK Connect manages and are streamed to the MSK cluster. In the first terminal, which consumes the schema-change topic, you see information about the creation of the SALES_ORDER_VI table:

{
  "source": {
    "version": "1.7.0.Final",
    "connector": "mysql",
    "name": "salesdb",
    "ts_ms": 1634569586254,
    "snapshot": "false",
    "db": "salesdb",
    "sequence": null,
    "table": "SALES_ORDER_VI",
    "server_id": 1733046080,
    "gtid": null,
    "file": "mysql-bin-changelog.000003",
    "pos": 43275309,
    "row": 0,
    "thread": null,
    "query": null
  },
  "databaseName": "salesdb",
  "schemaName": null,
  "ddl": "CREATE TABLE SALES_ORDER_VI (\n       order_id VARCHAR(255),\n       customer_id VARCHAR(255),\n       item_description VARCHAR(255),\n       price DECIMAL(6,2),\n       order_date DATETIME DEFAULT CURRENT_TIMESTAMP\n)",
  "tableChanges": [
    {
      "type": "CREATE",
      "id": "\"salesdb\".\"SALES_ORDER_VI\"",
      "table": {
...}

  • Then, go back to the database connection in the third terminal and insert a few records into the CUSTOMER table:
INSERT INTO CUSTOMER VALUES ('2001', 'Customer Name 2001', 'Market Segment 7');
INSERT INTO CUSTOMER VALUES ('2002', 'Customer Name 2002', 'Market Segment 1');
INSERT INTO CUSTOMER VALUES ('2003', 'Customer Name 2003', 'Market Segment 3');
INSERT INTO CUSTOMER VALUES ('2004', 'Customer Name 2004', 'Market Segment 5');
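To produce more change events than the four statements above, a small loop can generate INSERT statements and pipe them into the mysql client. A sketch, with customer_inserts as a hypothetical helper; it assumes CUST_ID must be unique (for example, as the primary key), so start after the IDs already used, and it derives the market segment from the ID purely for illustration.

```shell
# Hypothetical helper: customer_inserts FIRST_ID LAST_ID
# Prints one INSERT statement per ID, following the pattern used above.
customer_inserts() {
  local id="$1" segment
  while [ "$id" -le "$2" ]; do
    # Illustrative only: cycle through market segments 1..7.
    segment=$(( id % 7 + 1 ))
    printf "INSERT INTO CUSTOMER VALUES ('%s', 'Customer Name %s', 'Market Segment %s');\n" \
      "$id" "$id" "$segment"
    id=$(( id + 1 ))
  done
}

# Usage (from the instance shell, not inside the mysql prompt):
# customer_inserts 2005 2010 | mysql -u master -h "$RDS_AURORA_ENDPOINT" --password=S3cretPwd99 salesdb
```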

In the second terminal, you see information about the records inserted into the CUSTOMER table:

{
  "before": null,
  "after": {
    "CUST_ID": 2001,
    "NAME": "Customer Name 2001",
    "MKTSEGMENT": "Market Segment 7"
  },
  "source": {
    "version": "1.7.0.Final",
    "connector": "mysql",
    "name": "salesdb",
    "ts_ms": 1634569283000,
    "snapshot": "false",
    "db": "salesdb",
    "sequence": null,
    "table": "CUSTOMER",
    "server_id": 1733046080,
    "gtid": null,
    "file": "mysql-bin-changelog.000003",
    "pos": 43275145,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",
  "ts_ms": 1634569283193,
  "transaction": null
}
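Comparing the two timestamps in an event gives a rough idea of capture latency. In Debezium events, source.ts_ms is the time the change was made in the database, and the top-level ts_ms is the time the connector processed it. For the CUST_ID 2001 event above, 1634569283193 - 1634569283000 = 193 ms. Treat this as approximate: it is affected by clock differences between the database and the connector, and in this sample source.ts_ms ends in 000, which suggests second-level resolution. capture_lag_ms is a hypothetical helper.

```shell
# Hypothetical helper: capture_lag_ms SOURCE_TS_MS EVENT_TS_MS
# Prints EVENT_TS_MS - SOURCE_TS_MS, an approximate capture delay in ms.
capture_lag_ms() {
  echo $(( $2 - $1 ))
}

# Timestamps from the CUST_ID 2001 insert event above.
capture_lag_ms 1634569283000 1634569283193   # prints 193
```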
...