云原生数据仓库AnalyticDB PostgreSQL版(ADB PG)

本文为您介绍如何使用AnalyticDB PostgreSQL连接器。

背景信息

云原生数据仓库AnalyticDB PostgreSQL是一种大规模并行处理(MPP)数据仓库服务,可提供海量数据在线分析服务。

AnalyticDB PostgreSQL版支持的信息如下。

类别

详情

支持类型

源表(公测中)、维表和结果表

说明

源表目前尚未内置,需通过自定义连接器的进行读取,具体的使用方法请参见Flink CDC实时订阅全量和增量数据

运行模式

流模式和批模式

数据格式

暂不适用

特有监控指标

  • 结果表:

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

  • 维表:无

说明

指标含义详情,请参见监控指标说明

API种类

SQL

是否支持更新或删除结果表数据

前提条件

使用限制

  • Flink实时计算引擎VVR 8.0.1及以上版本支持云原生数据仓库AnalyticDB PostgreSQL7.0版本。

  • 暂不支持自建的PostgreSQL。

语法结构

CREATE TEMPORARY TABLE adbpg_table (
 id INT,
 len INT,
 content VARCHAR,
 PRIMARY KEY(id)
) WITH (
 'connector'='adbpg',
 'url'='jdbc:postgresql://yourAddress:yourPortId/yourDatabaseName',
 'tableName'='yourDatabaseTableName',
 'userName'='yourDatabaseUserName',
 'password'='yourDatabasePassword'
);

WITH参数

通用

参数

说明

数据类型

是否必填

默认值

备注

connector

表类型。

String

  • 源表固定值为adbpg-cdc。

  • 结果表和维表固定值为adbpg。

url

JDBC连接地址。

String

格式为jdbc:postgresql://<Address>:<PortId>/<DatabaseName>

tableName

表名。

String

无。

userName

用户名。

String

无。

password

密码。

String

无。

maxRetryTimes

写入数据失败后,重试写入的最大次数。

Integer

3

无。

targetSchema

Schema名称。

String

public

无。

caseSensitive

大小写是否敏感。

String

false

参数取值如下:

  • true:大小写敏感。

  • false(默认值):大小写不敏感。

connectionMaxActive

连接池的最大连接数。

Integer

5

系统会自动释放与数据库服务的空闲连接。

重要

此参数设置过大可能会导致服务端连接数出现异常。

源表独有(公测中)

参数

说明

数据类型

是否必填

备注

schema-name

Schema名称。

STRING

该参数支持正则表达式,可以一次订阅多个Schema。

port

AnalyticDB for PostgreSQL端口。

INTEGER

固定值为5432。

decoding.plugin.name

Postgres Logical Decoding插件名称。

STRING

固定值为pgoutput。

slot.name

逻辑解码槽的名字。

STRING

  • 对于同一个Flink作业涉及的源表,建议使用相同的slot.name

  • 如果不同的Flink作业涉及同一张表,则建议为每个作业分别设置独立的slot.name参数,以避免出现以下错误:PSQLException: ERROR: replication slot "debezium" is active for PID 974

debezium.*

细粒度地控制Debezium客户端的行为。

STRING

例如,设置 'debezium.snapshot.mode' = 'never' 可以禁用快照功能。您可以通过配置属性获取更多配置详情。

scan.incremental.snapshot.enabled

是否开启增量快照。

BOOLEAN

取值如下:

  • false(默认值):不开启增量快照。

  • true:开启增量快照。

scan.startup.mode

消费数据时的启动模式。

STRING

取值如下:

  • initial(默认值):在首次启动时,先扫描历史全量数据,然后读取最新的WAL日志数据,实现全量与增量数据的无缝衔接。

  • latest-offset:在首次启动时,不扫描历史全量数据,直接从 WAL 日志的末尾(即最新的日志位置)开始读取,仅捕获连接器启动后的最新变更数据。

  • snapshot:先扫描历史全量数据,同时读取全量阶段新产生的 WAL 日志,最终作业会在完成全量扫描后停止运行。

changelog-mode

用于编码流更改的变更日志(Changelog)模式。

STRING

取值如下:

  • ALL(默认值):支持所有操作类型,包括 INSERTDELETEUPDATE_BEFORE 和 UPDATE_AFTER

  • UPSERT:仅支持 UPSERT 类型的操作,包括 INSERTDELETE 和 UPDATE_AFTER

heartbeat.interval.ms

发送心跳包的时间间隔。

DURATION

默认值为30秒(单位:毫秒)。

AnalyticDB for PostgreSQLCDC连接器通过主动向数据库发送心跳包,确保Slot的偏移量能够持续推进。在表数据变更不频繁的情况下,合理设置该参数可以及时清理WAL日志,避免浪费磁盘空间。

scan.incremental.snapshot.chunk.key-column

指定某一列作为快照阶段分片的切分列。

STRING

默认情况下会从主键中选择第一列。

结果表独有

参数

说明

数据类型

是否必填

默认值

备注

retryWaitTime

重试的时间间隔。

Integer

100

单位毫秒。

batchSize

一次批量写入的数据条数。

Integer

500

无。

flushIntervalMs

清空缓存的时间间隔。

Integer

如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。单位毫秒。

writeMode

第一次尝试写入时的写入方式。

String

insert

参数取值如下:

  • insert(默认值):直接插入,冲突时参考conflictMode

  • upsert:冲突时自动update,只能用于有主键的表。

  • copy:使用COPY语句进行写入。

    说明

    仅实时计算引擎VVR 11.1及以上版本支持copy模式。

conflictMode

Insert写入出现主键冲突或者唯一索引冲突时的处理策略。

String

strict

参数取值如下:

  • strict(默认值):冲突时报错。

  • ignore:冲突时忽略。

  • update:冲突时自动更新,可用于无主键表,执行效率较低。

  • upsert:冲突时自动更新,只能用于有主键表。

维表独有

参数

说明

数据类型

是否必填

默认值

备注

maxJoinRows

单行数据Join的最多行数。

Integer

1024

无。

cache

缓存策略。

String

ALL

支持以下三种缓存策略:

  • ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

  • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。

  • None:无缓存。

cacheSize

缓存大小,即缓存多少行数据。

Long

100000

仅当选择LRU缓存策略时,cacheSize参数生效。

cacheTTLMs

缓存失效的超时时间。

Long

Long.MAX_VALUE

cacheTTLMs配置和cache配置有关:

  • 如果cache配置为LRU,则cacheTTLMs为缓存失效的超时时间。默认不过期。

  • 如果cache配置为ALL,则cacheTTLMs为设置缓存重新加载的间隔时间,默认不重新加载。

单位为毫秒。

类型映射

云原生数据仓库AnalyticDB PostgreSQL版字段类型

Flink字段类型

boolean

boolean

smallint

int

int

int

bigint

bigint

float

double

varchar

varchar

text

varchar

timestamp

timestamp

date

date

使用示例

  • 源表(公测中)

    使用示例详情请参见Flink CDC实时订阅全量和增量数据

  • 结果表

    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adbpg_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://yourAddress:yourPortId/yourDatabaseName',
     'tableName'='yourDatabaseTableName',
     'userName'='yourDatabaseUserName',
     'password'='yourDatabasePassword'
    );
    
    INSERT INTO adbpg_sink
    SELECT * FROM datagen_source;
  • 维表

    CREATE TEMPORARY TABLE datagen_source(
     a INT,
     b BIGINT,
     c STRING,
     `proctime` AS PROCTIME()
    ) 
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    };
    
    CREATE TEMPORARY TABLE adbpg_dim (
     a INT, 
     b VARCHAR, 
     c VARCHAR
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://yourAddress:yourPortId/yourDatabaseName',
     'tableName'='yourDatabaseTableName',
     'userName'='yourDatabaseUserName',
     'password'='yourDatabasePassword'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
     a INT,
     b STRING
    )
    COMMENT 'blackhole sink table'
    WITH (
     'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

相关文档