实时数仓Hologres
本文为您介绍如何使用实时数仓Hologres连接器。
背景信息
实时数仓Hologres是一站式实时数据仓库引擎,支持海量数据实时写入、实时更新、实时分析,支持标准SQL(兼容PostgreSQL协议),支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),与MaxCompute、Flink、DataWorks深度融合,提供离在线一体化全栈数仓解决方案。Hologres连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不支持 |
特有监控指标 |
|
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | 是 |
特色功能
源表
功能 | 详情 |
实时消费Hologres |
获取更多信息,详情请参见Flink/Blink实时消费Hologres Binlog。 |
结果表
功能 | 详情 |
支持写入Changelog消息。 | |
只更新修改部分的数据,而非整行更新。 | |
支持实时同步单表、整库的数据以及相应的表结构变更到Hologres表中。 | |
插入部分列 说明 仅实时计算引擎VVR 6.0.7及以上版本支持。 | 支持将Flink INSERT DML中指定的列名下推给连接器,从而仅更新指定的列。 |
前提条件
已创建Hologres表,详情请参见创建Hologres表。
使用限制
通用:
仅Flink计算引擎VVR 2.0.0及以上版本支持Hologres连接器。
Hologres连接器不支持访问Hologres外部表。关于Hologres外部表详情请参见基于HoloWeb创建MaxCompute外部表。
连接器目前的已知缺陷以及版本功能发布记录详见Hologres Connector Release Note。
源表独有:
Flink默认以批模式读取Hologres源表数据,即只扫描一次Hologres全表,扫描结束,消费结束,新到Hologres源表的数据不会被读取。从VVR 3.0.0版本开始,支持实时消费Hologres数据,详情请参见实时计算Flink版实时消费Hologres。从VVR 6.0.3版本开始,Hologres源表在批模式读取时支持filter下推,详见源表参数
enable_filter_push_down
。实时计算引擎8.0以下版本Hologres CDC模式暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见MySQL/Hologres CDC源表不支持窗口函数,如何实现类似每分钟聚合统计的需求?。
结果表独有:无。
维表独有:
维表建议使用主键作为Join条件,对于此类主键点查的维表,创建Hologres表时建议选择行存模式,列存模式对于点查场景性能开销较大。选择行存模式创建维表时必须设置主键,并且将主键设置为Clustering Key才可以工作。详情请参见CREATE TABLE。
如果业务需要,无法使用主键作为Join条件,对于此类非主键点查的维表(即一对多的查询),创建Hologres表时建议选择列存模式,并合理设置分布键Distribution Key以及Event Time Column(Segment Key)以优化查询性能,详情请参见表存储格式:列存、行存、行列共存。
VVR 4.0以下版本仅支持对维表主键点查的维表Join,VVR 4.0及以上版本,jdbc模式支持维表的非主键点查。
注意事项
使用了rpc模式时,VVR版本升级注意事项:
Hologres 2.0版本下线了rpc(用于维表与结果表)模式,全面转为jdbc相关模式(目前包括jdbc、jdbc_fixed和jdbc_copy等),rpc模式不会对同一批次中相同主键的数据做去重,如果业务场景需要保留完整的数据,切换为jdbc模式后,可以通过设置'jdbcWriteBatchSize'='1'防止去重,或者升级到VVR 8.0.5版本配置deduplication.enabled参数为false。
如果您作业中存在使用了rpc模式读写Hologres的情况,此时如果您需要将VVR 4.x升级到VVR 6.x或VVR 8.x,请按照以下情况进行处理:
升级到VVR 6.0.4~6.0.6版本,可能会抛出异常。推荐维表和结果表使用jdbc_fixed或jdbc模式。
升级到VVR 6.0.7及以上版本,无需您做任何处理,Flink系统会自动替换rpc为jdbc相关模式。
使用binlog源表且未指定jdbc模式时,VVR版本升级注意事项:
Hologres 2.0版本开始有限支持holohub(用于Binlog源表)模式,Hologres 2.1版本彻底下线了holohub模式,全面转为jdbc模式。
如果您作业中存在消费binlog源表的情况,而且binlog源表未通过sdkmode='jdbc'指定jdbc模式,默认会使用holohub模式。此时如果您需要将VVR 4.x升级到VVR 6.x或VVR 8.x,请按照以下情况进行处理:
如果Hologres版本是2.0。
升级到VVR 6.0.7~VVR 8.0.3版本,仍然可以继续使用holohub模式。
升级到VVR 8.0.4及以上版本,可能抛出权限不足的异常,需要特别配置用户权限,详情见实时计算Flink版实时消费Hologres。
如果Hologres版本是2.1。
升级到VVR 6.0.7~VVR 8.0.4版本,可能无法正常消费Binlog,建议升级到VVR 8.0.5。
升级到VVR 8.0.5及以上版本,无需您做任何处理,Flink系统会自动替换holohub模式为jdbc模式。
语法结构
CREATE TABLE hologres_table (
name VARCHAR,
age BIGINT,
birthday BIGINT,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
'connector' = 'hologres',
'dbname' = '<yourDBName>',
'tablename' = '<yourTableName>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint' = '<yourEndpoint>',
'sdkmode' = 'jdbc'
);
WITH参数
类型映射
Flink与Hologres的数据类型映射请参见Blink/Flink与Hologres的数据类型映射。
使用示例
源表示例
非Binlog Source表
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'field_delimiter'='|' --该参数可选。
'sdkmode' = 'jdbc'
);
CREATE TEMPORARY TABLE blackhole_sink(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='blackhole'
);
INSERT INTO blackhole_sink
SELECT name, age, birthday
from hologres_source;
Binlog Source表
Hologres连接器支持实时消费Hologres,即实时消费Hologres的Binlog数据。Flink实时消费Hologres详情请参见实时计算Flink版实时消费Hologres。
结果表示例
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;
维表示例
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'hologres',
...
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
特色功能详解
DataStream API
通过DataStream的方式读写数据时,需要使用对应的DataStream连接器连接实时计算Flink版,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了Hologres DataStream连接器。VVR 6.0.7请使用其中的1.15-vvr-6.0.7-1版本的依赖。VVR 8.0.7的依赖请通过ververica-connector-hologres-1.17-vvr-8.0.7.jar下载,在本地调试时,需要使用相应的Uber JAR,详见本地运行和调试包含连接器的作业,VVR 8.0.7对应的Uber JAR为ververica-connector-hologres-1.17-vvr-8.0.7-uber.jar。
Hologres源表
VVR提供了RichInputFormat的实现类HologresBulkreadInputFormat来读取Hologres表数据。以下为构建Hologres Source读取表数据的示例。
VVR 4.0.15
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
String query = JDBCUtils.getSimpleSelectFromStatement(
jdbcOptions.getTable(), schema.getFieldNames());
// 构建HologresBulkreadInputFormat读取表数据。
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
VVR 6.0.7&VVR 8.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(new HologresConnectionParam(config), jdbcOptions, schema, "", -1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
XML
Maven中央库中已经放置了Hologres DataStream连接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>${vvr-version}</version>
</dependency>
Hologres Binlog源表
VVR提供了Source的实现类HologresBinlogSource来读取Hologres Binlog数据。以下为构建Hologres Binlog Source的示例。
VVR 4.0.15
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);
// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
schema,
config,
jdbcOptions,
recordConverter,
startTimeMs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
VVR 6.0.7
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 设置或创建默认slotname
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE)
&& config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// 构建Binlog Record Converter。
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
VVR 8.0.7
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet());
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
方法buildRecordConverter不在VVR Connector依赖中,是示例代码中提供的方法。
Hologres Binlog注意事项和实现原理等详情,请参见Binlog Source表。
Hologres结果表
VVR提供了OutputFormatSinkFunction的实现类HologresSinkFunction来写入数据。以下为构建Hologres Sink的示例。
VVR 4.0.15
// 初始化读取的表的Schema。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 构建Hologres Writer,以RowData的方式写入数据。
AbstractHologresWriter<RowData> hologresWriter =
buildHologresWriter(schema, config, hologresConnectionParam);
// 构建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
VVR 6.0.7&VVR 8.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 构建Hologres Writer,以RowData的方式写入数据。
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam, schema, HologresTableSchema.get(hologresConnectionParam), new Integer[0]);
// 构建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
方法buildHologresWriter不在VVR Connector依赖中,是示例代码中提供的方法。
Flink与Hologres时区说明
时间类型
产品 | 类型 | 说明 |
Flink | 表示没有时区信息的日期和时间,描述年、 月、日、小时、分钟、秒和小数秒对应的时间戳。可以通过一个字符串来指定,例如 | |
用于描述时间线上的绝对时间点,使用long保存从epoch至今的毫秒数,使用int保存毫秒中的纳秒数。epoch时间是从Java的标准epoch时间开始计算。在计算和可视化时, 每个TIMESTAMP_LTZ类型的数据都使用Session (会话)中配置的时区。可以用于跨时区的计算,因为它是一个基于epoch的绝对时间点,代表的就是不同时区的同一个绝对时间点。 相同的TIMESTAMP_LTZ值,在不同的时区可能会反映出不同的本地TIMESTAMP,例如:如果一个TIMESTAMP_LTZ值为 | ||
Hologres | TIMESTAMP | 类似于Flink的 |
TIMESTAMP WITH TIME ZONE (TIMESTAMPTZ) | 类似于Flink的 例如北京(UTC+8)时区的时间戳 |
时间类型映射
实时计算引擎VVR 8.0.6及以上版本且
type-mapping.timestamp-converting.legacy=false
时,支持所有时间类型间的相互转换。Flink
Hologres
详情
TIMESTAMP
TIMESTAMP
之间相互转换是直接的,不涉及时区转换。因此推荐采用该数据映射。
TIMESTAMP LTZ
TIMESTAMPTZ
TIMESTAMP
TIMESTAMPTZ
之间的转换涉及时区转换。为了在转换中保持准确性,需要通过配置项参数
table.local-time-zone
设置Flink时区,配置项参数设置方法请参见如何配置作业运行参数?。例如当设置
'table.local-time-zone': 'Asia/Shanghai'
时,表示Flink时区为上海(+8时区)时,Flink TIMESTAMP类型的数据为2022-01-01 01:01:01.123456,写入Hologres TIMESTAMP TZ的数值为2022-01-01 01:01:01.123456+8。TIMESTAMP LTZ
TIMESTAMP
实时计算引擎VVR8.0.6及以上版本且
type-mapping.timestamp-converting.legacy=true
时或者VVR 8.0.5及以下版本,除TIMESTAMP间转化,其他类型相互转化可能会出现数据偏差问题。Flink
Hologres
备注
TIMESTAMP
TIMESTAMP
之间相互转换是直接的,不涉及时区转换。因此推荐采用该数据映射。
TIMESTAMP LTZ
TIMESTAMPTZ
读写Hologres数据时都当作无时区时间进行处理,可能会存在数据偏差。
例如,Flink TIMESTAMP_LTZ类型的数值为2024-03-19T04:00:00Z,在上海(+8时区)对应的实际无时区时间为2024-03-19T12:00:00,但是写入时将2024-03-19T04:00:00当作无时区时间,写入Hologres TIMESTAMPTZ的数值为2024-03-19T04:00:00+08,数值偏差8小时。
TIMESTAMP
TIMESTAMPTZ
时区转换默认采用的是运行环境的JVM时区,而不是Flink时区,这与Flink内部计算的时区转换格式不同。当Flink时区与机器的JVM时区不一致时,会导致数据存在偏差,建议采用Flink时区进行Hologres数据的读写。
TIMESTAMP LTZ
TIMESTAMP