实时消费Kafka数据
当您需要将Kafka数据写入云原生数据仓库AnalyticDB PostgreSQL版,且不希望使用其他数据集成工具时,可以通过实时数据消费功能直接消费Kafka数据,减少实时处理组件依赖,提升写入吞吐。
Apache Kafka是一个容错、低延迟、分布式的发布-订阅消息系统。Streaming Server支持从Apache和Confluent Kafka发行版中加载Kafka数据。通过云原生数据仓库AnalyticDB PostgreSQL版可读外表对Kafka数据进行转换,并将数据写入云原生数据仓库AnalyticDB PostgreSQL版目标表中。
前提条件
Kafka服务与云原生数据仓库AnalyticDB PostgreSQL版实例需在同一专有网络(VPC)。
已在Kafka服务中生成了大量样例数据。本文以阿里云云消息队列Kafka版为例,具体信息如下。
接入点信息:alikafka-post-cn-wwo3hflb****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflb****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflb****-3-vpc.alikafka.aliyuncs.com:9092
Topic:test_topic
consumer group:test_consumer_group
已在云原生数据仓库AnalyticDB PostgreSQL版中创建目标用户和目标表,同时在任务中使用的数据库用户需要具备以下权限。
使用gpfdist协议创建只读外表的权限。
任务中配置的数据库Schema的USAGE和CREATE权限。
任务中配置的写入目标表的SELECT和INSERT权限。
本文以
liss_test
用户和liss_test.liss_test_plaintext
表为例。CREATE role liss_test with login; ALTER role liss_test with password 'lissTest****'; ALTER role liss_test CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist'); \c - liss_test CREATE DATABASE liss_test; \c liss_test CREATE SCHEMA liss_test; CREATE TABLE liss_test.liss_test_plaintext ( column_1 varchar(32), column_2 bigint, column_3 numeric, column_4 varchar(32), column_5 varchar(32) ) distributed by (column_1, column_2);
使用限制
云原生数据仓库 AnalyticDB PostgreSQL 版6.0实例需为v6.6.0及以上版本。云原生数据仓库 AnalyticDB PostgreSQL 版7.0实例需为v7.0.3及以上版本。AnalyticDB PostgreSQL版Serverless模式实例暂不支持。
实时数据消费目前仅支持INSERT、MERGE(UPSERT)、UPDATE三种语法,暂不支持DELETE与READ。
使用MERGE(UPSERT)或UPDATE时,需要云原生数据仓库 AnalyticDB PostgreSQL 版表有主键索引。
使用实时数据消费,不同分区(Partition)之间需要使用主键列做分区因子,否则可能会造成全局死锁错误,导致部分数据更新失败。
实时数据消费当前仅支持Kafka消息队列,暂不支持CDC格式的数据源。
当前的版本向导模式支持CSV和Delimited两种格式的数据源,专业模式支持CSV、Delimited和protobuf三种格式的数据源。
操作步骤
步骤一:开启实时数据服务
在控制台左上角,选择实例所在地域。
找到目标实例,单击实例ID。
在控制台左侧导航栏单击实时数据消费,再单击左上角开启实时数据服务。
在弹出的对话框中填写名称及服务描述并单击确定。开通完成后,可在控制台看到服务状态和连接信息。
说明服务规格当前不可选,默认为8CU。
步骤二:新增实时数据源
- 登录云原生数据仓库AnalyticDB PostgreSQL版控制台。
- 在控制台左上角,选择实例所在地域。
- 找到目标实例,单击实例ID。
在左侧导航栏,单击实时数据消费。
在实时数据源卡片中,单击新增数据源,并完成以下配置。
配置项
描述
关联数据服务
在下拉框中选择已创建的实时数据服务。
数据源名称
自定义数据源名称。
数据源描述
自定义数据源描述。
数据源类型
目前仅支持Kafka。
brokers
Kafka接入点信息。
阿里云的Kafka服务,可登录阿里云控制台获取默认接入点。具体操作,请参见查看接入点。
自建的kafka服务,Brokers需要填写Kafka服务具体的
`hostname:port`
或`ip:port`
信息。
topic
Kafka的Topic名称。
format
当前版本向导模式支持CSV和Delimited两种格式的数据源,专业模式支持CSV、Delimited和protobuf三种格式的数据源。
列分隔符
可设置任意单字符分隔符。
单击确定。
步骤三:新增实时任务
在实时任务卡片中,单击新增实时任务,并完成以下配置。
请根据业务需要选择向导模式或专业模式。
向导模式:可以通过控制台中的指引来快速搭建任务。
专业模式:可以通过提交YAML的方式向Streaming Server提交任务,功能相比于向导模式更丰富。
向导模式
配置项
描述
基本信息
任务名称
定义任务的名称,任务名称不可以重复,必填。
任务描述
描述任务内容,选填。
配置模式
向导模式。
源端配置
数据源
选择在新增实时数据源中配置的数据源,目前仅支持Kafka为源的数据源。
group_name
Kafka的消费者组。
failback_offset
消费位点。
earliest:从最早可用位点消费。
latest:从最新的位点开始消费。
投递保证
流计算中的一致性语义,支持:
ATLEAST:在Kafka中的数据至少有一次被写入云原生数据仓库AnalyticDB PostgreSQL版。
EXACTLY:在Kafka中的数据有且仅有一次被写入云原生数据仓库AnalyticDB PostgreSQL版。
目标端配置
目标库
需要写入的云原生数据仓库AnalyticDB PostgreSQL版目标数据库名称。
Schema
云原生数据仓库AnalyticDB PostgreSQL版的模式名称。
目标表
需要写入的云原生数据仓库AnalyticDB PostgreSQL版目标表名称。
账号
当前任务使用的云原生数据仓库AnalyticDB PostgreSQL版数据库账号。
密码
账号密码。
写入模式
目前仅支持INSERT、UPDATE和MERGE三种写入模式。
INSERT:将数据直接写入目标表。
UPDATE:当输入列中的MatchColumns与目标表中的列匹配,更新UpdateColumns中列出的目标表列。
MERGE:当写入数据与目标表列的值相等时,使用写入数据更新目标表列的现有数据。当写入数据与目标表列的值不相等时,直接将数据写入目标表。MERGE写入模式可类比于UPSERT(UPDATE and INSERT),关于UPSERT的写入方式,请参见使用INSERT ON CONFLICT覆盖写入数据。
说明MatchColumns与UpdateColumns的含义请参见下文字段类型的描述。
ErrorLimitCount
错误数据的容忍阈值。当写入的错误数据到达ErrorLimitCount时,Streaming Server会自动停止将数据源的数据写入云原生数据仓库AnalyticDB PostgreSQL版。0表示Streaming Server遇到第一次错误数据时就会停止数据写入。目前该参数未启用,填0即可。
字段映射
源字段
Kafka消息中的Value字段名,需要按照在Value中出现的顺序指定所有的字段名。
目标字段
云原生数据仓库AnalyticDB PostgreSQL版目标表的字段名。
字段类型
目前支持以下三种类型:
MatchColumns:作为写入时的Join条件列作为更新条件,用于判断目标表中哪些行需要被更新。
UpdateColumns:如果某一行数据符合更新条件,那么在UpdateColumns中的列会被更新为新的值。
空(不填):即使某一行数据符合更新条件,该字段也不会被更新为新的值。
在UPDATE和MERGE写入时,Streaming Server会先将数据写入一个临时表,然后利用MatchColumns作为条件列与目标表进行Join:
如果有匹配的数据,则会更新UpdateColumns中的数据。
如果没有匹配的数据时,则会根据写入模式有以下两种情况:
写入模式为UPDATE时,数据不会被写入。
写入模式为MERGE时,数据会被写入。
专业模式
配置项
描述
基本信息
任务名称
定义任务的名称,任务名称不可以重复,必填。
任务描述
描述任务内容,选填。
配置模式
专业模式。
数据源
选择在新增实时数据源中配置的数据源,目前仅支持Kafka为源的数据源。
YAML
可以通过YAML配置更复杂的写入逻辑。本文的YAML配置示例如下。更多详情,请参见附录:YAML配置说明。
DATABASE: liss_test USER: liss_test PASSWORD: lissTest**** HOST: gp-2ze517f9l7****-master.gpdb.rds-aliyun-pre.rds.aliyuncs.com PORT: 5432 KAFKA: INPUT: SOURCE: BROKERS: alikafka-post-cn-wwo3hflbo002-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflbo002-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflbo002-3-vpc.alikafka.aliyuncs.com:9092 TOPIC: test_topic FALLBACK_OFFSET: EARLIEST VALUE: COLUMNS: - NAME: column_1 TYPE: varchar(32) - NAME: column_2 TYPE: bigint - NAME: column_3 TYPE: numeric - NAME: column_4 TYPE: varchar(32) - NAME: column_5 TYPE: varchar(32) FORMAT: delimited DELIMITED_OPTION: DELIMITER: "|" ERROR_LIMIT: 20 OUTPUT: SCHEMA: liss_test TABLE: liss_test_plaintext MODE: MERGE MATCH_COLUMNS: - column_1 - column_2 UPDATE_COLUMNS: - column_3 - column_4 - column_5 MAPPING: - NAME: column_1 EXPRESSION: column_1 - NAME: column_2 EXPRESSION: column_2 - NAME: column_3 EXPRESSION: column_3 - NAME: column_4 EXPRESSION: column_4 - NAME: column_5 EXPRESSION: column_5 COMMIT: MAX_ROW: 1000 MINIMAL_INTERVAL: 1000 CONSISTENCY: ATLEAST POLL: BATCHSIZE: 1000 TIMEOUT: 1000 PROPERTIES: group.id: test_consumer_group
单击确定,并等待实时任务状态为运行中,即可将数据源中的数据写入云原生数据仓库AnalyticDB PostgreSQL版。
在任务启动后会在目标端配置的Schema(专业模式为METADATA.SCHEMA中配置的schema)下生成任务的两种辅助表,其格式分别为:
lissext_$UID
:本任务定义的gpfdist外表,用于将数据写入云原生数据仓库AnalyticDB PostgreSQL版。lisskafka_mission_info_$UID
:用于存储任务当前位点推进的情况,保障数据写入的一致性。目前为了保障写入任务的高可用,每个写入任务会生成4个子任务,所以每启动一个写入任务,会生成4张表。UID是每个写入任务的唯一标识ID。
附录:YAML配置说明
YAML配置文件格式如下。
DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <adbpg_port>
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: <kafka_broker_host:broker_port> [, ... ]
TOPIC: <kafka_topic>
[PARTITIONS: (<partition_numbers>)]
[FALLBACK_OFFSET: { earliest | latest }]
[VALUE:
COLUMNS:
- NAME: { <column_name> }
TYPE: <column_data_type>
[ ... ]
FORMAT: <value_data_format>
[[DELIMITED_OPTION:
DELIMITER: <delimiter_string>
[QUOTE: <quote_char>]
[ESCAPE: <escape_char>] ] |
[CSV_OPTION:
[DELIMITER: <delim_char>]
[QUOTE: <quote_char>]
[NULL_STRING: <nullstr_val>]
[ESCAPE: <escape_char>]
[KEY:
COLUMNS:
- NAME: { <column_name> }
TYPE: <column_data_type>
[ ... ]
FORMAT: <key_data_format>
[[DELIMITED_OPTION:
DELIMITER: <delimiter_string> |
[QUOTE: <quote_char>]
[ESCAPE: <escape_char>] ] |
[CSV_OPTION:
[DELIMITER: <delim_char>]
[QUOTE: <quote_char>]
[NULL_STRING: <nullstr_val>]
[ESCAPE: <escape_char>]
[META:
COLUMNS:
- NAME: <meta_column_name>
TYPE: { json | jsonb }
FORMAT: json]
[ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
{ OUTPUT:
[SCHEMA: <output_schema_name>]
TABLE: <table_name>
[MODE: <mode>]
[MATCH_COLUMNS:
- <match_column_name>
[ ... ]]
[ORDER_COLUMNS:
- <order_column_name>
[ ... ]]
[UPDATE_COLUMNS:
- <update_column_name>
[ ... ]]
[MAPPING:
- NAME: <target_column_name>
EXPRESSION: { <source_column_name> | <expression> }
[ ... ]
|
<target_column_name> : { <source_column_name> | <expression> }
[ ... ] ] }
[METADATA:
[SCHEMA: <metadata_schema_name>]]
COMMIT:
MAX_ROW: <num_rows>
MINIMAL_INTERVAL: <wait_time>
CONSISTENCY: { strong | at-least | at-most | none }
[POLL:
BATCHSIZE: <num_records>
TIMEOUT: <poll_time>]
[PROPERTIES:
<kafka_property_name>: <kafka_property_value>
[ ... ]]
[SCHEDULE:
RETRY_INTERVAL: <retry_time>
MAX_RETRIES: <num_retries> ]
数据库相关配置
参数 | 描述 | 是否必填 |
DATABASE | 目标端云原生数据仓库AnalyticDB PostgreSQL版实例的数据库名称。 | 是 |
USER | 云原生数据仓库AnalyticDB PostgreSQL版实例的账号。 | 是 |
PASSWORD | 云原生数据仓库AnalyticDB PostgreSQL版实例的账号密码。 | 是 |
HOST | 目标端云原生数据仓库AnalyticDB PostgreSQL版实例的内网地址。 | 是 |
PORT | 云原生数据仓库AnalyticDB PostgreSQL版实例的端口号。 | 是 |
VERSION | 当前采用的YAML文件格式版本,预留字段,无限制。 | 否 |
KAFKA:INPUT配置
KAFKA:INPUT:SOURCE
参数 | 描述 | 是否必填 | 参数值限制 |
BROKERS | Kafka接入点信息。
如有多个使用英文逗号( | 是 | 对应kafka consumer bootstrap.server 配置,需要填写有效的Brokers地址,否则会报错。 |
TOPIC | Kafka Topic名称。 | 是 | 仅支持单个Topic。 |
PARTITIONS | 分区编号。 如有多个分区编号,使用英文逗号( | 否 | 例如:1,2,3,4,5 |
FALLBACK_OFFSET | 消费位点。
| 是 | 无 |
KAFKA:INPUT:KEY和KAFKA:INPUT:VALUE
Kafka消息的Key值字段名称、数据类型和数据格式。
Kafka消息的Value字段名称、数据类型和数据格式。
必须按照在Key和Value中出现的顺序指定所有Kafka数据元素。
KAFKA:INPUT:KEY
和KAFKA:INPUT:VALUE
至少需要配置一个,如果两个都未配置会报错。
参数 | 描述 | 是否必填 | 参数值限制 |
COLUMNS | 如果定义 如果定义 | 是 | 无 |
NAME | 定义Kafka消息中的列名。该列名主要在 | 是 | 无 |
TYPE | 定义Kafka消息中列的类型,数据类型需要与这个列在目标数据库中的类型保持一致。 由于Kafka消息中Key和Value的格式不透明,因此当前Streaming Server默认从Kafka消息中获取的数据格式为文本形式。 | 是 | 云原生数据仓库AnalyticDB PostgreSQL版支持的数据类型请参见数据类型。 如果Kafka消息的列与目标列的类型不一致,请在Mapping中的expression部分对类型进行转换。 |
FORMAT | 定义Kafka消息数据的类型,当前支持CSV、Delimited和protobuf。 | 是 | 无 |
KAFKA:INPUT:META
META不是必填项,当您需要展示Message Meta信息时配置。
参数 | 描述 | 是否必填 | 参数值限制 |
COLUMNS | 定义Meta信息,为一组NAME,TYPE。 | 是 | 无 |
NAME | Meta名称,可以指定为其他的名称,默认使用 | 是 | 无 |
TYPE | 只能使用Text类型。 | 是 | Text |
FORMAT | 只能使用Text类型。 | 是 | Text |
KAFKA:INPUT:ERROR_LIMIT
错误数据的容忍阈值。当写入的错误数据达到ERROR_LIMIT时,Streaming Server会退出当前任务,自动停止将数据源的数据写入云原生数据仓库AnalyticDB PostgreSQL版。默认值为0,即Streaming Server会在出现第一次错误数据时就退出当前任务,停止数据写入。ERROR_LIMIT值必须大于1。
目前该参数未启用,不选择或者填0即可。
KAFKA:OUTPUT配置
数据库相关配置
数据写入到云原生数据仓库AnalyticDB PostgreSQL版数据库的相关配置,包括Kafka值到目标数据库的映射、写入模式等。
参数 | 描述 | 是否必填 |
SCHEMA | 写入云原生数据仓库AnalyticDB PostgreSQL版的目标表所在的Schema。 | 是 |
TABLE | 目标表的名称。 | 是 |
MODE | 写入模式,目前支持INSERT、UPDATE和MERGE三种方式。 | 是 |
MATCH_COLUMNS | 当写入模式为UPDATE和MERGE时生效。 指定目标表的部分列,当写入数据与目标表数据匹配时,目标表中这部分数据会根据UPDATE或MERGE模式对数据进行更新。 建议MATCH_COLUMNS使用目标表的主键或者唯一键。 | 否 |
ORDER_COLUMNS | 在写入模式(MODE)为MERGE时生效。 当写入数据根据MATCH_COLUMNS存在多个匹配行时,使用ORDER_COLUMNS对这些数据进行排序,以确定具有最大值的输入行,Streaming Server使用该行来更新目标。 | 否 |
UPDATE_COLUMNS | 当写入模式为UPDATE和MERGE时生效。 如果写入数据能够根据MATCH_COLUMNS匹配到目标表数据,则会基于UPDATE_COLUMNS更新对应的列。 | 否 |
在使用MERGE和UPDATE模式时,如果不指定ORDER_COLUMNS,当写入数据根据MATCH_COLUMNS匹配到多行相同时,则会随机取一条作为结果写入。
在指定了ORDER_COLUMNS后,其排序结果是
a desc,b desc,c desc
。
KAFKA:OUTPUT:MAPPING
参数 | 描述 | 是否必填 |
NAME | 目标列名称。 | 是 |
EXPRESSION | 可以是源端的列名( | 是 |
KAFKA:METADATA配置
参数 | 描述 | 是否必填 | 参数限制 |
schema | Streaming Server创建的外表和其他辅助表所在的Schema名称。 | 否 | 默认取值 |
KAFKA:COMMIT配置
COMMIT用于控制向数据库提交数据的行为。
参数 | 描述 | 是否必填 | 参数限制 |
MAX_ROW | 指定一次写入目标库的最大Batch Size。 | 否 | 单位为行,默认:500。 |
MINIMAL_INTERVAL | 在两个Batch写入之间的等待时间。如果超过该时间,会尝试再写一次。 | 否 | 单位为毫秒(ms),默认:1000。 |
CONSISTENCY | 数据一致性保证。 | 否 | 目前仅支持ATLEAST,即kafka中的数据至少会写入目标数据库一次。 |
KAFKA:POLL配置
POLL用于控制Kafka Consumer消费数据的行为。
参数 | 描述 | 是否必填 | 参数限制 |
BATCHSIZE | 一次从Topic中拿出的event数量。保留字段,目前没有实现相关功能。 | 否 | 单位为行,默认:64。 |
TIMEOUT | Kafka Consumer从Kafka中获取event等待的超时时间。 | 否 | 单位为毫秒(ms),默认:5000。 |
KAFKA:PROPERTIES配置
PROPERTIES用于配置Kafka Connect,当前采用白名单制,仅支持配置group.id
,auto.offset.reset
和isolation.level
。详细信息,请参见Kafka Connect Configs。