本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
本文为您介绍如何在数据摄入YAML作业中,使用实时数仓Hologres连接器进行数据同步。
背景信息
实时数仓Hologres是一站式实时数据仓库引擎,支持海量数据实时写入、实时更新、实时分析,支持标准SQL(兼容PostgreSQL协议),支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),与MaxCompute、Flink、DataWorks深度融合,提供离在线一体化全栈数仓解决方案。Hologres YAML连接器支持的信息如下。
| 类别 | 详情 | 
| 支持类型 | 数据摄入目标端(Sink) | 
| 运行模式 | 流模式和批模式 | 
| 数据格式 | 暂不支持 | 
| 特有监控指标 | 
 说明  指标含义详情,请参见监控指标说明。 | 
| API种类 | YAML | 
| 是否支持更新或删除结果表数据 | 是 | 
功能说明
| 功能 | 详情 | 
| 支持实时同步整库(或者多张表)的全量和增量数据到每张对应的结果表中。 | |
| 在实时同步整库数据的同时,还支持将每张源表的表结构变更(增加列、删除列、重命名列等)实时同步到结果表中。 | |
| 支持使用正则表达式定义库名,匹配数据源的多个分库下的源表,合并后同步到下游每张对应表名的结果表中。 | |
| 支持将上游的一张表写入到Hologres分区表。 | |
| 采用多种数据映射策略,将上游数据类型映射为更宽的Hologres数据类型。 | 
语法结构
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}参数说明
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| type | sink类型。 | String | 是 | 无 | 固定值为 | 
| name | sink名称。 | String | 否 | 无 | 无。 | 
| dbname | 数据库名称。 | String | 是 | 无 | 无。 | 
| username | 用户名,请填写阿里云账号的AccessKey ID。 | String | 是 | 无 | 详情请参见如何查看AccessKey ID和AccessKey Secret信息? 重要  为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。 | 
| password | 密码,请填写阿里云账号的AccessKey Secret。 | String | 是 | 无 | |
| endpoint | Hologres服务地址。 | String | 是 | 无 | 详情请参见访问域名。 | 
| jdbcRetryCount | 当连接故障时,写入和查询的重试次数。 | Integer | 否 | 10 | 无。 | 
| jdbcRetrySleepInitMs | 每次重试的固定等待时间。 | Long | 否 | 1000 | 单位为毫秒。实际重试的等待时间的计算公式为 | 
| jdbcRetrySleepStepMs | 每次重试的累加等待时间。 | Long | 否 | 5000 | 单位为毫秒。实际重试的等待时间的计算公式为 | 
| jdbcConnectionMaxIdleMs | JDBC连接的空闲时间。 | Long | 否 | 60000 | 单位为毫秒。超过这个空闲时间,连接就会断开释放掉。 | 
| jdbcMetaCacheTTL | 本地缓存TableSchema信息的过期时间。 | Long | 否 | 60000 | 单位为毫秒。 | 
| jdbcMetaAutoRefreshFactor | 如果缓存的剩余时间小于触发时间,则系统会自动刷新缓存。 | Integer | 否 | 4 | 缓存的剩余时间计算方法:缓存的剩余时间=缓存的过期时间 - 缓存已经存活的时间。缓存自动刷新后,则从0开始重新计算缓存的存活时间。 触发时间计算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor两个参数的比值。 | 
| mutatetype | 数据写入模式。 | String | 否 | INSERT_OR_UPDATE | 如果Hologres物理表已设置主键,则Hologres Sink通过主键确保Exactly-once语义。当同主键数据出现多次时,您需要设置mutatetype参数确定更新结果表的方式,mutatetype取值如下: 
 | 
| createparttable | 当写入分区表时,是否根据分区值自动创建不存在的分区表。 | Boolean | 否 | false | 无。 | 
| sink.delete-strategy | 撤回消息的处理方式。 | String | 否 | 无 | 参数取值如下: 
 | 
| jdbcWriteBatchSize | JDBC模式,Hologres Sink节点数据攒批条数(不是来一条数据处理一条,而是攒一批再处理)的最大值。 | Integer | 否 | 256 | 单位为数据行数。 说明  jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。 | 
| jdbcWriteBatchByteSize | JDBC模式,Hologres Sink节点数据攒批字节数(不是来一条数据处理一条,而是攒一批再处理)的最大值。 | Long | 否 | 2097152(2*1024*1024)字节,即2 MB | 说明  jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。 | 
| jdbcWriteFlushInterval | JDBC模式,Hologres Sink节点数据攒批写入Hologres的最长等待时间。 | Long | 否 | 10000 | 单位为毫秒。 说明  jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。 | 
| ignoreNullWhenUpdate | 当mutatetype='insertOrUpdate'时,是否忽略更新写入数据中的Null值。 | Boolean | 否 | false | 参数取值如下: 
 | 
| jdbcEnableDefaultForNotNullColumn | 如果将Null值写入Hologres表中Not Null且无默认值的字段,是否允许连接器帮助填充一个默认值。 | Boolean | 否 | true | 参数取值如下: 
 
 | 
| remove-u0000-in-text.enabled | 如果写入时字符串类型包含\u0000非法字符,是否允许连接器帮助去除。 | Boolean | 否 | false | 参数取值如下: 
 
 | 
| deduplication.enabled | jdbc及jdbc_fixed模式写入攒批过程中,是否进行去重。 | Boolean | 否 | true | 参数取值如下: 
 | 
| sink.type-normalize-strategy | 数据映射策略。 | String | 否 | STANDARD | 当Hologres sink转换上游数据到Hologres类型时的策略。 
 | 
| table_property.* | Hologres物理表属性。 | String | 否 | 无 | 创建Hologres表时,允许在WITH参数中设置物理表属性,合理的表属性设置可以有助于系统高效地组织和查询数据。 警告  table_property.distribution_key默认为主键值,不要轻易设置,会影响写入数据的正确性。 | 
类型映射
通过配置项sink.type-normalize-strategy设置转换上游数据到Hologres类型时的策略。
- 建议您在首次启动YAML作业时开启sink.type-normalize-strategy。如果启动后再开启sink.type-normalize-strategy,需要删除下游表并且将作业无状态重启才能生效。 
- 目前数组类型仅支持INTEGER、BIGINT、FLOAT、DOUBLE、BOOLEAN、CHAR和VARCHAR。 
- Hologres不支持numeric类型作为主键,因此如果主键类型被映射为numeric,会被转化为varchar类型。 
STANDARD
当sink.type-normalize-strategy为STANDARD时,类型映射如下:
| Flink CDC类型 | Hologres类型 | 
| CHAR | bpchar | 
| STRING | text | 
| VARCHAR | text(长度大于10485760时) | 
| varchar(长度不大于10485760时) | |
| BOOLEAN | bool | 
| BINARY | bytea | 
| VARBINARY | |
| DECIMAL | numeric | 
| TINYINT | int2 | 
| SMALLINT | |
| INTEGER | int4 | 
| BIGINT | int8 | 
| FLOAT | float4 | 
| DOUBLE | float8 | 
| DATE | date | 
| TIME_WITHOUT_TIME_ZONE | time | 
| TIMESTAMP_WITHOUT_TIME_ZONE | timestamp | 
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz | 
| ARRAY | 各种类型的数组 | 
| MAP | 不支持 | 
| ROW | 不支持 | 
BROADEN
当sink.type-normalize-strategy为BROADEN时,将Flink CDC类型转换为更广泛的Hologres类型。数据映射如下:
| Flink CDC类型 | Hologres类型 | 
| CHAR | text | 
| STRING | |
| VARCHAR | |
| BOOLEAN | bool | 
| BINARY | bytea | 
| VARBINARY | |
| DECIMAL | numeric | 
| TINYINT | int8 | 
| SMALLINT | |
| INTEGER | |
| BIGINT | |
| FLOAT | float8 | 
| DOUBLE | |
| DATE | date | 
| TIME_WITHOUT_TIME_ZONE | time | 
| TIMESTAMP_WITHOUT_TIME_ZONE | timestamp | 
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz | 
| ARRAY | 各种类型的数组 | 
| MAP | 不支持 | 
| ROW | 不支持 | 
ONLY_BIGINT_OR_TEXT
当sink.type-normalize-strategy为ONLY_BIGINT_OR_TEXT时,将所有Flink CDC类型转换为Hologres中的BIGINT或STRING类型。类型映射如下:
| Flink CDC类型 | Hologres类型 | 
| TINYINT | int8 | 
| SMALLINT | |
| INTEGER | |
| BIGINT | |
| BOOLEAN | text | 
| BINARY | |
| VARBINARY | |
| DECIMAL | |
| FLOAT | |
| DOUBLE | |
| DATE | |
| TIME_WITHOUT_TIME_ZONE | |
| TIMESTAMP_WITHOUT_TIME_ZONE | |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | |
| ARRAY | 各种类型的数组 | 
| MAP | 不支持 | 
| ROW | 不支持 | 
分区表写入
Hologres Sink支持分区表写入,搭配Transform可以将上游数据写入到Hologres分区表中。写入时需要注意:
- 分区键(Partition Key)必须为主键的一部分,如果采用上游非主键中的一个作为分区表,可能会导致上下游主键不一致。数据同步时,如果上下游主键不一致,会导致数据不一致。 
- Hologres支持将TEXT、VARCHAR以及INT类型的数据作为分区键(Partition Key),V1.3.22及以上版本支持将DATE类型设为分区键。 
- 需要设置createparttable为true, 才能自动创建分区子表,否则用户需要手动创建分区子表。 
示例请参见分区表写入示。
表结构变更同步
CDC Yaml Pipeline作业在处理表结构变更时有不同的策略,通过pipeline级别的配置项schema.change.behavior来设置。schema.change.behavior取值有IGNORE、LENIENT、TRY_EVOLVE、EVOLVE 和 EXCEPTION。Hologres Sink目前不支持TRY_EVOLVE策略。其中LENIENT和EVOLVE涉及到表结构变更,接下来会说明如何处理不同表结构变更事件(Schema Change Event)。
LENIENT(默认)
LENIENT模式下支持的Schema变更策略详情如下:
- 添加可空列:会自动在结果表Schema末尾添加对应的列,并自动同步新增列的数据。 
- 删除可空列:不会直接在结果表中删除该列,而是将该列的数据自动填充为NULL值。 
- 添加非空列:会自动在结果表Schema末尾添加对应的列,并自动同步新增列的数据,新增的列会默认设置为可空列,对于添加列发生之前的数据自动设置为NULL值。 
- 重命名列:被看作为添加列和删除列。直接在结果表中末尾添加重命名后的列,并将重命名前的列数据自动填充为NULL值。例如,如果col_a重命名为col_b,则会在结果表末尾添加col_b,并自动将col_a的数据填充为NULL值。 
- 列类型变更:不支持。由于Hologres不支持列类型变更,需要搭配sink.type-normalize-strategy使用。 
- 暂不支持同步以下Schema的变更: - 主键或索引等约束的变更。 
- 非空列的删除。 
- 从NOT NULL转为NULLABLE变更。 
 
EVOLVE
EVOLVE模式下支持的Schema变更策略详情如下:
- 添加可空列:支持 
- 删除可空列:不支持。 
- 添加非空列:会在结果表添加可空列。 
- 重命名列:支持,会在结果表将原有列重命名。 
- 列类型变更:不支持。由于Hologres不支持列类型变更,需要搭配sink.type-normalize-strategy使用。 
- 暂不支持同步以下Schema的变更: - 主键或索引等约束的变更。 
- 非空列的删除。 
- 从NOT NULL转为NULLABLE变更。 
 
在EVOLVE模式下,如果在未删除结果表的情况下无状态重启,有可能出现上游数据与结果表的结构不一致的情况导致作业失败,需要用户手动调整下游表结构。
开启EVOLVE模式示例请参见开启EVOLVE模式。
代码示例
宽类型映射
通过配置项sink.type-normalize-strategy设置宽类型映射。
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
pipeline:
  name: MySQL to Hologres Pipeline分区表写入
将上游时间戳类型的create_time字段转化为日期类型,作为Hologres表的分区键。
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true
 
transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: add partition key 
pipeline:
  name: MySQL to Hologres Pipeline开启EVOLVE模式
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true
pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: evolve单表同步
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
pipeline:
  name: MySQL to Hologres Pipeline整库同步
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
pipeline:
  name: MySQL to Hologres Pipeline分库分表合并
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db.user
pipeline:
  name: MySQL to Hologres Pipeline同步到指定schema
Hologres的Schema对应MySQL的Database,可以执行结果表的Schema。
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db2.user\.*r
pipeline:
  name: MySQL to Hologres Pipeline不重启同步新增表
如果想在作业运行的过程中实时同步新增表,设置scan.binlog.newly-added-table.enable = true.
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.binlog.newly-added-table.enabled: true
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
pipeline:
  name: MySQL to Hologres Pipeline重启新增存量表
如果想要新增同步存量表,设置scan.newly-added-table.enabled = true后重启作业。
如果作业先设置scan.binlog.newly-added-table.enabled为true捕获新增表,不可以再通过scan.newly-added-table.enabled = true重启捕获存量表,否则会有数据重复发送。
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.newly-added-table.enabled: true
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
pipeline:
  name: MySQL to Hologres Pipeline整库同步时排除部分表
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  tables.exclude: test_db.table1
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
pipeline:
  name: MySQL to Hologres Pipeline相关文档
- source、sink、transform和route模块的开发参考,详情请参见Flink CDC数据摄入作业开发参考。 
- 数据摄入YAML作业开发的操作步骤,详情请参见Flink CDC数据摄入作业开发(公测中)。