本文介绍如何通过社区版Flink将Kafka的数据同步至AnalyticDB PostgreSQL版

前提条件

  • 已将社区版Flink的客户端IP地址添加至AnalyticDB PostgreSQL版白名单中,设置白名单的方法,请参见设置白名单
  • 在Flink的客户端的$FLINK_HOME/lib路径上部署Kafka Connector相关依赖,您可以直接使用Flink官网提供的Table API Kafka Connector,具体内容,请参见Apache Kafka SQL Connector
    本文示例中使用的依赖如下。
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>1.17-SNAPSHOT</version>
    </dependency>
  • 在Flink的客户端的$FLINK_HOME/lib路径上部署AnalyticDB PostgreSQL Connector相关依赖。获取AnalyticDB PostgreSQL Connector的JAR包,请参见AnalyticDB PostgreSQL Connector

    本文示例中使用的AnalyticDB PostgreSQL Connector版本为1.13。实际使用时建议选择Flink引擎版本相近的版本。

注意事项

阿里云实时计算Flink版与社区版Flink操作存在差异,但是使用过程基本相同,如需使用阿里云实时计算Flink版,请参见阿里云实时计算Flink版文档

操作步骤

  1. 在Flink上创建一张表,用于读取Kafka表的数据。
    CREATE TABLE KafkaTable (
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING,
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
      `shard` BIGINT METADATA FROM 'partition' VIRTUAL,
      `meta_offset` BIGINT METADATA FROM 'offset' VIRTUAL
    
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );

    Kafka Connector参数说明如下:

    参数是否必填说明
    connectorConnector名称,此处固定为kafka
    topicKafka的Topic名称。
    properties.bootstrap.serversKafka客户端的连接地址和端口。
    properties.group.idKafka的消费组ID。
    scan.startup.mode数据消费的起始点位,具体介绍,请参见起始消费点位
    value.format序列化和反序列化Kafka消息体时使用的格式。更多格式和相关配置的介绍,请参见格式

    更多Kafka Connector参数介绍,请参见连接器参数

    Kafka的每条消息(Record)中均包含了一些元数据(例如timestamp、offset、partition等),这些元数据在业务中可以起到一定作用。测试表中的event_time、meta_offset和shard列就是从Kafka消息中获取到的有用信息。关于可用的元数据的介绍,请参见可用的元数据

  2. AnalyticDB PostgreSQL版上创建一张目标表。
    CREATE TABLE ADBPGTargetTable (
      user_id BIGINT primary key,
      item_id BIGINT,
      behavior VARCHAR,
      event_time TIMESTAMP,
      shard BIGINT,            -- partition在AnalyticDB PostgreSQL中是保留关键字,因此需要将Kafka表原有的partition列名替换为shard。
      meta_offset BIGINT       -- offset在AnalyticDB PostgreSQL中是保留关键字,因此需要将Kafka表原有的offset列名替换为meta_offset。
    );
  3. 在Flink上创建一张表,用于将数据同步至AnalyticDB PostgreSQL版,建议表结构需与步骤1创建的源表结构相同。
    CREATE TABLE ADBPGTargetTable (
      `user_id` BIGINT primary key,
      `item_id` BIGINT,
      `behavior` STRING,
      `event_time` TIMESTAMP(3),
      `shard` BIGINT,            -- partition在AnalyticDB PostgreSQL中是保留关键字,因此需要将Kafka表原有的partition列名替换为shard。
      `meta_offset` BIGINT       -- offset在AnalyticDB PostgreSQL中是保留关键字,因此需要将Kafka表原有的offset列名替换为meta_offset。
    ) WITH (
       'connector' = 'adbpg-nightly-1.13',
       'password' = 'Password01',
       'tablename' = 'ADBPGTargetTable',
       'username' = 'user01',
       'url' = 'jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres',
       'maxretrytimes' = '2',
       'batchsize' = '50000',
       'connectionmaxactive' = '5',
       'conflictmode' = 'ignore',
       'usecopy' = '0',
       'targetschema' = 'public',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    AnalyticDB PostgreSQL Connector参数说明如下:

    参数是否必填说明
    connectorConnector名称,格式为adbpg-nightly-版本号

    例如本次示例使用的AnalyticDB PostgreSQL Connector是1.13版本,那么Connector名称为adbpg-nightly-1.13

    urlAnalyticDB PostgreSQL版的JDBC连接地址。格式为jdbc:postgresql://<连接地址:端口>/<连接的数据库名称>,示例如下jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres
    tablenameAnalyticDB PostgreSQL版的表名。
    usernameAnalyticDB PostgreSQL版的数据库账号。
    passwordAnalyticDB PostgreSQL版的数据库账号密码。
    maxretrytimesSQL执行失败后重试次数,默认值为3次。
    batchsize一次批量写入的最大条数,默认值为50000条。
    exceptionmode数据写入过程中出现异常时的处理策略。支持以下两种处理策略:
    • ignore(默认):数据写入异常时,忽略出现异常的数据。
    • strict:数据写入异常时,故障转移(Failover)并报错。
    conflictmode当出现主键冲突或者唯一索引冲突时的处理策略。支持以下四种处理策略:
    • ignore :忽略主键冲突,保留之前的数据。
    • strict:主键冲突时,故障转移(Failover)并报错。
    • update:主键冲突时,更新为新增的数据。
    • upsert(默认):主键冲突时,采用UPSERT方式写入数据。

      AnalyticDB PostgreSQL版通过INSERT ON CONFLICTCOPY ON CONFLICT实现UPSERT写入数据。

      如果目标表为分区表,则需要内核小版本为V6.3.6.1及以上。如何升级内核小版本,请参见版本升级

    targetschemaAnalyticDB PostgreSQL版的Schema,默认值为public。
    writemode写入方式。取值说明:
    • 0 :采用BATCH INSERT方式写入数据。
    • 1(默认):采用COPY API写入数据。
    • 2:采用BATCH UPSERT方式写入数据。
    verbose是否输出connector运行日志。取值说明:
    • 0(默认):不输出运行日志。
    • 1:输出运行日志。
    retrywaittime出现异常后,重试的间隔时间。单位为毫秒(ms),默认值为100。
    batchwritetimeoutms攒批写入数据时最长攒批时间,超过此时间会触发写入。单位为毫秒(ms),默认值为50000。
    connectionmaxactive连接池参数,单个Task manager中连接池最大连接数。默认值为5。
    casesensitive列名和表名是否区分大小写,取值说明:
    • 0(默认):不区分大小写。
    • 1:区分大小写。
  4. 在Flink上执行INSERT INTO命令将Kafka数据同步至AnalyticDB PostgreSQL版
    INSERT INTO ADBPGTargetTable SELECT * FROM KafkaSourceTable;

您可以在Flink控制台查看执行情况,示例如下。

Flink将kafka数据同步至adbpg