本教程介绍如何使用 Kafka Connect 的 Source Connector 将 MySQL 的数据同步至消息队列 Kafka 版

背景信息

Kafka Connect 主要用于将数据流输入和输出消息队列 Kafka 版。Kafka Connect 主要通过各种 Source Connector的实现,将数据从第三方系统输入到 Kafka broker,通过各种 Sink Connector 实现,将数据从 Kafka broker 中导入到第三方系统。system

前提条件

在开始本教程前,请确保您已完成以下操作:

  • 已下载 MySQL Source Connector。
    说明 本教程以 0.5.2 版本的 MySQL Source Connector 为例。
  • 已下载 Kafka Connect。
    说明 本教程以 0.10.2.2 版本的 Kafka Connect 为例。
  • 已安装 docker。

步骤一:配置 Kafka Connect

  1. 将下载完成的 MySQL Connector 解压到指定目录。
  2. 在 Kafka Connect 的配置文件 connect-distributed.properties 中配置插件安装位置。
    plugin.path=/kafka/connect/plugins
    注意

    Kafka Connect 的早期版本不支持配置 plugin.path,您需要在 CLASSPATH 中指定插件位置。

    export CLASSPATH=/kafka/connect/plugins/mysql-connector/*

步骤二:启动 Kafka Connect

在配置好 connect-distributed.properties 后,执行以下命令启动 Kafka Connect。

  • 公网接入
    1. 执行命令export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"设置 java.security.auth.login.config
    2. 执行命令 bin/connect-distributed.sh config/connect-distributed.properties启动 Kafka Connect。
  • VPC 接入

    执行命令bin/connect-distributed.sh config/connect-distributed.properties启动 Kafka Connect。

步骤三:安装 MySQL

  1. 下载 docker-compose-mysql.yaml
  2. 执行以下命令安装 MySQL。
    export DEBEZIUM_VERSION=0.5
    docker-compose -f docker-compose-mysql.yaml up

步骤四:配置 MySQL

  1. 执行以下命令开启 MySQL 的 binlog 写入功能,并配置 binlog 模式为 row。
    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1 
  2. 执行以下命令设置 MySQL 的 User 权限。
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
    说明 示例中 MySQL 的 User 为 debezium,密码为 dbz

步骤五:启动 MySQL Connector

  1. 下载 register-mysql.json
  2. 编辑 register-mysql.json
    • VPC 接入
      ## Kafka 接入点,通过控制台获取
      ## 您在控制台获取的默认接入点
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 需要提前在控制台创建同名 Topic,在本例中创建 topic:server1
      ## 所有 Table 的变更数据,会记录在 server1.$DATABASE.$TABLE 的 Topic 中,如 server1.inventory.products
      ## 因此用户需要提前在控制台中创建所有相关 Topic
      "database.server.name": "server1",
      ## 记录 schema 变化信息将记录在这个 Topic 中
      ## 需要提前在控制台创建
      "database.history.kafka.topic": "schema-changes-inventory"
    • 公网接入
      ## Kafka 接入点,通过控制台获取。存储 db 中 schema 变化信息
      ## 您在控制台获取的 SSL 接入点
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 需要提前在控制台创建同名topic,在本例中创建 Topic:server1
      ## 所有 Table 的变更数据,会记录在 server1.$DATABASE.$TABLE 的 Topic 中,如 server1.testDB.products
      ## 因此用户需要提前在控制台中创建所有相关 Topic
      "database.server.name": "server1",
      ## schema 变化信息将记录在这个 Topic 中
      ## 需要提前在控制台创建
      "database.history.kafka.topic": "schema-changes-inventory",
      ## SSL 公网方式访问配置
      "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.producer.security.protocol": "SASL_SSL",
      "database.history.producer.sasl.mechanism": "PLAIN",
      "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.consumer.security.protocol": "SASL_SSL",
      "database.history.consumer.sasl.mechanism": "PLAIN",
  3. 配置好 register-mysql.json 后,您需要根据配置在控制台创建相应的 Topic,相关操作步骤请参见步骤一:创建 Topic
    说明
    按照本教程中的方式安装的 MySQL,您可以看到 MySQL 中已经提前创建好了 database:inventory。其中有四张表:
    • customers
    • orders
    • products
    • products_on_hand
    根据以上配置,您需要使用 OpenAPI 创建 Topic:
    • server1
    • server1.inventory.customers
    • server1.inventory.orders
    • server1.inventory.products
    • server1.inventory.products_on_hand
    register-mysql.json 中,配置了将 schema 变化信息记录在 schema-changes-testDB,因此您还需要使用 OpenAPI 创建 Topic:schema-changes-inventory
    说明 使用 OpenAPI 创建 Topic,请参见 CreateTopic
  4. 执行以下命令开 MySQL Connector。
    > curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

结果验证

按照以下步骤操作确认消息队列 Kafka 版能否接收到 MySQL 的变更数据。

  1. 变更 MySQL Table 中的数据。
  2. 在控制台的消息查询页面,查询变更数据。