本文是关于VVR 8.0.x及以下版本的With参数说明。
WITH参数
通用
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 表类型。 | String | 是 | 无 | 固定值为 |
dbname | 数据库名称。 | String | 是 | 无 | Hologres V2.0版本推出了全新的弹性高可用实例形态,将计算资源分解为不同的计算组(Virtual Warehouse),更好的服务于高可用部署,详情请参见计算组实例快速入门。不同的计算组使用相同的Endpoint,您可以通过在dbname参数后添加特定的后缀来指定连接某个计算组。例如某张维表希望连接特定的计算组read_warehouse,可以通过 说明 仅JDBC相关模式支持使用计算组,详见源表、维表和结果表WITH参数中的sdkMode参数。 |
tablename | 表名称。 | String | 是 | 无 | 如果Schema不为Public时,则tablename需要填写为 |
username |
| String | 是 | 无 |
重要 为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。 |
password |
| String | 是 | 无 | |
endpoint | Hologres服务地址。 | String | 是 | 无 | 详情请参见访问域名。 |
connection.ssl.mode | 是否启用SSL(Secure Sockets Layer)传输加密,以及启用采用何种模式。 | String | 否 | disable |
说明
|
connection.ssl.root-cert.location | 当传输加密模式需要证书时,配置证书的路径。 | String | 否 | 无 | 当connection.ssl.mode配置为verify-ca或者verify-full时,需要同时配置CA证书的路径。证书可以使用实时计算控制台的文件管理功能上传至平台,上传后文件存放在/flink/usrlib目录下。例如,需要使用的CA证书文件名为certificate.crt,则上传后参数取值应该为 说明
|
jdbcRetryCount | 当连接故障时,写入和查询的重试次数。 | Integer | 否 | 10 | 无。 |
jdbcRetrySleepInitMs | 每次重试的固定等待时间。 | Long | 否 | 1000 | 实际重试的等待时间的计算公式为 |
jdbcRetrySleepStepMs | 每次重试的累加等待时间。 | Long | 否 | 5000 | 实际重试的等待时间的计算公式为 |
jdbcConnectionMaxIdleMs | JDBC连接的空闲时间。 | Long | 否 | 60000 | 超过这个空闲时间,连接就会断开释放掉。单位为毫秒。 |
jdbcMetaCacheTTL | 本地缓存TableSchema信息的过期时间。 | Long | 否 | 60000 | 单位为毫秒。 |
jdbcMetaAutoRefreshFactor | 如果缓存的剩余时间小于触发时间,则系统会自动刷新缓存。 | Integer | 否 | 4 | 缓存的剩余时间计算方法:缓存的剩余时间=缓存的过期时间 - 缓存已经存活的时间。缓存自动刷新后,则从0开始重新计算缓存的存活时间。 触发时间计算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor两个参数的比值。 |
type-mapping.timestamp-converting.legacy | Flink和Hologres之间是否进行时间类型的相互转换。 | Boolean | 否 | true |
说明
|
property-version | Connector参数版本。 | Integer | 否 | 0 | 可填的值为0和1,默认值为0。 说明
|
源表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
field_delimiter | 导出数据时,不同行之间使用的分隔符。 | String | 否 | "\u0002" | 无。 |
binlog | 是否消费Binlog数据。 | Boolean | 否 | false |
说明
|
sdkMode | SDK模式。 | String | 否 | holohub |
各版本推荐取值详情请参见注意事项。 |
jdbcBinlogSlotName | JDBC模式的binlog源表的Slot名称。 | String | 否 | 无 | 仅在sdkMode配置为jdbc时生效,如果用户未配置,连接器会默认创建一个Slot来使用。详见JDBC模式Binlog源表。 说明 Hologres实例2.1版本起且使用VVR 8.0.5版本及以上,不再需要配置此参数,连接器也不会尝试自动创建。 |
binlogMaxRetryTimes | 读取Binlog数据出错后的重试次数。 | Integer | 否 | 60 | 无。 |
binlogRetryIntervalMs | 读取Binlog数据出错后的重试时间间隔。 | Long | 否 | 2000 | 单位为毫秒。 |
binlogBatchReadSize | 批量读取Binlog的数据行数。 | Integer | 否 | 100 | 无。 |
cdcMode | 是否采用CDC模式读取Binlog数据。 | Boolean | 否 | false |
说明
|
upsertSource | 源表是否使用upsert类型的Changelog。 | Boolean | 否 | false | 仅在CDC模式下生效。
说明 如果下游包含回撤算子(例如使用ROW_NUMBER OVER WINDOW去重),则需要设置upsertSource为true,此时源表会以Upsert方式从Hologres中读取数据。 |
binlogStartupMode | Binlog数据消费模式。 | String | 否 | earliestOffset |
说明 如果设置了startTime或者在启动界面选择了启动时间,则binlogStartupMode强制使用timestamp模式,其他消费模式不生效,即startTime参数优先级更高。 说明
|
startTime | 启动位点的时间。 | String | 否 | 无 | 格式为yyyy-MM-dd hh:mm:ss。如果没有设置该参数,且作业没有从State恢复,则从最早的Binlog开始消费Hologres数据。 |
jdbcScanFetchSize | 扫描时攒批大小。 | Integer | 否 | 256 | 无。 |
jdbcScanTimeoutSeconds | 扫描操作超时时间。 | Integer | 否 | 60 | 单位为秒。 |
jdbcScanTransactionSessionTimeoutSeconds | 扫描操作所在事务的超时时间。 | Integer | 否 | 600秒(0表示不超时) | 对应Hologres的GUC参数idle_in_transaction_session_timeout,详情请参见GUC参数。 |
enable_filter_push_down | 全量读取阶段是否进行filter下推。 | Boolean | 否 | false |
|
partition-binlog.mode | 消费分区表Binlog模式。 | Enum | 否 | DISABLE |
|
partition-binlog-lateness-timeout-minutes | 在DYNAMIC模式下消费分区表,允许延迟的最大超时时间。 | Boolean | 否 | 60 |
|
partition-values-to-read | 在STATIC模式下消费分区表,指定所需消费的分区,分区值之间使用','进行分隔。 | String | 否 | 无 |
|
结果表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
sdkMode | SDK模式。 | String | 否 | jdbc |
各版本推荐取值详情请参见注意事项。 |
bulkload | 是否采用bulkload写入。 | Boolean | 否 | false | 仅在sdkMode设置为jdbc_copy时生效。bulkload写入目前仅适用于无主键表或者主键保证不重复的有主键表(主键重复会抛出异常),相比默认的jdbc_copy,写入使用更少的Hologres资源。 说明 仅实时计算引擎VVR 8.0.5及以上版本且Hologres实例为2.1及以上版本支持该参数。 |
useRpcMode | 是否通过RPC方式使用Hologres连接器。 | Boolean | 否 | false |
说明
|
mutatetype | 数据写入模式。 | String | 否 | insertorignore |
说明
|
partitionrouter | 是否写入分区表。 | Boolean | 否 | false | 无。 |
createparttable | 当写入分区表时,是否根据分区值自动创建不存在的分区表。 | Boolean | 否 | false | RPC模式下,如果分区值中存在短划线(-),暂不支持自动创建分区表。 说明
|
ignoredelete | 是否忽略撤回消息。 | Boolean | 否 | true | 说明
|
sink.delete-strategy | 撤回消息的处理策略。 | String | 否 | 无 |
说明
|
connectionSize | 单个Flink结果表任务所创建的JDBC连接池大小。 | Integer | 否 | 3 | 如果作业性能不足,建议您增加连接池大小。连接池大小和数据吞吐成正比。 |
jdbcWriteBatchSize | JDBC模式,Hologres Sink节点数据攒批条数(不是来一条数据处理一条,而是攒一批再处理)的最大值。 | Integer | 否 | 256 | 单位为数据行数。 说明 jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。 |
jdbcWriteBatchByteSize | JDBC模式,Hologres Sink节点数据攒批字节数(不是来一条数据处理一条,而是攒一批再处理)的最大值。 | Long | 否 | 2*1024*1024字节,即2 MB | 说明 jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。 |
jdbcWriteFlushInterval | JDBC模式,Hologres Sink节点数据攒批写入Hologres的最长等待时间。 | Long | 否 | 10000 | 单位为毫秒。 说明 jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。 |
ignoreNullWhenUpdate | 当mutatetype='insertOrUpdate'时,是否忽略更新写入数据中的Null值。 | Boolean | 否 | false |
说明 当sdk_mode设置为jdbc_copy时,不支持此参数。 |
connectionPoolName | 连接池名称。同一个TaskManager中,配置相同名称的连接池的表可以共享连接池。 | String | 否 | 无 | 取值为非 说明
|
jdbcEnableDefaultForNotNullColumn | 如果将Null值写入Hologres表中Not Null且无默认值的字段,是否允许连接器帮助填充一个默认值。 | Boolean | 否 | true |
|
remove-u0000-in-text.enabled | 如果写入时字符串类型包含\u0000非法字符,是否允许连接器帮助去除。 | Boolean | 否 | false |
重要
|
partial-insert.enabled | 是否只插入INSERT语句中定义的字段。 | Boolean | 否 | false |
说明 此参数仅在mutatetype参数配置为InsertOrUpdate时生效。 |
deduplication.enabled | jdbc及jdbc_fixed模式写入攒批过程中,是否进行去重。 | Boolean | 否 | true |
说明
|
check-and-put.column | 启用条件更新能力,并指定检查的字段名。 | String | 否 | 无 | 参数取值必须设置为Hologres表存在的字段名。 重要
|
check-and-put.operator | 条件更新操作的比较操作符。 | String | 否 | GREATER | 比较新record的check字段与表中旧值,符合条件判断操作符时进行更新。目前支持配置为GREATER、GREATER_OR_EQUAL、EQUAL、NOT_EQUAL、LESS、LESS_OR_EQUAL、IS_NULL、IS_NOT_NULL。 说明 仅实时计算引擎VVR 8.0.11及以上版本支持该参数。 |
check-and-put.null-as | 当条件更新时,如果旧数据为null,则将该null值视为此参数配置的有效值。 | String | 否 | 无 | 由于在PostgreSQL中,任何值与NULL进行比较的结果均为FALSE,因此当表中的原有数据为NULL时,进行更新操作时需要设置一个NULL-AS作为参数,相当于SQL中的COALESCE函数。 说明 仅实时计算引擎VVR 8.0.11及以上版本支持该参数。 |
aggressive.enabled | 是否启用激进提交模式。 | Boolean | 否 | false | 设置为true时,即便攒批未达到预期条数,连接在空闲时将会被强制提交。在流量较小时,可以有效减少数据写入的延时。 说明
|
维表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
sdkMode | SDK模式。 | String | 否 | jdbc |
各版本推荐取值详情请参见注意事项。 |
useRpcMode | 是否通过RPC方式使用Hologres连接器。 | Boolean | 否 | false | 参数取值如下:
说明 该参数取值为true时与sdkMode=rpc同效,推荐操作详情请参见注意事项。 |
connectionSize | 单个Flink维表任务所创建的JDBC连接池大小。 | Integer | 否 | 3 | 如果作业性能不足,建议您增加连接池大小。连接池大小和数据吞吐成正比。 |
connectionPoolName | 连接池名称。同一个TaskManager中,配置相同名称的连接池的表可以共享连接池。 | String | 否 | 无 | 取值为非 说明
|
jdbcReadBatchSize | 点查Hologres维表时,攒批处理的最大条数。 | Integer | 否 | 128 | 无。 |
jdbcReadBatchQueueSize | 维表点查请求缓冲队列大小。 | Integer | 否 | 256 | 无。 |
jdbcReadTimeoutMs | 维表点查的超时时间。 | Long | 否 | 默认值为0,表示不会超时 | 无。 |
jdbcReadRetryCount | 维表点查超时时的重试次数。 | Integer | 否 | 见备注列。 | 本参数与jdbcRetryCount不同,后者是指连接发生异常时的重试次数。 说明 默认值:
|
jdbcScanFetchSize | 在一对多join(即没有使用完整主键)时使用scan接口,scan攒批处理数据的条数。 | Integer | 否 | 256 | 无。 |
jdbcScanTimeoutSeconds | scan操作的超时时间。 | Integer | 否 | 60 | 单位为秒。 |
cache | 缓存策略。 | String | 否 | None | Hologres仅支持None和LRU两种缓存策略。 |
cacheSize | 缓存大小。 | Integer | 否 | 10000 | 选择LRU缓存策略后,可以设置缓存大小。单位为条。 |
cacheTTLMs | 缓存更新时间间隔。 | Long | 否 | 见备注列。 | 单位为毫秒。cacheTTLMs默认值和cache的配置有关:
|
cacheEmpty | 是否缓存join结果为空的数据。 | Boolean | 否 | true |
|
async | 是否异步返回数据。 | Boolean | 否 | false |
说明 异步返回数据是无序的。 |
Flink与Hologres时区说明
时间类型
产品 | 类型 | 说明 |
Flink | 表示没有时区信息的日期和时间,描述年、 月、日、小时、分钟、秒和小数秒对应的时间戳。可以通过一个字符串来指定,例如 | |
用于描述时间线上的绝对时间点,使用long保存从epoch至今的毫秒数,使用int保存毫秒中的纳秒数。epoch时间是从Java的标准epoch时间开始计算。在计算和可视化时, 每个TIMESTAMP_LTZ类型的数据都使用Session (会话)中配置的时区。可以用于跨时区的计算,因为它是一个基于epoch的绝对时间点,代表的就是不同时区的同一个绝对时间点。 相同的TIMESTAMP_LTZ值,在不同的时区可能会反映出不同的本地TIMESTAMP,例如:如果一个TIMESTAMP_LTZ值为 | ||
Hologres | TIMESTAMP | 类似于Flink的 |
TIMESTAMP WITH TIME ZONE (TIMESTAMPTZ) | 类似于Flink的 例如北京(UTC+8)时区的时间戳 |
时间类型映射
实时计算引擎VVR 8.0.6及以上版本且
type-mapping.timestamp-converting.legacy=false
时,支持所有实时数仓Hologres间的相互转换。Flink
Hologres
详情
TIMESTAMP
TIMESTAMP
之间相互转换是直接的,不涉及时区转换。因此推荐采用该数据映射。
TIMESTAMP LTZ
TIMESTAMPTZ
TIMESTAMP
TIMESTAMPTZ
之间的转换涉及时区转换。为了在转换中保持准确性,需要通过配置项参数
table.local-time-zone
设置Flink时区,配置项参数设置方法请参见如何配置自定义的作业运行参数?。例如当设置
'table.local-time-zone': 'Asia/Shanghai'
时,表示Flink时区为上海(+8时区)时,Flink TIMESTAMP类型的数据为2022-01-01 01:01:01.123456,写入Hologres TIMESTAMP TZ的数值为2022-01-01 01:01:01.123456+8。TIMESTAMP LTZ
TIMESTAMP
实时计算引擎VVR8.0.6及以上版本且
type-mapping.timestamp-converting.legacy=true
时或者VVR 8.0.5及以下版本,除TIMESTAMP间转化,其他类型相互转化可能会出现数据偏差问题。Flink
Hologres
备注
TIMESTAMP
TIMESTAMP
之间相互转换是直接的,不涉及时区转换。因此推荐采用该数据映射。
TIMESTAMP LTZ
TIMESTAMPTZ
读写Hologres数据时都当作无时区时间进行处理,可能会存在数据偏差。
例如,Flink TIMESTAMP_LTZ类型的数值为2024-03-19T04:00:00Z,在上海(+8时区)对应的实际无时区时间为2024-03-19T12:00:00,但是写入时将2024-03-19T04:00:00当作无时区时间,写入Hologres TIMESTAMPTZ的数值为2024-03-19T04:00:00+08,数值偏差8小时。
TIMESTAMP
TIMESTAMPTZ
时区转换默认采用的是运行环境的JVM时区,而不是Flink时区,这与Flink内部计算的时区转换格式不同。当Flink时区与机器的JVM时区不一致时,会导致数据存在偏差,建议采用Flink时区进行Hologres数据的读写。
TIMESTAMP LTZ
TIMESTAMP