本文为您介绍如何使用AnalyticDB PostgreSQL连接器。
背景信息
云原生数据仓库AnalyticDB PostgreSQL版是一种大规模并行处理(MPP)数据仓库服务,可提供海量数据在线分析服务。
AnalyticDB PostgreSQL版支持的信息如下。
类别 | 详情 |
支持类型 | 源表(公测中)、维表和结果表 说明 源表目前尚未内置,需通过自定义连接器的进行读取,具体的使用方法请参见Flink CDC实时订阅全量和增量数据。 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不适用 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
已创建AnalyticDB PostgreSQL实例并创建表,详情请参见创建实例和CREATE TABLE。
已设置白名单,详情请参见设置白名单。
使用限制
仅Flink实时计算引擎VVR 8.0.1及以上版本支持云原生数据仓库AnalyticDB PostgreSQL7.0版本。
暂不支持自建的PostgreSQL。
语法结构
CREATE TEMPORARY TABLE adbpg_table (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
'connector'='adbpg',
'url'='jdbc:postgresql://yourAddress:yourPortId/yourDatabaseName',
'tableName'='yourDatabaseTableName',
'userName'='yourDatabaseUserName',
'password'='yourDatabasePassword'
);
WITH参数
通用
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 表类型。 | String | 是 | 无 |
|
url | JDBC连接地址。 | String | 是 | 无 | 格式为 |
tableName | 表名。 | String | 是 | 无 | 无。 |
userName | 用户名。 | String | 是 | 无 | 无。 |
password | 密码。 | String | 是 | 无 | 无。 |
maxRetryTimes | 写入数据失败后,重试写入的最大次数。 | Integer | 否 | 3 | 无。 |
targetSchema | Schema名称。 | String | 否 | public | 无。 |
caseSensitive | 大小写是否敏感。 | String | 否 | false | 参数取值如下:
|
connectionMaxActive | 连接池的最大连接数。 | Integer | 否 | 5 | 系统会自动释放与数据库服务的空闲连接。 重要 此参数设置过大可能会导致服务端连接数出现异常。 |
源表独有(公测中)
参数 | 说明 | 数据类型 | 是否必填 | 备注 |
schema-name | Schema名称。 | STRING | 是 | 该参数支持正则表达式,可以一次订阅多个Schema。 |
port | AnalyticDB for PostgreSQL端口。 | INTEGER | 是 | 固定值为5432。 |
decoding.plugin.name | Postgres Logical Decoding插件名称。 | STRING | 是 | 固定值为pgoutput。 |
slot.name | 逻辑解码槽的名字。 | STRING | 是 |
|
debezium.* | 细粒度地控制Debezium客户端的行为。 | STRING | 否 | 例如,设置 |
scan.incremental.snapshot.enabled | 是否开启增量快照。 | BOOLEAN | 否 | 取值如下:
|
scan.startup.mode | 消费数据时的启动模式。 | STRING | 否 | 取值如下:
|
changelog-mode | 用于编码流更改的变更日志(Changelog)模式。 | STRING | 否 | 取值如下:
|
heartbeat.interval.ms | 发送心跳包的时间间隔。 | DURATION | 否 | 默认值为30秒(单位:毫秒)。 AnalyticDB for PostgreSQLCDC连接器通过主动向数据库发送心跳包,确保Slot的偏移量能够持续推进。在表数据变更不频繁的情况下,合理设置该参数可以及时清理WAL日志,避免浪费磁盘空间。 |
scan.incremental.snapshot.chunk.key-column | 指定某一列作为快照阶段分片的切分列。 | STRING | 否 | 默认情况下会从主键中选择第一列。 |
结果表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
retryWaitTime | 重试的时间间隔。 | Integer | 否 | 100 | 单位毫秒。 |
batchSize | 一次批量写入的数据条数。 | Integer | 否 | 500 | 无。 |
flushIntervalMs | 清空缓存的时间间隔。 | Integer | 否 | 无 | 如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。单位毫秒。 |
writeMode | 第一次尝试写入时的写入方式。 | String | 否 | insert | 参数取值如下:
|
conflictMode | 当Insert写入出现主键冲突或者唯一索引冲突时的处理策略。 | String | 否 | strict | 参数取值如下:
|
维表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
maxJoinRows | 单行数据Join的最多行数。 | Integer | 否 | 1024 | 无。 |
cache | 缓存策略。 | String | 否 | ALL | 支持以下三种缓存策略:
|
cacheSize | 缓存大小,即缓存多少行数据。 | Long | 否 | 100000 | 仅当选择LRU缓存策略时,cacheSize参数生效。 |
cacheTTLMs | 缓存失效的超时时间。 | Long | 否 | Long.MAX_VALUE | cacheTTLMs配置和cache配置有关:
单位为毫秒。 |
类型映射
云原生数据仓库AnalyticDB PostgreSQL版字段类型 | Flink字段类型 |
boolean | boolean |
smallint | int |
int | int |
bigint | bigint |
float | double |
varchar | varchar |
text | varchar |
timestamp | timestamp |
date | date |
使用示例
源表(公测中)
使用示例详情请参见Flink CDC实时订阅全量和增量数据。
结果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adbpg_sink ( name VARCHAR, age INT ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://yourAddress:yourPortId/yourDatabaseName', 'tableName'='yourDatabaseTableName', 'userName'='yourDatabaseUserName', 'password'='yourDatabasePassword' ); INSERT INTO adbpg_sink SELECT * FROM datagen_source;
维表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' }; CREATE TEMPORARY TABLE adbpg_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://yourAddress:yourPortId/yourDatabaseName', 'tableName'='yourDatabaseTableName', 'userName'='yourDatabaseUserName', 'password'='yourDatabasePassword' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) COMMENT 'blackhole sink table' WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;