实时消费Kafka数据

更新时间:

当您需要将Kafka数据写入云原生数据仓库AnalyticDB PostgreSQL版,且不希望使用其他数据集成工具时,可以通过实时数据消费功能直接消费Kafka数据,减少实时处理组件依赖,提升写入吞吐。

Apache Kafka是一个容错、低延迟、分布式的发布-订阅消息系统。Streaming Server支持从Apache和Confluent Kafka发行版中加载Kafka数据。通过云原生数据仓库AnalyticDB PostgreSQL版可读外表对Kafka数据进行转换,并将数据写入云原生数据仓库AnalyticDB PostgreSQL版目标表中。

前提条件

  • Kafka服务与云原生数据仓库AnalyticDB PostgreSQL版实例需在同一专有网络(VPC)。

    重要

    如果Kafka服务与云原生数据仓库AnalyticDB PostgreSQL版实例属于同一专有网络但是不在同一交换机(vSwitch)下,您需要进行如下操作:

    • 将Kafka服务所在交换机的IPv4网段添加至云原生数据仓库AnalyticDB PostgreSQL版实例白名单中。具体操作,请参见设置白名单

    • 云原生数据仓库AnalyticDB PostgreSQL版实例所在交换机的IPv4网段添加至Kafka服务白名单中。具体操作,请参见配置白名单

  • 已在Kafka服务中生成了大量样例数据。本文以阿里云云消息队列Kafka版为例,具体信息如下。

    • 接入点信息:alikafka-post-cn-wwo3hflb****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflb****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflb****-3-vpc.alikafka.aliyuncs.com:9092

    • Topic:test_topic

    • consumer group:test_consumer_group

  • 已在云原生数据仓库AnalyticDB PostgreSQL版中创建目标用户和目标表,同时在任务中使用的数据库用户需要具备以下权限。

    • 使用gpfdist协议创建只读外表的权限。

    • 任务中配置的数据库Schema的USAGE和CREATE权限。

    • 任务中配置的写入目标表的SELECT和INSERT权限。

    本文以liss_test用户和liss_test.liss_test_plaintext表为例。

    CREATE role liss_test with login;
    ALTER role liss_test with password 'lissTest****';
    ALTER role liss_test CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist'); 
    
    \c - liss_test
    CREATE DATABASE liss_test;
    \c liss_test
    CREATE SCHEMA liss_test;
    
    CREATE TABLE liss_test.liss_test_plaintext (
    column_1 varchar(32),
    column_2 bigint,
    column_3 numeric,
    column_4 varchar(32),
    column_5 varchar(32)
    ) distributed by (column_1, column_2);

使用限制

  • 云原生数据仓库 AnalyticDB PostgreSQL 版6.0实例需为v6.6.0及以上版本。云原生数据仓库 AnalyticDB PostgreSQL 版7.0实例需为v7.0.3及以上版本。AnalyticDB PostgreSQL版Serverless模式实例暂不支持。

  • 实时数据消费目前仅支持INSERT、MERGE(UPSERT)、UPDATE三种语法,暂不支持DELETE与READ。

  • 使用MERGE(UPSERT)或UPDATE时,需要云原生数据仓库 AnalyticDB PostgreSQL 版表有主键索引。

  • 使用实时数据消费,不同分区(Partition)之间需要使用主键列做分区因子,否则可能会造成全局死锁错误,导致部分数据更新失败。

  • 实时数据消费当前仅支持Kafka消息队列,暂不支持CDC格式的数据源。

  • 当前的版本向导模式支持CSV和Delimited两种格式的数据源,专业模式支持CSV、Delimited和protobuf三种格式的数据源。

操作步骤

步骤一:开启实时数据服务

  1. 登录云原生数据仓库AnalyticDB PostgreSQL版控制台

  2. 在控制台左上角,选择实例所在地域。

  3. 找到目标实例,单击实例ID。

  4. 在控制台左侧导航栏单击实时数据消费,再单击左上角开启实时数据服务

    image

  5. 在弹出的对话框中填写名称服务描述并单击确定。开通完成后,可在控制台看到服务状态连接信息

    image

    说明

    服务规格当前不可选,默认为8CU。

步骤二:新增实时数据源

  1. 登录云原生数据仓库AnalyticDB PostgreSQL版控制台
  2. 在控制台左上角,选择实例所在地域。
  3. 找到目标实例,单击实例ID。
  4. 在左侧导航栏,单击实时数据消费

  5. 实时数据源卡片中,单击新增数据源,并完成以下配置。

    配置项

    描述

    关联数据服务

    在下拉框中选择已创建的实时数据服务。

    数据源名称

    自定义数据源名称。

    数据源描述

    自定义数据源描述。

    数据源类型

    目前仅支持Kafka。

    brokers

    Kafka接入点信息。

    • 阿里云的Kafka服务,可登录阿里云控制台获取默认接入点。具体操作,请参见查看接入点

    • 自建的kafka服务,Brokers需要填写Kafka服务具体的`hostname:port``ip:port`信息。

    topic

    Kafka的Topic名称。

    format

    当前版本向导模式支持CSV和Delimited两种格式的数据源,专业模式支持CSV、Delimited和protobuf三种格式的数据源。

    列分隔符

    可设置任意单字符分隔符。

  6. 单击确定

步骤三:新增实时任务

  1. 实时任务卡片中,单击新增实时任务,并完成以下配置。

    请根据业务需要选择向导模式专业模式

    向导模式:可以通过控制台中的指引来快速搭建任务。

    专业模式:可以通过提交YAML的方式向Streaming Server提交任务,功能相比于向导模式更丰富。

    向导模式

    配置项

    描述

    基本信息

    任务名称

    定义任务的名称,任务名称不可以重复,必填。

    任务描述

    描述任务内容,选填。

    配置模式

    向导模式。

    源端配置

    数据源

    选择在新增实时数据源中配置的数据源,目前仅支持Kafka为源的数据源。

    group_name

    Kafka的消费者组。

    failback_offset

    消费位点。

    • earliest:从最早可用位点消费。

    • latest:从最新的位点开始消费。

    投递保证

    流计算中的一致性语义,支持:

    • ATLEAST:在Kafka中的数据至少有一次被写入云原生数据仓库AnalyticDB PostgreSQL版

    • EXACTLY:在Kafka中的数据有且仅有一次被写入云原生数据仓库AnalyticDB PostgreSQL版

    目标端配置

    目标库

    需要写入的云原生数据仓库AnalyticDB PostgreSQL版目标数据库名称。

    Schema

    云原生数据仓库AnalyticDB PostgreSQL版的模式名称。

    目标表

    需要写入的云原生数据仓库AnalyticDB PostgreSQL版目标表名称。

    账号

    当前任务使用的云原生数据仓库AnalyticDB PostgreSQL版数据库账号。

    密码

    账号密码。

    写入模式

    目前仅支持INSERT、UPDATE和MERGE三种写入模式。

    • INSERT:将数据直接写入目标表。

    • UPDATE:当输入列中的MatchColumns与目标表中的列匹配,更新UpdateColumns中列出的目标表列。

    • MERGE:当写入数据与目标表列的值相等时,使用写入数据更新目标表列的现有数据。当写入数据与目标表列的值不相等时,直接将数据写入目标表。MERGE写入模式可类比于UPSERT(UPDATE and INSERT),关于UPSERT的写入方式,请参见使用INSERT ON CONFLICT覆盖写入数据

    说明

    MatchColumns与UpdateColumns的含义请参见下文字段类型的描述。

    ErrorLimitCount

    错误数据的容忍阈值。当写入的错误数据到达ErrorLimitCount时,Streaming Server会自动停止将数据源的数据写入云原生数据仓库AnalyticDB PostgreSQL版。0表示Streaming Server遇到第一次错误数据时就会停止数据写入。目前该参数未启用,填0即可。

    字段映射

    源字段

    Kafka消息中的Value字段名,需要按照在Value中出现的顺序指定所有的字段名。

    目标字段

    云原生数据仓库AnalyticDB PostgreSQL版目标表的字段名。

    字段类型

    目前支持以下三种类型:

    • MatchColumns:作为写入时的Join条件列作为更新条件,用于判断目标表中哪些行需要被更新。

    • UpdateColumns:如果某一行数据符合更新条件,那么在UpdateColumns中的列会被更新为新的值。

    • 空(不填):即使某一行数据符合更新条件,该字段也不会被更新为新的值。

    在UPDATE和MERGE写入时,Streaming Server会先将数据写入一个临时表,然后利用MatchColumns作为条件列与目标表进行Join:

    • 如果有匹配的数据,则会更新UpdateColumns中的数据。

    • 如果没有匹配的数据时,则会根据写入模式有以下两种情况:

      • 写入模式为UPDATE时,数据不会被写入。

      • 写入模式为MERGE时,数据会被写入。

    专业模式

    配置项

    描述

    基本信息

    任务名称

    定义任务的名称,任务名称不可以重复,必填。

    任务描述

    描述任务内容,选填。

    配置模式

    专业模式。

    数据源

    选择在新增实时数据源中配置的数据源,目前仅支持Kafka为源的数据源。

    YAML

    可以通过YAML配置更复杂的写入逻辑。本文的YAML配置示例如下。更多详情,请参见附录:YAML配置说明

    DATABASE: liss_test
    USER: liss_test
    PASSWORD: lissTest****
    HOST: gp-2ze517f9l7****-master.gpdb.rds-aliyun-pre.rds.aliyuncs.com
    PORT: 5432
    KAFKA:
      INPUT:
        SOURCE:
          BROKERS: alikafka-post-cn-wwo3hflbo002-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflbo002-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflbo002-3-vpc.alikafka.aliyuncs.com:9092
          TOPIC: test_topic
          FALLBACK_OFFSET: EARLIEST
        VALUE:
          COLUMNS:
          - NAME: column_1
            TYPE: varchar(32)
          - NAME: column_2
            TYPE: bigint
          - NAME: column_3
            TYPE: numeric
          - NAME: column_4
            TYPE: varchar(32)
          - NAME: column_5
            TYPE: varchar(32)
          FORMAT: delimited
          DELIMITED_OPTION:
            DELIMITER: "|"
        ERROR_LIMIT: 20
      OUTPUT:
        SCHEMA: liss_test
        TABLE: liss_test_plaintext
        MODE: MERGE
        MATCH_COLUMNS:
        - column_1
        - column_2
        UPDATE_COLUMNS:
        - column_3
        - column_4
        - column_5
        MAPPING:
        - NAME: column_1
          EXPRESSION: column_1
        - NAME: column_2
          EXPRESSION: column_2
        - NAME: column_3
          EXPRESSION: column_3
        - NAME: column_4
          EXPRESSION: column_4
        - NAME: column_5
          EXPRESSION: column_5
      COMMIT:
        MAX_ROW: 1000
        MINIMAL_INTERVAL: 1000
        CONSISTENCY: ATLEAST
      POLL:
        BATCHSIZE: 1000
        TIMEOUT: 1000
      PROPERTIES:
        group.id: test_consumer_group
  2. 单击确定,并等待实时任务状态为运行中,即可将数据源中的数据写入云原生数据仓库AnalyticDB PostgreSQL版

在任务启动后会在目标端配置的Schema(专业模式为METADATA.SCHEMA中配置的schema)下生成任务的两种辅助表,其格式分别为:

  • lissext_$UID:本任务定义的gpfdist外表,用于将数据写入云原生数据仓库AnalyticDB PostgreSQL版

  • lisskafka_mission_info_$UID:用于存储任务当前位点推进的情况,保障数据写入的一致性。目前为了保障写入任务的高可用,每个写入任务会生成4个子任务,所以每启动一个写入任务,会生成4张表。

  • UID是每个写入任务的唯一标识ID。

附录:YAML配置说明

YAML配置文件格式如下。

DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <adbpg_port>
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: <kafka_broker_host:broker_port> [, ... ]
        TOPIC: <kafka_topic>
        [PARTITIONS: (<partition_numbers>)]
        [FALLBACK_OFFSET: { earliest | latest }]
      [VALUE:
        COLUMNS:
           - NAME: { <column_name> }
             TYPE: <column_data_type>
           [ ... ]
         FORMAT: <value_data_format>
         [[DELIMITED_OPTION:
            DELIMITER: <delimiter_string>
            [QUOTE: <quote_char>]
            [ESCAPE: <escape_char>] ] |
         [CSV_OPTION:
            [DELIMITER: <delim_char>]
            [QUOTE: <quote_char>]
            [NULL_STRING: <nullstr_val>]
            [ESCAPE: <escape_char>]
      [KEY:
        COLUMNS:
           - NAME: { <column_name> }
             TYPE: <column_data_type>
           [ ... ]
         FORMAT: <key_data_format>
         [[DELIMITED_OPTION:
            DELIMITER: <delimiter_string> |
            [QUOTE: <quote_char>]
            [ESCAPE: <escape_char>] ] |
         [CSV_OPTION:
            [DELIMITER: <delim_char>]
            [QUOTE: <quote_char>]
            [NULL_STRING: <nullstr_val>]
            [ESCAPE: <escape_char>]
      [META:
         COLUMNS:
            - NAME: <meta_column_name>
              TYPE: { json | jsonb }
         FORMAT: json]
      [ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
   { OUTPUT:
      [SCHEMA: <output_schema_name>]
      TABLE: <table_name>
      [MODE: <mode>]
      [MATCH_COLUMNS: 
         - <match_column_name>
         [ ... ]]
      [ORDER_COLUMNS: 
         - <order_column_name>
         [ ... ]]
      [UPDATE_COLUMNS: 
         - <update_column_name>
         [ ... ]]
      [MAPPING: 
         - NAME: <target_column_name>
           EXPRESSION: { <source_column_name> | <expression> } 
         [ ... ]
           |
         <target_column_name> : { <source_column_name> | <expression> }
         [ ... ] ] }
   [METADATA:
      [SCHEMA: <metadata_schema_name>]]
   COMMIT:
      MAX_ROW: <num_rows>
      MINIMAL_INTERVAL: <wait_time>
      CONSISTENCY: { strong | at-least | at-most | none }
   [POLL:
      BATCHSIZE: <num_records>
      TIMEOUT: <poll_time>]
   [PROPERTIES:
      <kafka_property_name>: <kafka_property_value>
      [ ... ]]
[SCHEDULE:
   RETRY_INTERVAL: <retry_time>
   MAX_RETRIES: <num_retries> ]

数据库相关配置

参数

描述

是否必填

DATABASE

目标端云原生数据仓库AnalyticDB PostgreSQL版实例的数据库名称。

USER

云原生数据仓库AnalyticDB PostgreSQL版实例的账号。

PASSWORD

云原生数据仓库AnalyticDB PostgreSQL版实例的账号密码。

HOST

目标端云原生数据仓库AnalyticDB PostgreSQL版实例的内网地址。

PORT

云原生数据仓库AnalyticDB PostgreSQL版实例的端口号。

VERSION

当前采用的YAML文件格式版本,预留字段,无限制。

KAFKA:INPUT配置

KAFKA:INPUT:SOURCE

参数

描述

是否必填

参数值限制

BROKERS

Kafka接入点信息。

  • 阿里云的Kafka服务,可登录阿里云控制台获取默认接入点。具体操作,请参见查看接入点

  • 自建的Kafka服务,Brokers需要填写Kafka服务具体的ip:port信息。

如有多个使用英文逗号(,)进行分隔。

对应kafka consumer bootstrap.server 配置,需要填写有效的Brokers地址,否则会报错。

TOPIC

Kafka Topic名称。

仅支持单个Topic。

PARTITIONS

分区编号。

如有多个分区编号,使用英文逗号(,)进行分隔。如果在PROPERTIES中配置了group.id,那么该参数会被忽略。

例如:1,2,3,4,5

FALLBACK_OFFSET

消费位点。

  • earliest:从最早可用位点消费。

  • latest:从最新的位点开始消费。

KAFKA:INPUT:KEY和KAFKA:INPUT:VALUE

Kafka消息的Key值字段名称、数据类型和数据格式。

Kafka消息的Value字段名称、数据类型和数据格式。

必须按照在Key和Value中出现的顺序指定所有Kafka数据元素。

KAFKA:INPUT:KEYKAFKA:INPUT:VALUE至少需要配置一个,如果两个都未配置会报错。

参数

描述

是否必填

参数值限制

COLUMNS

如果定义KAFKA:INPUT:KEY,则用于定义Kafka消息中Key部分的列名与类型;

如果定义KAFKA:INPUT:VALUE,则用于定义Kafka消息中Value部分的列名与类型。

NAME

定义Kafka消息中的列名。该列名主要在KAFKA:OUTPUT:MAPPING中使用,用于标记Kafka消息中的数据列。

TYPE

定义Kafka消息中列的类型,数据类型需要与这个列在目标数据库中的类型保持一致。

由于Kafka消息中Key和Value的格式不透明,因此当前Streaming Server默认从Kafka消息中获取的数据格式为文本形式。

云原生数据仓库AnalyticDB PostgreSQL版支持的数据类型请参见数据类型

如果Kafka消息的列与目标列的类型不一致,请在Mapping中的expression部分对类型进行转换。

FORMAT

定义Kafka消息数据的类型,当前支持CSV、Delimited和protobuf。

KAFKA:INPUT:META

META不是必填项,当您需要展示Message Meta信息时配置。

参数

描述

是否必填

参数值限制

COLUMNS

定义Meta信息,为一组NAME,TYPE。

NAME

Meta名称,可以指定为其他的名称,默认使用meta

TYPE

只能使用Text类型。

Text

FORMAT

只能使用Text类型。

Text

KAFKA:INPUT:ERROR_LIMIT

错误数据的容忍阈值。当写入的错误数据达到ERROR_LIMIT时,Streaming Server会退出当前任务,自动停止将数据源的数据写入云原生数据仓库AnalyticDB PostgreSQL版。默认值为0,即Streaming Server会在出现第一次错误数据时就退出当前任务,停止数据写入。ERROR_LIMIT值必须大于1。

目前该参数未启用,不选择或者填0即可。

KAFKA:OUTPUT配置

数据库相关配置

数据写入到云原生数据仓库AnalyticDB PostgreSQL版数据库的相关配置,包括Kafka值到目标数据库的映射、写入模式等。

参数

描述

是否必填

SCHEMA

写入云原生数据仓库AnalyticDB PostgreSQL版的目标表所在的Schema。

TABLE

目标表的名称。

MODE

写入模式,目前支持INSERT、UPDATE和MERGE三种方式。

MATCH_COLUMNS

当写入模式为UPDATE和MERGE时生效。

指定目标表的部分列,当写入数据与目标表数据匹配时,目标表中这部分数据会根据UPDATE或MERGE模式对数据进行更新。

建议MATCH_COLUMNS使用目标表的主键或者唯一键。

ORDER_COLUMNS

在写入模式(MODE)为MERGE时生效。

当写入数据根据MATCH_COLUMNS存在多个匹配行时,使用ORDER_COLUMNS对这些数据进行排序,以确定具有最大值的输入行,Streaming Server使用该行来更新目标。

UPDATE_COLUMNS

当写入模式为UPDATE和MERGE时生效。

如果写入数据能够根据MATCH_COLUMNS匹配到目标表数据,则会基于UPDATE_COLUMNS更新对应的列。

说明
  • 在使用MERGE和UPDATE模式时,如果不指定ORDER_COLUMNS,当写入数据根据MATCH_COLUMNS匹配到多行相同时,则会随机取一条作为结果写入。

  • 在指定了ORDER_COLUMNS后,其排序结果是a desc,b desc,c desc

KAFKA:OUTPUT:MAPPING

参数

描述

是否必填

NAME

目标列名称。

EXPRESSION

可以是源端的列名(KAFKA:INPUT:VALUE:COLUMNS中定义的列名),或者一个表达式。例如NAME : targetColumnName Expression: input + 1,其效果等效于SELECT input + 1 AS targetColumnName FROM xxx

KAFKA:METADATA配置

参数

描述

是否必填

参数限制

schema

Streaming Server创建的外表和其他辅助表所在的Schema名称。

默认取值KAFKA:OUTPUT中的Schema。

KAFKA:COMMIT配置

COMMIT用于控制向数据库提交数据的行为。

参数

描述

是否必填

参数限制

MAX_ROW

指定一次写入目标库的最大Batch Size。

单位为行,默认:500。

MINIMAL_INTERVAL

在两个Batch写入之间的等待时间。如果超过该时间,会尝试再写一次。

单位为毫秒(ms),默认:1000。

CONSISTENCY

数据一致性保证。

目前仅支持ATLEAST,即kafka中的数据至少会写入目标数据库一次。

KAFKA:POLL配置

POLL用于控制Kafka Consumer消费数据的行为。

参数

描述

是否必填

参数限制

BATCHSIZE

一次从Topic中拿出的event数量。保留字段,目前没有实现相关功能。

单位为行,默认:64。

TIMEOUT

Kafka Consumer从Kafka中获取event等待的超时时间。

单位为毫秒(ms),默认:5000。

KAFKA:PROPERTIES配置

PROPERTIES用于配置Kafka Connect,当前采用白名单制,仅支持配置group.idauto.offset.resetisolation.level。详细信息,请参见Kafka Connect Configs