首页 实时计算 Flink版 开发参考 连接器 云原生数据仓库AnalyticDB PostgreSQL版(ADB PG)

云原生数据仓库AnalyticDB PostgreSQL版(ADB PG)

更新时间: 2024-06-05 17:57:11

本文为您介绍如何使用AnalyticDB PostgreSQL连接器。

背景信息

云原生数据仓库AnalyticDB PostgreSQL版是一种大规模并行处理(MPP)数据仓库服务,可提供海量数据在线分析服务。

AnalyticDB PostgreSQL版支持的信息如下。

类别

详情

支持类型

维表和结果表

运行模式

流模式和批模式

数据格式

暂不适用

特有监控指标

  • 结果表:

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

  • 维表:无

说明

指标含义详情,请参见监控指标说明

API种类

SQL

是否支持更新或删除结果表数据

前提条件

使用限制

  • 仅Flink实时计算引擎VVR 6.0.0及以上版本支持云原生数据仓库AnalyticDB PostgreSQL版连接器。

  • 仅Flink实时计算引擎VVR 8.0.1及以上版本支持云原生数据仓库AnalyticDB PostgreSQL版7.0版本。

  • 暂不支持自建的PostgreSQL。

语法结构

CREATE TABLE adbpg_table (
 id INT,
 len INT,
 content VARCHAR,
 PRIMARY KEY(id)
) WITH (
 'connector'='adbpg',
 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
 'tableName'='<yourDatabaseTableName>',
 'userName'='<yourDatabaseUserName>',
 'password'='<yourDatabasePassword>'
);

WITH参数

类型

参数

说明

数据类型

是否必填

默认值

备注

通用

connector

表类型。

String

固定值为adbpg。

url

JDBC连接地址。

String

格式为jdbc:postgresql://<Address>:<PortId>/<DatabaseName>

tableName

表名。

String

无。

userName

用户名。

String

无。

password

密码。

String

无。

maxRetryTimes

写入数据失败后,重试写入的最大次数。

Integer

3

无。

targetSchema

Schema名称。

String

public

无。

caseSensitive

大小写是否敏感。

String

false

参数取值如下:

  • true:大小写敏感。

  • false(默认值):大小写不敏感。

connectionMaxActive

连接池的最大连接数。

Integer

5

系统会自动释放与数据库服务的空闲连接。

重要

此参数设置过大可能会导致服务端连接数出现异常。

结果表独有

retryWaitTime

重试的时间间隔。

Integer

100

单位毫秒。

batchSize

一次批量写入的数据条数。

Integer

500

无。

flushIntervalMs

清空缓存的时间间隔。

Integer

如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。单位毫秒。

writeMode

第一次尝试写入时的写入方式。

String

insert

参数取值如下:

  • insert(默认值):直接插入,冲突时参考conflictMode

  • upsert:冲突时自动update,只能用于有主键的表。

conflictMode

当Insert写入出现主键冲突或者唯一索引冲突时的处理策略。

String

strict

参数取值如下:

  • strict(默认值):冲突时报错。

  • ignore:冲突时忽略。

  • update:冲突时自动更新,可用于无主键表,执行效率较低。

  • upsert:冲突时自动更新,只能用于有主键表。

维表独有

maxJoinRows

单行数据Join的最多行数。

Integer

1024

无。

cache

缓存策略。

String

ALL

支持以下三种缓存策略:

  • ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

  • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。

  • None:无缓存。

cacheSize

缓存大小,即缓存多少行数据。

Long

100000

仅当选择LRU缓存策略时,cacheSize参数生效。

cacheTTLMs

缓存失效的超时时间。

Long

Long.MAX_VALUE

cacheTTLMs配置和cache配置有关:

  • 如果cache配置为LRU,则cacheTTLMs为缓存失效的超时时间。默认不过期。

  • 如果cache配置为ALL,则cacheTTLMs为设置缓存重新加载的间隔时间,默认不重新加载。

单位为毫秒。

类型映射

云原生数据仓库AnalyticDB PostgreSQL版字段类型

Flink字段类型

boolean

boolean

smallint

int

int

int

bigint

bigint

float

double

varchar

varchar

text

varchar

timestamp

timestamp

date

date

使用示例

  • 结果表

    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TABLE adbpg_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    INSERT INTO adbpg_sink
    SELECT * FROM datagen_source;
  • 维表

    CREATE TEMPORARY TABLE datagen_source(
     a INT,
     b BIGINT,
     c STRING,
     `proctime` AS PROCTIME()
    ) 
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    };
    
    CREATE TABLE adbpg_dim (
     a INT, 
     b VARCHAR, 
     c VARCHAR
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
     a INT,
     b STRING
    )
    COMMENT 'blackhole sink table'
    WITH (
     'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

相关文档

上一篇: 配置Postgres 下一篇: 云原生多模数据库Lindorm
阿里云首页 实时计算 Flink版 相关技术圈