通过Kafka导入数据

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

云数据库 SelectDB 版支持通过Doris Kafka连接器(Doris Kafka Connector)自动订阅和同步Kafka中的数据。本文介绍Doris Kafka Connector同步数据到云数据库 SelectDB 版的方法。

背景信息

Kafka Connect是一款在Apache Kafka和其他系统之间进行可靠数据传输的工具,可通过定义Connectors来将大量数据迁入或者迁出Kafka。

Doris社区提供的Kafka连接器,运行在Kafka Connect集群中,支持从Kafka Topic中读取数据,并将数据写入云数据库 SelectDB 版中。

在业务场景中,用户通常会通过Debezium Connector将数据库的变更数据推送至Kafka,或者调用API将JSON格式数据实时写入Kafka。Doris Kafka Connector会自动订阅Kafka中的数据,并将这些数据同步到云数据库 SelectDB 版中。

Kafka Connect的运行模式

Kafka Connect有两种种运行模式,您可以根据具体需求选择合适的模式。

Standalone模式

警告

不建议在生产环境中使用Standalone模式。

配置Standalone

配置connect-standalone.properties。

# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092

在Kafka config目录下创建connect-selectdb-sink.properties,并配置如下内容:

name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8030
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

启动Standalone

$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.properties

Distributed模式

配置Distributed

配置connect-distributed.properties。

# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092
 
# 修改 group.id,同一集群的需要一致
group.id=connect-cluster

启动Distributed

$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

增加Connector

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name":"test-selectdb-sink-cluster",
  "config":{
    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
    "topics":"topic_test",
    "doris.topic2table.map": "topic_test:test_kafka_tbl",
    "buffer.count.records":"10000",
    "buffer.flush.time":"120",
    "buffer.size.bytes":"5000000",
    "doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
    "doris.user":"admin",
    "doris.password":"***",
    "doris.database":"test_db",
    "doris.http.port":"8080",
    "doris.query.port":"9030",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter"
  }
}'

配置项

参数

说明

name

Connector的名称。一般命名为不包含ISO控制符的字符串,必须在Kafka Connect环境中唯一。

connector.class

Connector类的名称或者别名。固定为org.apache.doris.kafka.connector.DorisSinkConnector

topics

数据源Topic。不同Topic之间以英文逗号(,)进行分隔。

doris.topic2table.map

Topic和Table表的对应关系,多个对应关系之间以英文逗号(,)进行分隔例:topic1:tb1,topic2:tb2。默认为空,表示Topic和Table名称相同。

buffer.count.records

在flush到云数据库 SelectDB 版之前,每个Kafka分区在内存中缓冲的记录数。默认10000条记录。

buffer.flush.time

内存中缓冲的刷新间隔,单位为秒。默认为120秒。

buffer.size.bytes

每个Kafka分区在内存中缓冲的记录的累积大小,单位为字节。默认为5000000。

doris.urls

云数据库 SelectDB 版连接地址。

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取相关参数。

示例:selectdb-cn4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

doris.http.port

云数据库 SelectDB 版的 HTTP协议端口,默认为8080。

doris.query.port

云数据库 SelectDB 版MySQL协议端口,默认为9030。

doris.user

云数据库 SelectDB 版用户名。

doris.password

云数据库 SelectDB 版密码。

doris.database

写入数据的云数据库 SelectDB 版的数据库。

key.converter

指定Key的JSON转换器类。

value.converter

指定Value的JSON转换器类。

jmx

通过 JMX 获取 Connector 内部监控指标。具体操作,请参见Doris-Connector-JMX。默认为TRUE。

enable.delete

是否同步删除记录, 默认 false。

label.prefix

通过Stream Load导入数据时的 label 前缀。默认为Connector应用名称。

auto.redirect

是否重定向Stream Load请求。开启后StreamLoad将通过FE重定向到需要写入数据的 BE,并且不再显示获取 BE 信息。

load.model

导入数据的方式。当前支持以下两种方式:

  • stream_load方式直接将数据导入到 SelectDB 中;

  • copy_into方式导入数据至对象存储,然后将数据加载至 SelectDB中。

默认为stream_load

sink.properties.*

Stream Load 的导入参数。

例如: 定义列分隔符'sink.properties.column_separator':','

更多信息,请参见Stream Load

delivery.guarantee

消费 Kafka 数据导入到云数据库 SelectDB 版时,数据一致性的保障方式。 支持at_least_onceexactly_once,默认为at_least_once

当前 云数据库 SelectDB 版 只能保障使用 copy into 导入的数据exactly_once

enable.2pc

是否启用两阶段提交,以确保exact-once语义。

说明

其他 Kafka Connect Sink 通用配置项,详情请参见connect_configuring

使用示例

环境准备

  1. 安装Apache Kafka集群或Confluent Cloud,版本不低于2.4.0,本示例以Kafka单机环境为基本环境。

    #下载并解压 
    wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
    tar -zxvf kafka_2.12-2.4.0.tgz
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 
    bin/kafka-server-start.sh -daemon config/server.properties
  2. 下载doris-kafka-connector-1.0.0.jar,并将JAR包放到KAKFA_HOME/libs目录下。

  3. 创建云数据库 SelectDB 版实例,详情请参见创建实例

  4. 通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例

  5. 创建测试数据库和测试表。

    1. 创建测试数据库。

      CREATE DATABASE test_db;
    2. 创建测试表。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

示例一:快速同步JSON数据

  1. 配置SelectDB Sink

    以Standalone模式为例,在Kafka config目录下创建selectdb-sink.properties,并配置如下内容:

    name=selectdb_sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=test_topic
    doris.topic2table.map=test_topic:example_tbl
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
    doris.http.port=8080
    doris.query.port=9030
    doris.user=admin
    doris.password=***
    doris.database=test_db
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    #配置死信队列,可选
    errors.tolerance=all
    errors.deadletterqueue.topic.name=test_error
    errors.deadletterqueue.context.headers.enable = true
    errors.deadletterqueue.topic.replication.factor=1
  2. 启动Kafka Connect

    bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties

示例二:使用Debezium同步MySQL数据到云数据库 SelectDB 版

很多业务场景经常需要从业务数据库中实时同步数据,这个时候就需要使用数据库的变更数据捕获CDC(Change Data Capture)机制。

Debezium是基于Kafka Connect的CDC工具,可以对接MySQL、PostgreSQL、SQL Server、Oracle、MongoDB等多种数据库,把数据库的数据持续以统一的格式发送到Kafka的Topic中,以供下游Sink端进行实时消费,本文以MySQL为例。

  1. 下载Debezium。

    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
  2. 解压下载文件。

    tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
  3. 将解压后的所有JAR包放置到KAKFA_HOME/libs目录下。

  4. 配置MySQL Source。

    在Kafka config目录下创建mysql-source.properties,并配置如下内容:

    name=mysql-source
    connector.class=io.debezium.connector.mysql.MySqlConnector
    database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com
    database.port=3306
    database.user=testuser
    database.password=****
    database.server.id=1
    # kafka中的该client的唯一标识
    database.server.name=test123
    # 需要同步的数据库和表,默认是同步所有数据库和表
    database.include.list=test
    table.include.list=test.test_table
    database.history.kafka.bootstrap.servers=localhost:9092
    # 用于存储数据库表结构变化的 Kafka topic
    database.history.kafka.topic=dbhistory
    transforms=unwrap
    # 参考 https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
    transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
    # 记录删除事件
    transforms.unwrap.delete.handling.mode=rewrite

    配置好之后,Kafka中默认的Topic名称格式是SERVER_NAME.DATABASE_NAME.TABLE_NAME

    说明

    Debezium配置请参见Debezium connector for MySQL

  5. 配置云数据库 SelectDB 版 Sink。

    在Kafka config目录下创建selectdb-sink.properties,配置以下内容:

    name=selectdb-sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=test123.test.test_table
    doris.topic2table.map=test123.test.test_table:test_table
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
    doris.http.port=8080
    doris.query.port=9030
    doris.user=admin
    doris.password=****
    doris.database=test
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    #配置死信队列,可选
    #errors.tolerance=all
    #errors.deadletterqueue.topic.name=test_error
    #errors.deadletterqueue.context.headers.enable = true
    #errors.deadletterqueue.topic.replication.factor=1
    说明

    同步到云数据库 SelectDB 版时,需要提前创建好数据库和表。

  6. 启动Kafka Connect。

    bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties
    说明

    启动后,可以观察日志logs/connect.log是否启动成功。

使用进阶

Connector操作

# 查看connector状态
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# 删除当前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
# 暂停当前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# 重启当前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# 重启connector内的tasks
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST

详细信息请参见:Connect REST Interface

死信队列

默认情况下,转换过程中或转换过程中遇到的任何错误都会导致连接器失败。每个连接器配置还可以通过跳过它们来容忍此类错误,可选择将每个错误和失败操作的详细信息以及有问题的记录(具有不同级别的详细信息)写入死信队列以便记录。

errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1

详细信息请参见:connect_errorreporting

访问SSL认证的Kafka集群

通过kafka-connect访问SSL认证的Kafka集群需要您提供用于认证Kafka Broker公钥的证书文件(client.truststore.jks)。您可以在connect-distributed.properties文件中增加以下配置:

# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
 
# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234

关于通过kafka-connect连接SSL认证的Kafka集群配置说明可以参考:Configure Kafka Connect