本文为您介绍如何使用AnalyticDB PostgreSQL连接器。
背景信息
云原生数据仓库AnalyticDB PostgreSQL版是一种大规模并行处理(MPP)数据仓库服务,可提供海量数据在线分析服务。
AnalyticDB PostgreSQL版支持的信息如下。
类别 | 详情 |
支持类型 | 维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不适用 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
已创建AnalyticDB PostgreSQL实例并创建表,详情请参见创建实例和CREATE TABLE。
已设置白名单,详情请参见设置白名单。
使用限制
仅Flink实时计算引擎VVR 6.0.0及以上版本支持云原生数据仓库AnalyticDB PostgreSQL版连接器。
仅Flink实时计算引擎VVR 8.0.1及以上版本支持云原生数据仓库AnalyticDB PostgreSQL版7.0版本。
暂不支持自建的PostgreSQL。
语法结构
CREATE TABLE adbpg_table (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
'connector'='adbpg',
'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
'tableName'='<yourDatabaseTableName>',
'userName'='<yourDatabaseUserName>',
'password'='<yourDatabasePassword>'
);
WITH参数
类型 | 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
通用 | connector | 表类型。 | String | 是 | 无 | 固定值为adbpg。 |
url | JDBC连接地址。 | String | 是 | 无 | 格式为 | |
tableName | 表名。 | String | 是 | 无 | 无。 | |
userName | 用户名。 | String | 是 | 无 | 无。 | |
password | 密码。 | String | 是 | 无 | 无。 | |
maxRetryTimes | 写入数据失败后,重试写入的最大次数。 | Integer | 否 | 3 | 无。 | |
targetSchema | Schema名称。 | String | 否 | public | 无。 | |
caseSensitive | 大小写是否敏感。 | String | 否 | false | 参数取值如下:
| |
connectionMaxActive | 连接池的最大连接数。 | Integer | 否 | 5 | 系统会自动释放与数据库服务的空闲连接。 重要 此参数设置过大可能会导致服务端连接数出现异常。 | |
结果表独有 | retryWaitTime | 重试的时间间隔。 | Integer | 否 | 100 | 单位毫秒。 |
batchSize | 一次批量写入的数据条数。 | Integer | 否 | 500 | 无。 | |
flushIntervalMs | 清空缓存的时间间隔。 | Integer | 否 | 无 | 如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。单位毫秒。 | |
writeMode | 第一次尝试写入时的写入方式。 | String | 否 | insert | 参数取值如下:
| |
conflictMode | 当Insert写入出现主键冲突或者唯一索引冲突时的处理策略。 | String | 否 | strict | 参数取值如下:
| |
维表独有 | maxJoinRows | 单行数据Join的最多行数。 | Integer | 否 | 1024 | 无。 |
cache | 缓存策略。 | String | 否 | ALL | 支持以下三种缓存策略:
| |
cacheSize | 缓存大小,即缓存多少行数据。 | Long | 否 | 100000 | 仅当选择LRU缓存策略时,cacheSize参数生效。 | |
cacheTTLMs | 缓存失效的超时时间。 | Long | 否 | Long.MAX_VALUE | cacheTTLMs配置和cache配置有关:
单位为毫秒。 |
类型映射
云原生数据仓库AnalyticDB PostgreSQL版字段类型 | Flink字段类型 |
boolean | boolean |
smallint | int |
int | int |
bigint | bigint |
float | double |
varchar | varchar |
text | varchar |
timestamp | timestamp |
date | date |
使用示例
结果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' ); CREATE TABLE adbpg_sink ( name VARCHAR, age INT ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); INSERT INTO adbpg_sink SELECT * FROM datagen_source;
维表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' }; CREATE TABLE adbpg_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) COMMENT 'blackhole sink table' WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;