本文介绍如何通过社区版Flink将Kafka的数据同步至AnalyticDB PostgreSQL版。
前提条件
- 已将社区版Flink的客户端IP地址添加至AnalyticDB PostgreSQL版白名单中,设置白名单的方法,请参见设置白名单。
- 在Flink的客户端的$FLINK_HOME/lib路径上部署Kafka Connector相关依赖,您可以直接使用Flink官网提供的Table API Kafka Connector,具体内容,请参见Apache Kafka SQL Connector。本文示例中使用的依赖如下。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.17-SNAPSHOT</version> </dependency>
- 在Flink的客户端的$FLINK_HOME/lib路径上部署AnalyticDB PostgreSQL Connector相关依赖。获取AnalyticDB PostgreSQL Connector的JAR包,请参见AnalyticDB PostgreSQL Connector。
本文示例中使用的AnalyticDB PostgreSQL Connector版本为1.13。实际使用时建议选择Flink引擎版本相近的版本。
注意事项
阿里云实时计算Flink版与社区版Flink操作存在差异,但是使用过程基本相同,如需使用阿里云实时计算Flink版,请参见阿里云实时计算Flink版文档。
操作步骤
- 在Flink上创建一张表,用于读取Kafka表的数据。
CREATE TABLE KafkaTable ( `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING, `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, `shard` BIGINT METADATA FROM 'partition' VIRTUAL, `meta_offset` BIGINT METADATA FROM 'offset' VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'value.format' = 'debezium-json' );
Kafka Connector参数说明如下:
参数 是否必填 说明 connector 是 Connector名称,此处固定为 kafka
。topic 是 Kafka的Topic名称。 properties.bootstrap.servers 是 Kafka客户端的连接地址和端口。 properties.group.id 是 Kafka的消费组ID。 scan.startup.mode 否 数据消费的起始点位,具体介绍,请参见起始消费点位。 value.format 是 序列化和反序列化Kafka消息体时使用的格式。更多格式和相关配置的介绍,请参见格式。 更多Kafka Connector参数介绍,请参见连接器参数。
Kafka的每条消息(Record)中均包含了一些元数据(例如timestamp、offset、partition等),这些元数据在业务中可以起到一定作用。测试表中的event_time、meta_offset和shard列就是从Kafka消息中获取到的有用信息。关于可用的元数据的介绍,请参见可用的元数据。
- 在AnalyticDB PostgreSQL版上创建一张目标表。
CREATE TABLE ADBPGTargetTable ( user_id BIGINT primary key, item_id BIGINT, behavior VARCHAR, event_time TIMESTAMP, shard BIGINT, -- partition在AnalyticDB PostgreSQL中是保留关键字,因此需要将Kafka表原有的partition列名替换为shard。 meta_offset BIGINT -- offset在AnalyticDB PostgreSQL中是保留关键字,因此需要将Kafka表原有的offset列名替换为meta_offset。 );
- 在Flink上创建一张表,用于将数据同步至AnalyticDB PostgreSQL版,建议表结构需与步骤1创建的源表结构相同。
CREATE TABLE ADBPGTargetTable ( `user_id` BIGINT primary key, `item_id` BIGINT, `behavior` STRING, `event_time` TIMESTAMP(3), `shard` BIGINT, -- partition在AnalyticDB PostgreSQL中是保留关键字,因此需要将Kafka表原有的partition列名替换为shard。 `meta_offset` BIGINT -- offset在AnalyticDB PostgreSQL中是保留关键字,因此需要将Kafka表原有的offset列名替换为meta_offset。 ) WITH ( 'connector' = 'adbpg-nightly-1.13', 'password' = 'Password01', 'tablename' = 'ADBPGTargetTable', 'username' = 'user01', 'url' = 'jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres', 'maxretrytimes' = '2', 'batchsize' = '50000', 'connectionmaxactive' = '5', 'conflictmode' = 'ignore', 'usecopy' = '0', 'targetschema' = 'public', 'exceptionmode' = 'ignore', 'casesensitive' = '0', 'writemode' = '1', 'retrywaittime' = '200' );
AnalyticDB PostgreSQL Connector参数说明如下:
参数 是否必填 说明 connector 是 Connector名称,格式为 adbpg-nightly-版本号
。例如本次示例使用的AnalyticDB PostgreSQL Connector是1.13版本,那么Connector名称为
adbpg-nightly-1.13
。url 是 AnalyticDB PostgreSQL版的JDBC连接地址。格式为 jdbc:postgresql://<连接地址:端口>/<连接的数据库名称>
,示例如下jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres
。tablename 是 AnalyticDB PostgreSQL版的表名。 username 是 AnalyticDB PostgreSQL版的数据库账号。 password 是 AnalyticDB PostgreSQL版的数据库账号密码。 maxretrytimes 否 SQL执行失败后重试次数,默认值为3次。 batchsize 否 一次批量写入的最大条数,默认值为50000条。 exceptionmode 否 数据写入过程中出现异常时的处理策略。支持以下两种处理策略: - ignore(默认):数据写入异常时,忽略出现异常的数据。
- strict:数据写入异常时,故障转移(Failover)并报错。
conflictmode 否 当出现主键冲突或者唯一索引冲突时的处理策略。支持以下四种处理策略: - ignore :忽略主键冲突,保留之前的数据。
- strict:主键冲突时,故障转移(Failover)并报错。
- update:主键冲突时,更新为新增的数据。
- upsert(默认):主键冲突时,采用UPSERT方式写入数据。
AnalyticDB PostgreSQL版通过INSERT ON CONFLICT和COPY ON CONFLICT实现UPSERT写入数据。
如果目标表为分区表,则需要内核小版本为V6.3.6.1及以上。如何升级内核小版本,请参见版本升级。
targetschema 否 AnalyticDB PostgreSQL版的Schema,默认值为public。 writemode 否 写入方式。取值说明: - 0 :采用BATCH INSERT方式写入数据。
- 1(默认):采用COPY API写入数据。
- 2:采用BATCH UPSERT方式写入数据。
verbose 否 是否输出connector运行日志。取值说明: - 0(默认):不输出运行日志。
- 1:输出运行日志。
retrywaittime 否 出现异常后,重试的间隔时间。单位为毫秒(ms),默认值为100。 batchwritetimeoutms 否 攒批写入数据时最长攒批时间,超过此时间会触发写入。单位为毫秒(ms),默认值为50000。 connectionmaxactive 否 连接池参数,单个Task manager中连接池最大连接数。默认值为5。 casesensitive 否 列名和表名是否区分大小写,取值说明: - 0(默认):不区分大小写。
- 1:区分大小写。
- 在Flink上执行INSERT INTO命令将Kafka数据同步至AnalyticDB PostgreSQL版。
INSERT INTO ADBPGTargetTable SELECT * FROM KafkaSourceTable;
您可以在Flink控制台查看执行情况,示例如下。