本教程介绍如何使用Kafka Connect的Source Connector将SQL Server的数据同步至云消息队列 Kafka 版。
前提条件
在开始本教程前,请确保您已完成以下操作:
- 已下载SQL Server Source Connector。具体信息,请参见SQL Server Source Connector。
- 已下载Kafka Connect。具体信息,请参见Kafka Connect。 说明 SQL Server Source Connector目前只支持2.1.0及以上版本的Kafka Connect。
- 已下载Docker。具体信息,请参见Docker。
步骤一:配置Kafka Connect
- 将下载完成的SQL Server Connector解压到指定目录。
- 在Kafka Connect的配置文件connect-distributed.properties中配置插件安装位置。
## 指定插件解压后的路径。 plugin.path=/kafka/connect/plugins
重要Kafka Connect的早期版本不支持配置plugin.path,您需要在CLASSPATH中指定插件位置。
export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*
步骤二:启动Kafka Connect
配置好connect-distributed.properties后,执行以下命令启动Kafka Connect。
- 如果是公网接入,需先设置java.security.auth.login.config,如果是VPC接入,可以跳过这一步。
export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
- 启动Kafka Connect。
bin/connect-distributed.sh config/connect-distributed.properties
步骤三:安装SQL Server
重要 SQL Server 2016 SP1以上版本支持CDC,因此您的SQL Server版本必须高于该版本。
- 下载docker-compose-sqlserver.yaml。
- 执行以下命令安装SQL Server。
docker-compose -f docker-compose-sqlserver.yaml up
步骤四:配置SQL Server
- 下载inventory.sql。
- 执行以下命令初始化SQL Server中的测试数据。
cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
- 可选:如果您需要监听SQL Server中已有的数据表,请完成以下配置:
步骤五:启动SQL Server Connector
- 下载register-sqlserver.json。
- 编辑register-sqlserver.json。
- VPC接入
## 云消息队列 Kafka 版实例的默认接入点,您可以在云消息队列 Kafka 版控制台获取。 "database.history.kafka.bootstrap.servers" : "kafka:9092", ## 您需要提前在云消息队列 Kafka 版控制台创建同名Topic,在本例中创建topic:server1。 ## 所有table的变更数据,会记录在server1.$DATABASE.$TABLE的topic中,例如server1.testDB.products。 ## 因此您需要提前在云消息队列 Kafka 版控制台中创建所有相关Topic。 "database.server.name": "server1", ## 记录schema变化信息将记录在该Topic中。 ## 您需要提前在云消息队列 Kafka 版控制台创建该Topic。 "database.history.kafka.topic": "schema-changes-inventory"
- 公网接入
## 云消息队列 Kafka 版实例的SSL接入点,您可以在云消息队列 Kafka 版控制台获取。 "database.history.kafka.bootstrap.servers" : "kafka:9092", ## 您需要提前在云消息队列 Kafka 版控制台创建同名Topic,在本例中创建topic:server1。 ## 所有table的变更数据,会记录在server1.$DATABASE.$TABLE的Topic中,例如server1.testDB.products。 ## 因此您需要提前在云消息队列 Kafka 版控制台中创建所有相关Topic。 "database.server.name": "server1", ## 记录schema变化信息将记录在该Topic中。 ## 您需要提前在云消息队列 Kafka 版控制台创建该Topic。 "database.history.kafka.topic": "schema-changes-inventory", ## 通过SSL接入点访问,还需要修改以下配置。 "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.producer.ssl.truststore.password": "KafkaOnsClient", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.consumer.ssl.truststore.password": "KafkaOnsClient", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN",
- VPC接入
- 完成register-sqlserver.json配置后,您需要根据配置在控制台创建相应的Topic,相关操作步骤请参见步骤一:创建Topic。按照本教程中的方式安装的SQL Server,您可以看到SQL Server中已经提前创建db name:testDB。其中有四张表:
- customers
- orders
- products
- products_on_hand
- server1
- server1.testDB.customers
- server1.testDB.orders
- server1.testDB.products
- server1.testDB.products_on_hand
在register-sqlserver.json中,配置了将schema变化信息记录在schema-changes-testDB,因此您还需要使用OpenAPI创建Topic:schema-changes-inventory,相关操作请参见CreateTopic。
- 执行以下命令启动SQL Server。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
结果验证
确认云消息队列 Kafka 版能否接收到SQL Server的变更数据:
- 变更监听SQL Server中的数据。
- 在控制台的消息查询页面,查询变更消息。具体操作步骤,请参见查询消息。