本教程介绍如何使用Kafka Connect的Source Connector将SQL Server的数据同步至消息队列Kafka版。
前提条件
在开始本教程前,请确保您已完成以下操作:
步骤一:配置Kafka Connect
- 将下载完成的SQL Server Connector解压到指定目录。
- 在Kafka Connect的配置文件connect-distributed.properties中配置插件安装位置。
## 指定插件解压后的路径。
plugin.path=/kafka/connect/plugins
注意
Kafka Connect的早期版本不支持配置plugin.path,您需要在CLASSPATH中指定插件位置。
export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*
步骤二:启动Kafka Connect
配置好connect-distributed.properties后,执行以下命令启动Kafka Connect。
## 如果是公网接入,需先设置java.security.auth.login.config。
## 如果是VPC接入,可以跳过这一步。
export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
## 启动Kafka Connect。
bin/connect-distributed.sh config/connect-distributed.properties
## 启动Kafka Connect。
bin/connect-distributed.sh config/connect-distributed.properties
步骤四:配置SQL Server
- 下载inventory.sql。
- 执行以下命令初始化SQL Server中的测试数据。
cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
- 可选:如果您需要监听SQL Server中已有的数据表,请完成以下配置:
- 执行以下命令开启CDC配置。
## 开启CDC模板数据库。
USE testDB
GO
EXEC sys.sp_cdc_enable_db
GO
- 执行以下命令开启指定Table的CDC配置。
## 开启指定Table的CDC配置。
USE testDB
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'MyTable',
@role_name = N'MyRole',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 1
GO
- 执行以下命令确认是否有权限访问CDC Table。
EXEC sys.sp_cdc_help_change_data_capture
GO
说明 如果返回结果为空,您需要确认是否有权限访问该表。
- 执行以下命令确认SQL Server Agent已开启。
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'
说明 如果返回结果为Running
,则说明SQL Server Agent已开启。
步骤五:启动SQL Server Connector
- 下载register-sqlserver.json。
- 编辑register-sqlserver.json。
- VPC接入
## 消息队列Kafka版实例的默认接入点,您可以在消息队列Kafka版控制台获取。
"database.history.kafka.bootstrap.servers" : "kafka:9092",
## 您需要提前在消息队列Kafka版控制台创建同名Topic,在本例中创建topic:server1。
## 所有table的变更数据,会记录在server1.$DATABASE.$TABLE的topic中,例如server1.testDB.products。
## 因此您需要提前在消息队列Kafka版控制台中创建所有相关Topic。
"database.server.name": "server1",
## 记录schema变化信息将记录在该Topic中。
## 您需要提前在消息队列Kafka版控制台创建该Topic。
"database.history.kafka.topic": "schema-changes-inventory"
- 公网接入
## 消息队列Kafka版实例的SSL接入点,您可以在消息队列Kafka版控制台获取。
"database.history.kafka.bootstrap.servers" : "kafka:9092",
## 您需要提前在消息队列Kafka版控制台创建同名Topic,在本例中创建topic:server1。
## 所有table的变更数据,会记录在server1.$DATABASE.$TABLE的Topic中,例如server1.testDB.products。
## 因此您需要提前在消息队列Kafka版控制台中创建所有相关Topic。
"database.server.name": "server1",
## 记录schema变化信息将记录在该Topic中。
## 您需要提前在消息队列Kafka版控制台创建该Topic。
"database.history.kafka.topic": "schema-changes-inventory",
## 通过SSL接入点访问,还需要修改以下配置。
"database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
"database.history.producer.ssl.truststore.password": "KafkaOnsClient",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
"database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.sasl.mechanism": "PLAIN",
- 完成register-sqlserver.json配置后,您需要根据配置在控制台创建相应的Topic,相关操作步骤请参见步骤一:创建Topic。
按照本教程中的方式安装的SQL Server,您可以看到SQL Server中已经提前创建
db name:testDB。其中有四张表:
- customers
- orders
- products
- products_on_hand
根据以上
register-sqlserver.json的配置,您需要使用OpenAPI创建Topic:
- server1
- server1.testDB.customers
- server1.testDB.orders
- server1.testDB.products
- server1.testDB.products_on_hand
在register-sqlserver.json中,配置了将schema变化信息记录在schema-changes-testDB,因此您还需要使用OpenAPI创建Topic:schema-changes-inventory,相关操作请参见CreateTopic。
- 执行以下命令启动SQL Server。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
结果验证
确认消息队列Kafka版能否接收到SQL Server的变更数据:
- 变更监听SQL Server中的数据。
- 在控制台的消息查询页面,查询变更消息。具体操作步骤,请参见查询消息。
在文档使用中是否遇到以下问题
更多建议
匿名提交