本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
云数据库 SelectDB 版支持通过Doris Kafka连接器(Doris Kafka Connector)自动订阅和同步Kafka中的数据。本文介绍Doris Kafka Connector同步数据到云数据库 SelectDB 版的方法。
背景信息
Kafka Connect是一款在Apache Kafka和其他系统之间进行可靠数据传输的工具,可通过定义Connectors来将大量数据迁入或者迁出Kafka。
Doris社区提供的Kafka连接器,运行在Kafka Connect集群中,支持从Kafka Topic中读取数据,并将数据写入云数据库 SelectDB 版中。
在业务场景中,用户通常会通过Debezium Connector将数据库的变更数据推送至Kafka,或者调用API将JSON格式数据实时写入Kafka。Doris Kafka Connector会自动订阅Kafka中的数据,并将这些数据同步到云数据库 SelectDB 版中。
Kafka Connect的运行模式
Kafka Connect有两种种运行模式,您可以根据具体需求选择合适的模式。
Standalone模式
不建议在生产环境中使用Standalone模式。
配置Standalone
配置connect-standalone.properties。
# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092
在Kafka config目录下创建connect-selectdb-sink.properties,并配置如下内容:
name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8030
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
启动Standalone
$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.properties
Distributed模式
配置Distributed
配置connect-distributed.properties。
# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092
# 修改 group.id,同一集群的需要一致
group.id=connect-cluster
启动Distributed
$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
增加Connector
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"test-selectdb-sink-cluster",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"topic_test",
"doris.topic2table.map": "topic_test:test_kafka_tbl",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
"doris.user":"admin",
"doris.password":"***",
"doris.database":"test_db",
"doris.http.port":"8080",
"doris.query.port":"9030",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}'
配置项
参数 | 说明 |
name | Connector的名称。一般命名为不包含ISO控制符的字符串,必须在Kafka Connect环境中唯一。 |
connector.class | Connector类的名称或者别名。固定为 |
topics | 数据源Topic。不同Topic之间以英文逗号(,)进行分隔。 |
doris.topic2table.map | Topic和Table表的对应关系,多个对应关系之间以英文逗号(,)进行分隔例: |
buffer.count.records | 在flush到云数据库 SelectDB 版之前,每个Kafka分区在内存中缓冲的记录数。默认10000条记录。 |
buffer.flush.time | 内存中缓冲的刷新间隔,单位为秒。默认为120秒。 |
buffer.size.bytes | 每个Kafka分区在内存中缓冲的记录的累积大小,单位为字节。默认为5000000。 |
doris.urls | 云数据库 SelectDB 版连接地址。 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取相关参数。 示例:selectdb-cn4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030 |
doris.http.port | 云数据库 SelectDB 版的 HTTP协议端口,默认为8080。 |
doris.query.port | 云数据库 SelectDB 版MySQL协议端口,默认为9030。 |
doris.user | 云数据库 SelectDB 版用户名。 |
doris.password | 云数据库 SelectDB 版密码。 |
doris.database | 写入数据的云数据库 SelectDB 版的数据库。 |
key.converter | 指定Key的JSON转换器类。 |
value.converter | 指定Value的JSON转换器类。 |
jmx | 通过 JMX 获取 Connector 内部监控指标。具体操作,请参见Doris-Connector-JMX。默认为TRUE。 |
enable.delete | 是否同步删除记录, 默认 false。 |
label.prefix | 通过Stream Load导入数据时的 label 前缀。默认为Connector应用名称。 |
auto.redirect | 是否重定向Stream Load请求。开启后StreamLoad将通过FE重定向到需要写入数据的 BE,并且不再显示获取 BE 信息。 |
load.model | 导入数据的方式。当前支持以下两种方式:
默认为 |
sink.properties.* | Stream Load 的导入参数。 例如: 定义列分隔符 更多信息,请参见Stream Load。 |
delivery.guarantee | 消费 Kafka 数据导入到云数据库 SelectDB 版时,数据一致性的保障方式。 支持 当前 云数据库 SelectDB 版 只能保障使用 copy into 导入的数据 |
enable.2pc | 是否启用两阶段提交,以确保exact-once语义。 |
其他 Kafka Connect Sink 通用配置项,详情请参见connect_configuring。
使用示例
环境准备
安装Apache Kafka集群或Confluent Cloud,版本不低于2.4.0,本示例以Kafka单机环境为基本环境。
#下载并解压 wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz tar -zxvf kafka_2.12-2.4.0.tgz bin/zookeeper-server-start.sh -daemon config/zookeeper.properties bin/kafka-server-start.sh -daemon config/server.properties
下载doris-kafka-connector-1.0.0.jar,并将JAR包放到KAKFA_HOME/libs目录下。
创建云数据库 SelectDB 版实例,详情请参见创建实例。
通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例。
创建测试数据库和测试表。
创建测试数据库。
CREATE DATABASE test_db;
创建测试表。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
示例一:快速同步JSON数据
配置SelectDB Sink
以Standalone模式为例,在Kafka config目录下创建selectdb-sink.properties,并配置如下内容:
name=selectdb_sink connector.class=org.apache.doris.kafka.connector.DorisSinkConnector topics=test_topic doris.topic2table.map=test_topic:example_tbl buffer.count.records=10000 buffer.flush.time=120 buffer.size.bytes=5000000 doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com doris.http.port=8080 doris.query.port=9030 doris.user=admin doris.password=*** doris.database=test_db key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.json.JsonConverter #配置死信队列,可选 errors.tolerance=all errors.deadletterqueue.topic.name=test_error errors.deadletterqueue.context.headers.enable = true errors.deadletterqueue.topic.replication.factor=1
启动Kafka Connect
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties
示例二:使用Debezium同步MySQL数据到云数据库 SelectDB 版
很多业务场景经常需要从业务数据库中实时同步数据,这个时候就需要使用数据库的变更数据捕获CDC(Change Data Capture)机制。
Debezium是基于Kafka Connect的CDC工具,可以对接MySQL、PostgreSQL、SQL Server、Oracle、MongoDB等多种数据库,把数据库的数据持续以统一的格式发送到Kafka的Topic中,以供下游Sink端进行实时消费,本文以MySQL为例。
下载Debezium。
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
解压下载文件。
tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
将解压后的所有JAR包放置到KAKFA_HOME/libs目录下。
配置MySQL Source。
在Kafka config目录下创建mysql-source.properties,并配置如下内容:
name=mysql-source connector.class=io.debezium.connector.mysql.MySqlConnector database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com database.port=3306 database.user=testuser database.password=**** database.server.id=1 # kafka中的该client的唯一标识 database.server.name=test123 # 需要同步的数据库和表,默认是同步所有数据库和表 database.include.list=test table.include.list=test.test_table database.history.kafka.bootstrap.servers=localhost:9092 # 用于存储数据库表结构变化的 Kafka topic database.history.kafka.topic=dbhistory transforms=unwrap # 参考 https://debezium.io/documentation/reference/stable/transformations/event-flattening.html transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState # 记录删除事件 transforms.unwrap.delete.handling.mode=rewrite
配置好之后,Kafka中默认的Topic名称格式是
SERVER_NAME.DATABASE_NAME.TABLE_NAME
。说明Debezium配置请参见Debezium connector for MySQL。
配置云数据库 SelectDB 版 Sink。
在Kafka config目录下创建selectdb-sink.properties,配置以下内容:
name=selectdb-sink connector.class=org.apache.doris.kafka.connector.DorisSinkConnector topics=test123.test.test_table doris.topic2table.map=test123.test.test_table:test_table buffer.count.records=10000 buffer.flush.time=120 buffer.size.bytes=5000000 doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com doris.http.port=8080 doris.query.port=9030 doris.user=admin doris.password=**** doris.database=test key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter #配置死信队列,可选 #errors.tolerance=all #errors.deadletterqueue.topic.name=test_error #errors.deadletterqueue.context.headers.enable = true #errors.deadletterqueue.topic.replication.factor=1
说明同步到云数据库 SelectDB 版时,需要提前创建好数据库和表。
启动Kafka Connect。
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties
说明启动后,可以观察日志logs/connect.log是否启动成功。
使用进阶
Connector操作
# 查看connector状态
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# 删除当前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
# 暂停当前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# 重启当前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# 重启connector内的tasks
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST
详细信息请参见:Connect REST Interface。
死信队列
默认情况下,转换过程中或转换过程中遇到的任何错误都会导致连接器失败。每个连接器配置还可以通过跳过它们来容忍此类错误,可选择将每个错误和失败操作的详细信息以及有问题的记录(具有不同级别的详细信息)写入死信队列以便记录。
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1
详细信息请参见:connect_errorreporting。
访问SSL认证的Kafka集群
通过kafka-connect访问SSL认证的Kafka集群需要您提供用于认证Kafka Broker公钥的证书文件(client.truststore.jks)。您可以在connect-distributed.properties
文件中增加以下配置:
# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234
关于通过kafka-connect连接SSL认证的Kafka集群配置说明可以参考:Configure Kafka Connect。