文档

通过Kafka导入数据

更新时间:
一键部署
重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

云数据库 SelectDB 版支持通过Doris Kafka连接器(Doris Kafka Connector)自动订阅和同步Kafka中的数据。本文介绍Doris Kafka Connector同步数据至云数据库 SelectDB 版的使用方式。

背景信息

Kafka Connect是一款在Apache Kafka和其他系统之间进行可靠数据传输的工具,可通过定义Connectors来将大量数据迁入或者迁出Kafka。

Doris社区提供的Kafka连接器,运行在Kafka Connect集群中,支持从Kafka Topic中读取数据,并将数据写入云数据库 SelectDB 版中。

在业务场景中,用户通常会通过Debezium Connector将数据库的变更数据推送至Kafka,或者调用API将JSON格式数据实时写入Kafka。Doris Kafka Connector会自动订阅Kafka中的数据,并将这些数据同步到云数据库 SelectDB 版中。

使用模式

Standalone模式

警告

不建议在生产环境中使用Standalone模式。

配置Standalone

配置connect-standalone.properties。

# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092

在Kafka config目录下创建connect-selectdb-sink.properties,并配置如下内容:

name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8030
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

启动Standalone

$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.properties

Distributed模式

配置Distributed

配置connect-distributed.properties。

# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092
 
# 修改 group.id,同一集群的需要一致
group.id=connect-cluster

启动Distributed

$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

增加Connector

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name":"test-selectdb-sink-cluster",
  "config":{
    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
    "topics":"topic_test",
    "doris.topic2table.map": "topic_test:test_kafka_tbl",
    "buffer.count.records":"10000",
    "buffer.flush.time":"120",
    "buffer.size.bytes":"5000000",
    "doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
    "doris.user":"admin",
    "doris.password":"***",
    "doris.database":"test_db",
    "doris.http.port":"8080",
    "doris.query.port":"9030",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter"
  }
}'

配置项

参数

说明

name

Connector的名称。一般命名为不包含ISO控制符的字符串,必须在Kafka Connect环境中唯一。

connector.class

Connector类的名称或者别名。固定为org.apache.doris.kafka.connector.DorisSinkConnector

topics

数据源Topic。不同Topic之间以英文逗号(,)进行分隔。

doris.topic2table.map

Topic和Table表的对应关系,多个对应关系之间以英文逗号(,)进行分隔例:topic1:tb1,topic2:tb2。默认为空,表示Topic和Table名称相同。

buffer.count.records

在flush到SelectDB之前,每个Kafka分区在内存中缓冲的记录数。默认10000条记录。

buffer.flush.time

内存中缓冲的刷新间隔,单位秒。默认为120。

buffer.size.bytes

每个Kafka分区在内存中缓冲的记录的累积大小,单位字节。默认为5000000。

doris.urls

SelectDB连接地址。

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取。

doris.http.port

SelectDB HTTP协议端口,默认为8080。

doris.query.port

SelectDB MySQL协议端口,默认为9030。

doris.user

SelectDB用户名。

doris.password

SelectDB密码。

doris.database

写入数据的SelectDB数据库。

key.converter

指定Key的JSON转换器类。

value.converter

指定Value的JSON转换器类。

jmx

通过 JMX 获取 Connector 内部监控指标, 请参考:Doris-Connector-JMX。默认为TRUE。

enable.delete

是否同步删除记录, 默认 false。

label.prefix

通过Stream Load导入数据时的 label 前缀。默认为Connector应用名称。

auto.redirect

是否重定向Stream Load请求。开启后StreamLoad将通过FE重定向到需要写入数据的 BE,并且不再显示获取 BE 信息。

load.model

导入数据的方式。支持stream_load直接数据导入到 Doris 中;同时支持copy_into的方式导入数据至对象存储中,然后将数据加载至 Doris 中。默认为stream_load

sink.properties.*

Stream Load 的导入参数。

例如: 定义列分隔符'sink.properties.column_separator':','

详细参数参考Stream Load

delivery.guarantee

消费 Kafka 数据导入至 SelectDB 时,数据一致性的保障方式。 支持at_least_onceexactly_once,默认为at_least_once

当前 SelectDB 只能保障使用 copy into 导入的数据exactly_once

说明

其他 Kafka Connect Sink 通用配置项,详情请参见connect_configuring

使用示例

环境准备

  1. 安装Apache Kafka集群或Confluent Cloud,版本不低于2.4.0,本文以Kafka单机环境为例。

    #下载并解压 
    wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
    tar -zxvf kafka_2.12-2.4.0.tgz
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 
    bin/kafka-server-start.sh -daemon config/server.properties
  2. 下载doris-kafka-connector-1.0.0.jar,并将JAR包放到KAKFA_HOME/libs目录下。

  3. 创建云数据库 SelectDB 版实例,详情请参见创建实例

  4. 通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例

  5. 创建测试数据库和测试表。

    1. 创建测试数据库。

      CREATE DATABASE test_db;
    2. 创建测试表。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

示例一:快速同步JSON数据

  1. 配置SelectDB Sink

    以Standalone模式为例,在Kafka config目录下创建selectdb-sink.properties,并配置如下内容:

    name=selectdb_sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=test_topic
    doris.topic2table.map=test_topic:example_tbl
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
    doris.http.port=8080
    doris.query.port=9030
    doris.user=admin
    doris.password=***
    doris.database=test_db
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    #配置死信队列,可选
    errors.tolerance=all
    errors.deadletterqueue.topic.name=test_error
    errors.deadletterqueue.context.headers.enable = true
    errors.deadletterqueue.topic.replication.factor=1
  2. 启动Kafka Connect

    bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties

示例二:使用Debezium同步MySQL数据到SelectDB

很多业务场景经常需要从业务数据库中实时同步数据,这个时候就需要使用数据库的变更数据捕获(Change Data Capture,简称CDC)机制。

Debezium是基于Kafka Connect的CDC工具,可以对接MySQL、PostgreSQL、SQL Server、Oracle、MongoDB等多种数据库,把数据库的数据持续以统一的格式发送到Kafka的Topic中,以供下游Sink端进行实时消费,本文以MySQL为例。

  1. 下载Debezium。

    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
  2. 解压下载文件。

    tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
  3. 将解压后的所有JAR包放置到KAKFA_HOME/libs目录下。

  4. 配置MySQL Source。

    在Kafka config目录下创建mysql-source.properties,并配置如下内容:

    name=mysql-source
    connector.class=io.debezium.connector.mysql.MySqlConnector
    database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com
    database.port=3306
    database.user=testuser
    database.password=****
    database.server.id=1
    # kafka中的该client的唯一标识
    database.server.name=test123
    # 需要同步的数据库和表,默认是同步所有数据库和表
    database.include.list=test
    table.include.list=test.test_table
    database.history.kafka.bootstrap.servers=localhost:9092
    # 用于存储数据库表结构变化的 Kafka topic
    database.history.kafka.topic=dbhistory
    transforms=unwrap
    # 参考 https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
    transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
    # 记录删除事件
    transforms.unwrap.delete.handling.mode=rewrite

    配置好之后,Kafka中默认的Topic名称格式是SERVER_NAME.DATABASE_NAME.TABLE_NAME

    说明

    Debezium配置请参见Debezium connector for MySQL

  5. 配置SelectDB Sink。

    在Kafka config目录下创建selectdb-sink.properties,并配置如下内容:

    name=selectdb-sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=test123.test.test_table
    doris.topic2table.map=test123.test.test_table:test_table
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
    doris.http.port=8080
    doris.query.port=9030
    doris.user=admin
    doris.password=****
    doris.database=test
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    #配置死信队列,可选
    #errors.tolerance=all
    #errors.deadletterqueue.topic.name=test_error
    #errors.deadletterqueue.context.headers.enable = true
    #errors.deadletterqueue.topic.replication.factor=1
    说明

    同步到云数据库 SelectDB 版时,需要先提前创建好数据库和表。

  6. 启动Kafka Connect。

    bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties
    说明

    启动后,可以观察日志logs/connect.log是否启动成功。

使用进阶

Connector操作

# 查看connector状态
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# 删除当前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
# 暂停当前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# 重启当前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# 重启connector内的tasks
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST

详细信息请参见:Connect REST Interface

死信队列

默认情况下,转换过程中或转换过程中遇到的任何错误都会导致连接器失败。每个连接器配置还可以通过跳过它们来容忍此类错误,可选择将每个错误和失败操作的详细信息以及有问题的记录(具有不同级别的详细信息)写入死信队列以便记录。

errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1

详细信息请参见:connect_errorreporting

访问SSL认证的Kafka集群

通过kafka-connect访问SSL认证的Kafka集群需要您提供用于认证Kafka Broker公钥的证书文件(client.truststore.jks)。您可以在connect-distributed.properties文件中增加以下配置:

# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
 
# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234

关于通过kafka-connect连接SSL认证的Kafka集群配置说明可以参考:Configure Kafka Connect

  • 本页导读 (1)