实时数仓Hologres
本文为您介绍如何使用实时数仓Hologres连接器。
背景信息
实时数仓Hologres是一站式实时数据仓库引擎,支持海量数据实时写入、实时更新、实时分析,支持标准SQL(兼容PostgreSQL协议),支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),与MaxCompute、Flink、DataWorks深度融合,提供离在线一体化全栈数仓解决方案。Hologres连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不支持 |
特有监控指标 |
|
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | 是 |
特色功能
源表
功能 | 详情 |
实时消费Hologres |
获取更多信息,详情请参见Flink/Blink实时消费Hologres Binlog。 |
结果表
功能 | 详情 |
支持写入Changelog消息。 | |
只更新修改部分的数据,而非整行更新。 | |
支持实时同步单表、整库的数据以及相应的表结构变更到Hologres表中。 | |
插入部分列 说明 仅实时计算引擎VVR 6.0.7及以上版本支持。 | 支持将Flink INSERT DML中指定的列名下推给连接器,从而仅更新指定的列。 |
前提条件
已创建Hologres表,详情请参见创建Hologres表。
使用限制
通用:
仅Flink计算引擎VVR 2.0.0及以上版本支持Hologres连接器。
Hologres连接器不支持访问Hologres外部表。关于Hologres外部表详情请参见外部表。
连接器目前的已知缺陷以及版本功能发布记录详见Hologres Connector Release Note。
源表独有:
Flink默认以批模式读取Hologres源表数据,即只扫描一次Hologres全表,扫描结束,消费结束,新到Hologres源表的数据不会被读取。从VVR 3.0.0版本开始,支持实时消费Hologres数据,详情请参见实时计算Flink版实时消费Hologres。从VVR 6.0.3版本开始,Hologres源表在批模式读取时支持filter下推,详见源表独有参数
enable_filter_push_down
。Hologres CDC模式暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见MySQL/Hologres CDC源表不支持窗口函数,如何实现类似每分钟聚合统计的需求?。
结果表独有:无。
维表独有:
创建Hologres维表时建议选择行存模式,列存模式对于点查场景性能开销较大。选择行存模式创建维表时必须设置主键,并且将主键设置为Clustering Key才可以工作。详情请参见建表概述。
VVR 4.0以下版本仅支持对维表主键点查的维表Join。
注意事项
Hologres 2.0版本下线了rpc(用于维表与结果表)和holohub(用于Binlog源表)模式,全面转为jdbc相关模式(目前包括jdbc、jdbc_fixed和jdbc_copy等),rpc模式不会对同一批次中相同主键的数据做去重,如果业务场景需要保留完整的数据,切换为jdbc模式后,可以通过设置'jdbcWriteBatchSize'='1'防止去重。
如果您需要将VVR 4.x升级到VVR 6.x,而且读写了Hologres 2.0版本并使用了上述已经下线的两种模式,请按照以下情况进行处理:
升级到VVR 6.0.4~6.0.6版本,原有作业包含了上述情况,则可能会抛出异常。推荐维表和结果表使用jdbc_fixed或jdbc模式,源表使用jdbc模式。
升级到VVR 6.0.7版本及以上版本,原有作业包含了上述情况,无需您做任何处理,Flink系统会自动替换不支持的模式为jdbc相关模式。
语法结构
CREATE TABLE hologres_table (
name VARCHAR,
age BIGINT,
birthday BIGINT,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
'connector' = 'hologres',
'dbname' = '<yourDBName>',
'tablename' = '<yourTableName>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint' = '<yourEndpoint>'
);
WITH参数
仅Flink实时计算引擎VVR 4.0.11及以上版本支持所有jdbc开头的参数。
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为
hologres
。dbname
数据库名称。
String
是
无
无。
tablename
表名称。
String
是
无
如果Schema不为Public时,则tablename需要填写为
schema.tableName
。username
用户名,请填写阿里云账号的AccessKey ID。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见密钥管理。
password
密码,请填写阿里云账号的AccessKey Secret。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见密钥管理。
endpoint
Hologres服务地址。
String
是
无
详情请参见访问域名。
jdbcRetryCount
当连接故障时,写入和查询的重试次数。
Integer
否
10
无。
jdbcRetrySleepInitMs
每次重试的固定等待时间。
Long
否
1000
实际重试的等待时间的计算公式为
jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs
。单位为毫秒。jdbcRetrySleepStepMs
每次重试的累加等待时间。
Long
否
5000
实际重试的等待时间的计算公式为
jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs
。单位为毫秒。jdbcConnectionMaxIdleMs
JDBC连接的空闲时间。
Long
否
60000
超过这个空闲时间,连接就会断开释放掉。单位为毫秒。
jdbcMetaCacheTTL
本地缓存TableSchema信息的过期时间。
Long
否
60000
单位为毫秒。
jdbcMetaAutoRefreshFactor
如果缓存的剩余时间小于触发时间,则系统会自动刷新缓存。
Integer
否
4
缓存的剩余时间计算方法:缓存的剩余时间=缓存的过期时间 - 缓存已经存活的时间。缓存自动刷新后,则从0开始重新计算缓存的存活时间。
触发时间计算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor两个参数的比值。
源表独有
参数
说明
数据类型
是否必填
默认值
备注
field_delimiter
导出数据时,不同行之间使用的分隔符。
String
否
"\u0002"
无。
binlog
是否消费Binlog数据。
Boolean
否
false
参数取值如下:
true:消费Binlog数据。
false(默认值):不消费Binlog数据。
sdkMode
SDK模式。
String
否
holohub
参数取值如下:
holohub(默认值):使用holohub模式的binlog源表。
jdbc:使用JDBC模式的binlog源表。
详情请参见Binlog Source表。
说明VVR 6.0.3以下版本:不支持配置该参数。
VVR 6.0.4~6.0.6版本:推荐使用默认配置,即holohub。
VVR 6.0.7以上版本:推荐配置为jdbc。
Hologres实例为2.0以下版本,Flink系统采用您配置的参数值。
Hologres实例为2.0及以上版本,由于Hologres 2.0版本下线了holohub服务,此时如果您配置了holohub,Flink系统自动切换为jdbc。但是如果您配置为jdbc,Flink系统采用jdbc。
jdbcBinlogSlotName
JDBC模式的binlog源表的Slot名称。创建方法请参见JDBC模式Binlog源表。
String
否
无
仅在sdkMode配置为jdbc时生效。
binlogMaxRetryTimes
读取Binlog数据出错后的重试次数。
Integer
否
60
无。
binlogRetryIntervalMs
读取Binlog数据出错后的重试时间间隔。
Long
否
2000
单位为毫秒。
binlogBatchReadSize
批量读取Binlog的数据行数。
Integer
否
100
无。
cdcMode
是否采用CDC模式读取Binlog数据。
Boolean
否
false
参数取值如下:
true:CDC模式读取Binlog数据。
false(默认值):非CDC模式读取Binlog数据。
upsertSource
源表是否使用upsert类型的Changelog。
Boolean
否
false
仅在CDC模式下生效。参数取值如下:
true:仅支持Upsert类型,包括INSERT、DELETE、和UPDATE_AFTER。
false(默认值):支持所有类型,包括INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER。
说明如果下游包含回撤算子(例如使用ROW_NUMBER OVER WINDOW去重),则需要设置upsertSource为true,此时源表会以Upsert方式从Hologres中读取数据。
binlogStartupMode
Binlog数据消费模式。
String
否
earliestOffset
参数取值如下:
initial:先全量消费数据,再读取Binlog开始增量消费。
earliestOffset(默认值):从最早的Binlog开始消费。
timestamp:从设置的startTime开始消费Binlog。
说明如果设置了startTime或者在启动界面选择了启动时间,则binlogStartupMode强制使用timestamp模式,其他消费模式不生效,即startTime参数优先级更高。
说明仅实时计算引擎VVR 4.0.13及以上版本,Hologres 0.10及以上版本支持该参数。
startTime
启动位点的时间。
String
否
无
格式为yyyy-MM-dd hh:mm:ss。如果没有设置该参数,且作业没有从State恢复,则从最早的Binlog开始消费Hologres数据。
jdbcScanFetchSize
扫描时攒批大小。
Integer
否
256
无。
jdbcScanTimeoutSeconds
扫描操作超时时间。
Integer
否
60
单位为秒。
jdbcScanTransactionSessionTimeoutSeconds
扫描操作所在事务的超时时间。
Integer
否
600秒(0表示不超时)
对应Hologres的GUC参数idle_in_transaction_session_timeout,详情请参见GUC参数。
说明仅实时计算引擎Flink1.13-vvr-4.0.15及以上版本支持该参数。
enable_filter_push_down
全量读取阶段是否进行filter下推。
Boolean
否
false
参数取值如下:
false(默认值):不进行filter下推。
true:读取全量数据时,将支持的过滤条件下推到Hologres执行。包括非Binlog Source全量读取以及Binlog Source使用全增量一体化消费模式时的全量阶段。
重要实时计算引擎Flink1.15-vvr-6.0.3到Flink1.15-vvr-6.0.5默认会进行filter下推,但如果作业使用了hologres维表,且写入的DML中包含有对维表非主键字段的过滤条件时,维表的filter会被错误的下推,可能导致维表join出现错误结果。因此推荐使用实时计算引擎Flink1.15-vvr-6.0.6及以上版本,并在源表增加此参数来开启过滤条件下推功能。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
sdkMode
SDK模式。
String
否
jdbc
参数取值如下:
jdbc:默认值,表示使用jdbc模式进行写入。
jdbc_copy:是否使用fixed copy方式写入。
fixed copy是hologres1.3新增的能力,相比通过insert方法(jdbc模式)进行写入,fixed copy方式可以更高的吞吐(因为是流模式),更低的数据延时,更低的客户端内存消耗(因为不攒批)。但此模式暂不支持delete数据,也不支持写入分区父表。
rpc:表示使用rpc模式进行写入,与useRpcMode参数一致,与jdbc模式的区别在于不占用连接数,不支持写入Hologres的Jsonb,RoarinBitmap类型。
jdbc_fixed(beta功能):表示使用fixed jdbc方式进行写入,
需要hologres引擎版本大于等于1.3,与jdbc模式的区别在于不占用连接数,不支持写入Hologres的Jsonb,RoarinBitmap类型。
说明VVR 6.0.3以下版本:不支持配置该参数。
VVR 6.0.4~VVR 6.0.6版本:推荐配置为jdbc。
VVR 6.0.7及以上版本:推荐配置为jdbc。
如果Hologres实例为2.0以下版本,Flink系统采用您配置的参数值。
如果Hologres实例为2.0及以上版本,由于Hologres 2.0及以上版本下线了rpc服务,此时如果您将该参数配置为rpc,Flink系统将自动切换该参数值为jdbc_fixed,但是如果您配置为其他值,Flink系统将采用您配置的参数值。
rpc模式不会对同一批次中相同主键的数据做去重,如果业务场景需要保留完整的数据,切换为jdbc模式后,可以通过设置'jdbcWriteBatchSize'='1'防止去重。
field_delimiter
Hologres Sink支持将一个STRING字段按照field_delimiter切分成数组导入Hologres。
String
否
"\u0002"
无。
useRpcMode
是否通过RPC方式使用Hologres连接器。
Boolean
否
false
参数取值如下:
true:使用RPC方式使用Hologres连接器。
与sdkMode参数设置为rpc效果相同,通过RPC方式会降低SQL连接数。
false(默认值):使用JDBC方式使用Hologres连接器。
通过JDBC方式会占用SQL连接,导致JDBC链接数增加。
说明Hologres 2.0版本下线了rpc模式,推荐使用sdkMode参数来选择jdbc或者jdbc_fixed模式。
实时计算引擎VVR 6.0.7及以上版本在检测到Hologres实例是2.0及以上版本时,会自动切换rpc模式为jdbc_fixed模式。
rpc模式不会对同一批次中相同主键的数据做去重,如果业务场景需要保留完整的数据,切换为jdbc模式后,可以通过设置'jdbcWriteBatchSize'='1'防止去重。
mutatetype
数据写入模式。
String
否
insertorignore
详情请参见流式语义。
partitionrouter
是否写入分区表。
Boolean
否
false
无。
createparttable
当写入分区表时,是否根据分区值自动创建不存在的分区表。
Boolean
否
false
如果分区值中存在短划线(-),暂不支持自动创建分区表。
说明请确保分区值不会出现脏数据,否则会创建错误的分区表导致Failover,建议慎用该参数。
当sdk_mode设置为jdbc_copy时,不支持写入分区父表。
Hologres使用Date类型做分区键时,不支持自动创建分区表。
ignoredelete
是否忽略撤回消息。
Boolean
否
true
说明仅在使用流式语义时生效。
connectionSize
单个Flink结果表任务所创建的JDBC连接池大小。
Integer
否
3
如果作业性能不足,建议您增加连接池大小。连接池大小和数据吞吐成正比。
jdbcWriteBatchSize
JDBC模式,Hologres Sink节点数据攒批条数(不是来一条数据处理一条,而是攒一批再处理)的最大值。
Integer
否
256
单位为数据行数。
说明jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。
jdbcWriteBatchByteSize
JDBC模式,Hologres Sink节点数据攒批字节数(不是来一条数据处理一条,而是攒一批再处理)的最大值。
Long
否
2097152(2*1024*1024)字节,即2 MB
说明jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。
jdbcWriteFlushInterval
JDBC模式,Hologres Sink节点数据攒批写入Hologres的最长等待时间。
Long
否
10000
单位为毫秒。
说明jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。
ignoreNullWhenUpdate
当mutatetype='insertOrUpdate'时,是否忽略更新写入数据中的Null值。
Boolean
否
false
取值说明如下:
false(默认值):将Null值写到Hologres结果表里。
true:忽略更新写入数据中的Null值。
说明仅Flink计算引擎VVR 4.0及以上版本支持该参数。
connectionPoolName
连接池名称。同一个TaskManager中,配置相同名称的连接池的表可以共享连接池。
String
否
无
每个表默认使用自己的连接池。如果多个表设置相同的连接池,则这些使用相同连接池的表的connectorSize都需要相同。
说明仅实时计算VVR 4.0.12及以上版本支持该参数。
jdbcEnableDefaultForNotNullColumn
Hologres表中Not Null且没有设置默认值的字段,是否允许写入Null值。
Boolean
否
true
参数取值如下:
true(默认值):允许写入Null值。
如果将Null值写入Hologres表中Not Null且无默认值的字段,则按照下列类型写入默认值:
如果字段是String类型,则默认写为空(“”)。
如果字段是Number类型,则默认写为0。
如果字段是以下任意时间类型:
Date
timestamp
timestamptz
则默认写为1970-01-01 00:00:00。
false:不允许写入Null值。
partial-insert.enabled
是否只插入INSERT语句中定义的字段。
Boolean
否
false
参数取值如下:
false(默认值):无论INSERT语句中声明了哪些字段,都会更新结果表DDL中定义的所有字段,对于未在INSERT语句中声明的字段,会被更新为null。
true:将INSERT语句中定义的字段下推给连接器,从而可以只对声明的字段进行更新或插入。
说明仅实时计算引擎VVR 6.0.7及以上版本支持该参数。
此参数仅在mutatetype参数配置为InsertOrUpdate时生效。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
sdkMode
SDK模式。
String
否
jdbc
参数取值如下:
jdbc(默认值):表示使用jdbc模式进行查询,支持主键点查和非主键的查询,但是非主键的查询对性能影响较大,查询较慢。
rpc:表示使用rpc模式进行点查,与useRpcMode参数一致,仅支持主键点查,即维表的主键字段必须与Flink Join On的字段完全匹配,与jdbc模式的区别在于不占用连接数,不支持读取Hologres的Jsonb,RoarinBitmap类型。
jdbc_fixed:(beta功能,需要hologres引擎版本大于等于1.3)表示使用fixed jdbc方式进行点查,与jdbc模式的区别在于不占用连接数,且不支持读取Hologres的Jsonb,RoarinBitmap类型。仅支持主键点查,即维表的主键字段必须与Flink Join On的字段完全匹配。
说明VVR 6.0.3以下版本:不支持配置该参数。
VVR 6.0.4~VVR 6.0.6版本:推荐配置为jdbc。
在VVR 6.0.6版本,SDK模式选择为jdbc时,如果维表字符串类型的查询结果中包含null值,可能抛出空指针异常,此时推荐您使用rpc模式绕过。
VVR 6.0.7及以上版本:推荐配置为jdbc。
如果Hologres实例为2.0以下版本,Flink系统将采用您配置的参数值。
如果Hologres实例为2.0及以上版本,由于Hologres 2.0及以上版本下线了rpc服务,此时如果您将该参数配置为了rpc,Flink系统自动将参数值切换为jdbc_fixed。但是如果您配置为其他值,Flink系统将采用您配置的参数值。
useRpcMode
是否通过RPC方式使用Hologres连接器。
Boolean
否
false
参数取值如下:
true:使用RPC方式使用Hologres连接器。与sdkMode参数设置为rpc效果相同。通过RPC方式会降低SQL连接数。
false(默认值):使用JDBC方式使用Hologres连接器。
通过JDBC方式会占用SQL连接,导致JDBC链接数增加。
说明Hologres 2.0版本下线了rpc了服务,推荐使用sdkMode参数来选择jdbc或者jdbc_fixed模式。
实时计算引擎VVR 6.0.7及以上版本在检测到Hologres实例是2.0及以上版本时,会自动切换rpc模式为jdbc_fixed模式
connectionSize
单个Flink维表任务所创建的JDBC连接池大小。
Integer
否
3
如果作业性能不足,建议您增加连接池大小。连接池大小和数据吞吐成正比。
connectionPoolName
连接池名称。同一个TaskManager中,配置相同名称的连接池的表可以共享连接池。
String
否
无
每个表默认使用自己的连接池。如果多个表设置相同的连接池,则这些使用相同连接池的表的connectorSize都需要相同。
说明仅实时计算VVR 4.0.12及以上版本支持该参数。
jdbcReadBatchSize
点查Hologres维表时,攒批处理的最大条数。
Integer
否
128
无。
jdbcReadBatchQueueSize
维表点查请求缓冲队列大小。
Integer
否
256
无。
jdbcReadTimeoutMs
维表点查的超时时间。
Long
否
默认值为0,表示不会超时
仅vvr 4.0.15-flink 1.13及以上版本、vvr 6.0.2-flink 1.15及以上版本支持该参数。
jdbcReadRetryCount
维表点查超时时的重试次数。
Interger
否
1
仅VVR 6.0.3以上版本支持该参数。
本参数与jdbcRetryCount不同,后者是指连接发生异常时的重试次数。
只有设置了jdbcReadTimeoutMs时本参数才会生效。
jdbcScanFetchSize
在一对多join(即没有使用完整主键)时使用scan接口,scan攒批处理数据的条数。
Integer
否
256
无。
jdbcScanTimeoutSeconds
scan操作的超时时间。
Integer
否
60
单位为秒。
cache
缓存策略。
String
否
见备注列。
Hologres仅支持以下两种缓存策略:
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。
None:无缓存。
说明Cache默认值和VVR版本有关:
VVR 4.x版本及以上版本,默认值为None。
VVR 4.x版本以下版本,默认值为LRU。
cacheSize
缓存大小。
Integer
否
10000
选择LRU缓存策略后,可以设置缓存大小。单位为条。
cacheTTLMs
缓存更新时间间隔。
Long
否
见备注列。
单位为毫秒。cacheTTLMs默认值和cache的配置有关:
如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。
cacheEmpty
是否缓存join结果为空的数据。
Boolean
否
true
true(默认值):表示缓存join结果为空的数据。
false:表示不缓存join结果为空的数据。
async
是否异步同步数据。
Boolean
否
false
true:表示异步同步数据。
false(默认值):表示不进行异步同步数据。
说明异步同步数据默认是无序的。
类型映射
Flink与Hologres的数据类型映射请参见Blink/Flink与Hologres的数据类型映射。
使用示例
源表示例
非Binlog Source表
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'field_delimiter'='|' --该参数可选。
);
CREATE TEMPORARY TABLE blackhole_sink(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='blackhole'
);
INSERT INTO blackhole_sink
SELECT name, age, birthday
from hologres_source;
Binlog Source表
Hologres连接器支持实时消费Hologres,即实时消费Hologres的Binlog数据。Flink实时消费Hologres详情请参见实时计算Flink版实时消费Hologres。
结果表示例
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;
维表示例
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'hologres',
...
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
特色功能详解
流式语义
宽表Merge和局部更新功能
作为CTAS和CDAS的目标端
DataStream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了Hologres DataStream连接器。
Hologres源表
VVR提供了RichInputFormat的实现类HologresBulkreadInputFormat来读取Hologres表数据。以下为构建Hologres Source读取表数据的示例。
VVR 4.0.15
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
String query = JDBCUtils.getSimpleSelectFromStatement(
jdbcOptions.getTable(), schema.getFieldNames());
// 构建HologresBulkreadInputFormat读取表数据。
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
VVR 6.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(new HologresConnectionParam(config), jdbcOptions, schema, "", -1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
Hologres Binlog源表
VVR提供了Source的实现类HologresBinlogSource来读取Hologres Binlog数据。以下为构建Hologres Binlog Source的示例。
VVR 4.0.15
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);
// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
schema,
config,
jdbcOptions,
recordConverter,
startTimeMs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
VVR 6.0.7
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 设置或创建默认slotname
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE)
&& config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// 构建Binlog Record Converter。
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
方法buildRecordConverter不在VVR Connector依赖中,是示例代码中提供的方法。
Hologres Binlog注意事项和实现原理等详情,请参见Binlog Source表。
Hologres结果表
VVR提供了OutputFormatSinkFunction的实现类HologresSinkFunction来写入数据。以下为构建Hologres Sink的示例。
VVR 4.0.15
// 初始化读取的表的Schema。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 构建Hologres Writer,以RowData的方式写入数据。
AbstractHologresWriter<RowData> hologresWriter =
buildHologresWriter(schema, config, hologresConnectionParam);
// 构建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
VVR 6.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 构建Hologres Writer,以RowData的方式写入数据。
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam, schema, HologresTableSchema.get(hologresConnectionParam), new Integer[0]);
// 构建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
方法buildHologresWriter不在VVR Connector依赖中,是示例代码中提供的方法。