Flink全托管产品(Flink Serverless)是基于Apache Flink构建的全托管产品,为您提供全托管的实时计算服务。Hologres与Flink全托管高度兼容,支持Hologres源表、维表和结果表,以及Catalog等功能,满足一站式实时数仓建设。本文为您介绍如何使用Hologres作为Flink全托管的上下游存储。
Hologres源表、维表和结果表
在Flink全托管中支持Hologres源表、维表和结果表,详细的使用说明请参见实时数仓Hologres。
Flink实时消费Binlog
Hologres Connector支持实时消费Binlog,从VVR 6.0.3版本开始支持JDBC模式的流式源表,相比Holohub模式,支持更多的数据类型以及支持自定义账号,详情请参见Flink实时消费Binlog。
Hologres Catalog
Flink全托管支持Hologres Catalog,在Flink全托管控制台直接读取Hologres元数据,不用再手动注册Hologres表,可以提高作业开发的效率且能保证表结构的正确性,详情请参见管理Hologres Catalog。
基于Hologres Catalog,目前全托管的Flink已经支持模型演进(schema evolution)以及整库同步能力,详情请参见CREATE TABLE AS(CTAS)语句和CREATE DATABASE AS(CDAS)语句。
Hologres DataStream Connector
如果您通过DataStream的方式读写Hologres数据,则需要使用Hologres DataStream Connector连接Flink全托管,详情请参见Hologres DataStream Connector。
数据类型映射
Flink全托管与Hologres的数据类型映射,请参见数据类型汇总。
不论是将Hologres作为源表、维表还是结果表,在Flink的SQL作业中,都要将数据定义为Flink的数据类型。只有在Hologres中创建内部表时,才需要定义为Hologres的数据类型。
如果期望使用Flink将JSON数据写入Hologres,则在Flink SQL作业中,需要将源表、结果表中的该列都定义为VARCHAR类型,仅在Hologres内部表中将其定义为JSONB类型,示例如下:
Hologres创建内部表:message定义为JSONB类型。
BEGIN ; DROP TABLE IF EXISTS holo_internal_table; CREATE TABLE IF NOT EXISTS holo_internal_table ( id BIGINT NOT NULL, message JSONB NOT NULL ); CALL set_table_property('holo_internal_table', 'distribution_key', 'id'); COMMIT ;
Flink作业:源表、结果表的message都定义为VARCHAR类型。
CREATE TEMPORARY TABLE randomSource ( id BIGINT, message VARCHAR ) WITH ('connector' = 'datagen'); CREATE TEMPORARY TABLE sink_holo ( id BIGINT, message VARCHAR ) WITH ( 'connector' = 'hologres', 'endpoint' = '', 'username' = '', 'password' = '', 'dbname' = '', 'tablename' = 'holo_internal_table' ); INSERT INTO sink_holo SELECT 1, '{"k":"v"}' FROM randomSource;
Connector Release Note
版本说明请参见Hologres Connector Release Note。
常见问题
Flink全托管使用过程中,常见问题及解决方案请参见Blink和Flink常见问题及诊断。