本教程介绍如何使用 Kafka Connect 的 Source Connector 将 SQL Server 的数据同步至消息队列 Kafka 版

前提条件

在开始本教程前,请确保您已完成以下操作:

  • 已下载 SQL Server Source Connector。
    说明 SQL Server Source Connector 的下载链接
  • 已下载 Kafka Connect。
    说明 SQL Server Source Connector 目前只支持 2.1.0 及以上版本的 Kafka Connect。
  • 已安装 Docker。

步骤一:配置 Kafka Connect

  1. 将下载完成的 SQL Server Connector 解压到指定目录。
  2. 在 Kafka Connect 的配置文件 connect-distributed.properties 中配置插件安装位置。
    ## 指定插件解压后的路径
    plugin.path=/kafka/connect/plugins
    注意

    Kafka Connect 的早期版本不支持配置 plugin.path,您需要在 CLASSPATH 中指定插件位置。

    export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*

步骤二:启动 Kafka Connect

配置好 connect-distributed.properties 后,执行以下命令启动 Kafka Connect。

## 如果是公网接入,需先设置 java.security.auth.login.config
## 如果是 VPC 接入,可以跳过这一步
> export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"

## 启动 Kafka Connect
> bin/connect-distributed.sh config/connect-distributed.properties

步骤三:安装 SQL Server

注意 SQL Server 2016 SP1 以上版本支持 CDC,因此您的 SQL Server 版本必须高于该版本。
  1. 下载 docker-compose-sqlserver.yaml
  2. 执行以下命令安装 SQL Server。
    docker-compose -f docker-compose-sqlserver.yaml up

步骤四:配置 SQL Server

  1. 下载 inventory.sql
  2. 执行以下命令初始化 SQL Server 中的测试数据。
    cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
  3. 可选: 如果您需要监听 SQL Server 中已有的数据表,请完成以下配置:
    1. 执行以下命令开启 CDC 配置。
      ## Enable Database for CDC template
      USE testDB
      GO
      EXEC sys.sp_cdc_enable_db
      GO
    2. 执行以下命令指定 Table 开启 CDC 配置。
      ## Enable a Table Specifying Filegroup Option Template
      USE testDB
      GO
      
      EXEC sys.sp_cdc_enable_table
      @source_schema = N'dbo',
      @source_name   = N'MyTable',
      @role_name     = N'MyRole',
      @filegroup_name = N'MyDB_CT',
      @supports_net_changes = 1
      GO
    3. 执行以下命令确认是否有权限访问 CDC Table。
      EXEC sys.sp_cdc_help_change_data_capture
      GO
      说明 如果返回结果为空,您需要确认是否有权限访问该表。
    4. 执行以下命令确认 SQL Server Agent 已开启。
      EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'
      说明 如果返回结果为 Running,则说明 SQL Server Agent 已开启。

步骤五:启动 SQL Server Connector

  1. 下载 register-sqlserver.json
  2. 编辑 register-sqlserver.json
    • VPC 接入
      ## Kafka 接入点,通过控制台获取
      ## 您在控制台获取的默认接入点
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 需要提前在控制台创建同名topic,在本例中创建topic:server1
      ## 所有table的变更数据,会记录在server1.$DATABASE.$TABLE的topic中,如server1.testDB.products
      ## 因此用户需要提前在控制台中创建所有相关topic
      "database.server.name": "server1",
      ## 记录schema变化信息将记录在这个topic中
      ## 需要提前在控制台创建
      "database.history.kafka.topic": "schema-changes-inventory"
    • 公网接入
      ## Kafka接入点,通过控制台获取。存储db中schema变化信息
      ## 您在控制台获取的SSL接入点
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 需要提前在控制台创建同名topic,在本例中创建topic:server1
      ## 所有table的变更数据,会记录在server1.$DATABASE.$TABLE的topic中,如server1.testDB.products
      ## 因此用户需要提前在控制台中创建所有相关topic
      "database.server.name": "server1",
      ## 记录schema变化信息将记录在这个topic中
      ## 需要提前在控制台创建
      "database.history.kafka.topic": "schema-changes-inventory",
      ## SSL公网方式访问配置
      "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.producer.security.protocol": "SASL_SSL",
      "database.history.producer.sasl.mechanism": "PLAIN",
      "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.consumer.security.protocol": "SASL_SSL",
      "database.history.consumer.sasl.mechanism": "PLAIN",
  3. 完成 register-sqlserver.json 配置后,您需要根据配置在控制台创建相应的 Topic,相关操作步骤请参见步骤一:创建 Topic
    说明
    按照本教程中的方式安装的 SQL Server,您可以看到 SQL Server 中已经提前创建 db name:testDB。其中有四张表:
    • customers
    • orders
    • products
    • products_on_hand
    根据以上 register-sqlserver.json的配置,您需要使用 OpenAPI 创建 Topic:
    • server1
    • server1.testDB.customers
    • server1.testDB.orders
    • server1.testDB.products
    • server1.testDB.products_on_hand
    register-sqlserver.json 中,配置了将 schema 变化信息记录在 schema-changes-testDB,因此您还需要使用 OpenAPI 创建 Topic:schema-changes-inventorysch。
    说明 使用 OpenAPI 创建 Topic,请参见 CreateTopic
  4. 执行以下命令启动 SQL Server。
    > curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json

结果验证

确认消息队列 Kafka 版能否接收到 SQL Server 的变更数据。

  1. 变更监听 SQL Server 中的数据。
  2. 在控制台的消息查询页面,查询变更消息。