通过实时计算Flink版写入数据到云原生数据仓库AnalyticDB PostgreSQL版
本文介绍如何通过阿里云实时计算Flink版写入数据到AnalyticDB PostgreSQL版。
使用限制
该功能暂不支持AnalyticDB PostgreSQL版Serverless模式。
仅Flink实时计算引擎VVR 6.0.0及以上版本支持云原生数据仓库AnalyticDB PostgreSQL版连接器。
仅Flink实时计算引擎VVR 8.0.1及以上版本支持云原生数据仓库AnalyticDB PostgreSQL版7.0版本。
说明如果您使用了自定义连接器,具体操作请参见管理自定义连接器。
前提条件
已创建Flink全托管工作空间。具体操作,请参见开通Flink全托管。
已创建AnalyticDB PostgreSQL版实例。具体操作,请参见创建实例。
AnalyticDB PostgreSQL版实例和Flink全托管工作空间需要位于同一VPC下。
配置AnalyticDB PostgreSQL版实例
- 登录云原生数据仓库AnalyticDB PostgreSQL版控制台。
将Flink工作空间所属的网段加入AnalyticDB PostgreSQL版的白名单。如何添加白名单,请参见设置白名单。
单击登录数据库,连接数据库的更多方式,请参见客户端连接。
在AnalyticDB PostgreSQL版实例上创建一张表。
建表SQL示例如下:
CREATE TABLE test_adbpg_table( b1 int, b2 int, b3 text, PRIMARY KEY(b1) );
配置实时计算Flink
登录实时计算控制台。
在Flink全托管页签,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击数据连接。
在数据连接页面,单击创建自定义连接器。
上传自定义连接器JAR文件。
说明获取AnalyticDB PostgreSQL版自定义Flink Connector的JAR包,请参见AnalyticDB PostgreSQL Connector。
JAR包的版本需要与实时计算平台的Flink引擎版本一致。
上传完成后,单击下一步。
系统会对您上传的自定义连接器内容进行解析。如果解析成功,您可以继续下一步。如果解析失败,请确认您上传的自定义连接器代码是否符合Flink社区标准。
单击完成。
创建完成的自定义连接器会出现在连接器列表中。
创建Flink作业
写入数据到AnalyticDB PostgreSQL版
编写作业代码。
创建随机源表
datagen_source
和对应AnalyticDB PostgreSQL版的表test_adbpg_table
,将以下作业代码拷贝到作业文本编辑区。CREATE TABLE datagen_source ( f_sequence INT, f_random INT, f_random_str STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5', 'fields.f_sequence.kind'='sequence', 'fields.f_sequence.start'='1', 'fields.f_sequence.end'='1000', 'fields.f_random.min'='1', 'fields.f_random.max'='1000', 'fields.f_random_str.length'='10' ); CREATE TABLE test_adbpg_table ( `B1` bigint , `B2` bigint , `B3` VARCHAR , `B4` VARCHAR, PRIMARY KEY(B1) not ENFORCED ) with ( 'connector' = 'adbpg-nightly-1.13', 'password' = 'xxx', 'tablename' = 'test_adbpg_table', 'username' = 'xxxx', 'url' = 'jdbc:postgresql://url:5432/schema', 'maxretrytimes' = '2', 'batchsize' = '50000', 'connectionmaxactive' = '5', 'conflictmode' = 'ignore', 'usecopy' = '0', 'targetschema' = 'public', 'exceptionmode' = 'ignore', 'casesensitive' = '0', 'writemode' = '1', 'retrywaittime' = '200' );
其中
datagen_source
表的参数无需修改,test_adbpg_table
表的参数需要根据您实际情况进行修改,参数说明如下:参数
是否必填
说明
connector
是
connector名称,固定为
adbpg-nightly-版本号
,例如adbpg-nightly-1.13
。url
是
AnalyticDB PostgreSQL版的JDBC连接地址。格式为
jdbc:postgresql://<内网地址>:<端口>/<连接的数据库名称>
,示例如下jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres
。tablename
是
AnalyticDB PostgreSQL版的表名。
username
是
AnalyticDB PostgreSQL版的数据库账号。
password
是
AnalyticDB PostgreSQL版的数据库账号密码。
maxretrytimes
否
SQL执行失败后重试次数,默认为3次。
batchsize
否
一次批量写入的最大条数,默认为50000条。
exceptionmode
否
数据写入过程中出现异常时的处理策略。支持以下两种处理策略:
ignore(默认值):忽略出现异常时写入的数据。
strict:数据写入异常时,故障转移(Failover)并报错。
conflictmode
否
当出现主键冲突或者唯一索引冲突时的处理策略。支持以下四种处理策略:
ignore :忽略主键冲突,保留之前的数据。
strict:主键冲突时,故障转移(Failover)并报错。
update:主键冲突时,更新新到的数据。
upsert(默认值):主键冲突时,采用UPSERT方式写入数据。
AnalyticDB PostgreSQL版通过INSERT ON CONFLICT和COPY ON CONFLICT实现UPSERT写入数据,如果目标表为分区表,则需要内核小版本为V6.3.6.1及以上,如何升级内核小版本,请参见版本升级。
targetschema
否
AnalyticDB PostgreSQL版的Schema,默认为public。
writemode
否
写入方式。取值说明:
0 :采用BATCH INSERT方式写入数据。
1(默认值):采用COPY API写入数据。
2:采用BATCH UPSERT方式写入数据。
verbose
否
是否输出connector运行日志。取值说明:
0(默认):不输出运行日志。
1:输出运行日志。
retrywaittime
否
出现异常重试时间隔的时间。单位为ms,默认值为100。
batchwritetimeoutms
否
攒批写入数据时最长攒批时间,超过此事件会触发这一批的写入。单位为毫秒(ms),默认值为50000。
connectionmaxactive
否
连接池参数,单个Task manager中连接池同一时刻最大连接数。默认值为5。
casesensitive
否
列名和表名是否区分大小写,取值说明:
0(默认):不区分大小写。
1:区分大小写。
说明支持参数和类型映射,详情请参见连接器云原生数据仓库AnalyticDB PostgreSQL版。
启动作业。
在作业开发页面顶部,单击部署,在弹出的对话框中,单击确定。
说明Session集群适用于非生产环境的开发测试环境,您可以使用Session集群模式调试作业,提高作业JM(Job Manager)资源利用率和作业启动速度。但不推荐您将作业提交至Session集群中,因为会存在业务稳定性问题,详情请参见作业调试。
在作业运维页面,单击目标作业操作列的启动。
单击启动。
观察同步结果
连接AnalyticDB PostgreSQL版数据库。具体操作,请参见客户端连接。
执行以下语句查询
test_adbpg_table
表。SELECT * FROM test_adbpg_table;
数据正常写入到AnalyticDB PostgreSQL版中,返回示例如下。