Hologres数据源为您提供读取和写入Hologres双向通道的功能,本文为您介绍DataWorks的Hologres数据同步的能力支持情况。
支持的版本
Hologres支持的版本:0.7、0.8、0.9、0.10、1.1、1.2、1.3。
使用限制
离线读写
Hologres数据源仅支持使用独享数据集成资源组。
Hologres Writer不支持写入数据至Hologres的外部表。
Hologres数据源连通性获取Hologres端点的逻辑:
当前地域的Hologres实例,Hologres端点获取顺序:
。跨地域的Hologres实例,Hologres端点获取顺序:
。
整库实时写
实时数据同步任务仅支持使用独享数据集成资源组。
实时数据同步任务暂不支持同步没有主键的表。
单表、整库全增量实时写
同步数据至Hologres时,目前仅支持将数据写入分区表子表,暂不支持写入数据至分区表父表。
支持的字段类型
字段类型 | 离线读(Hologres Reader) | 离线写(Hologres Writer) | 实时写 |
UUID | 不支持 | 不支持 | 不支持 |
CHAR | 支持 | 支持 | 支持 |
NCHAR | 支持 | 支持 | 支持 |
VARCHAR | 支持 | 支持 | 支持 |
LONGVARCHAR | 支持 | 支持 | 支持 |
NVARCHAR | 支持 | 支持 | 支持 |
LONGNVARCHAR | 支持 | 支持 | 支持 |
CLOB | 支持 | 支持 | 支持 |
NCLOB | 支持 | 支持 | 支持 |
SMALLINT | 支持 | 支持 | 支持 |
TINYINT | 支持 | 支持 | 支持 |
INTEGER | 支持 | 支持 | 支持 |
BIGINT | 支持 | 支持 | 支持 |
NUMERIC | 支持 | 支持 | 支持 |
DECIMAL | 支持 | 支持 | 支持 |
FLOAT | 支持 | 支持 | 支持 |
REAL | 支持 | 支持 | 支持 |
DOUBLE | 支持 | 支持 | 支持 |
TIME | 支持 | 支持 | 支持 |
DATE | 支持 | 支持 | 支持 |
TIMESTAMP | 支持 | 支持 | 支持 |
BINARY | 支持 | 支持 | 支持 |
VARBINARY | 支持 | 支持 | 支持 |
BLOB | 支持 | 支持 | 支持 |
LONGVARBINARY | 支持 | 支持 | 支持 |
BOOLEAN | 支持 | 支持 | 支持 |
BIT | 支持 | 支持 | 支持 |
JSON | 支持 | 支持 | 支持 |
JSONB | 支持 | 支持 | 支持 |
实现原理
离线读写
Hologres Reader通过PSQL读取Hologres表中的数据,根据表的Shard Count发起多个并发,每个Shard对应一个Select并发任务:
Hologres在创建表时,在同一个
CREATE TABLE
事务中,通过CALL set_table_property('table_name', 'shard_count', 'xx')
配置表的Shard Count。默认情况下,使用数据库默认的Shard Count,具体数值取决于Hologres实例的配置。
Select语句通过表的内置列hg_shard_id的Shard筛选数据。
离线写
Hologres Writer通过数据同步框架获取Reader生成的协议数据,根据conflictMode(冲突策略)的配置决定写入数据时的冲突解决策略。
您可以通过配置conflictMode,决定新导入的数据和已有数据的主键发生冲突时,如何处理新导入的数据:
conflictMode仅适用于有主键的表。具体写入原理和性能,详情请参考技术原理。
conflictMode为Replace(整行更新)模式时,新数据覆盖旧数据,整行所有列全部覆盖,没有配置列映射的字段会强制写NULL。
conflictMode为Update(更新)模式时,新数据覆盖旧数据,只覆盖配置有列映射的字段。
conflictMode为Ignore(忽略)模式时,忽略新数据。
数据同步任务开发
Hologres数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源。
单表离线同步任务配置指导
操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
单表、整库实时同步任务配置指导
操作流程请参见DataStudio侧实时同步任务配置。
单表、整库全增量实时写任务配置指导
操作流程请参见数据集成侧同步任务配置。
附录:脚本Demo与参数说明
附录:离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。
Reader脚本Demo
配置非分区表
配置从Hologres非分区表读取数据至内存,如下所示。
{ "type":"job", "version":"2.0",//版本号。 "steps":[ { "stepType":"holo",//插件名。 "parameter":{ "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port", "accessId": "***************", //访问Hologres的accessId。 "accessKey": "*******************", //访问Hologres的accessKey。 "database": "postgres", "table": "holo_reader_****", "column" : [ //字段。 "tag", "id", "title" ] }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"//错误记录数。 }, "speed":{ "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。 "concurrent":1,//作业并发数。 "mbps":"12"//限流,此处1mbps = 1MB/s。 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Hologres表的DDL语句,如下所示。
begin; drop table if exists holo_reader_basic_src; create table holo_reader_basic_src( tag text not null, id int not null, title text not null, body text, primary key (tag, id)); call set_table_property('holo_reader_basic_src', 'orientation', 'column'); call set_table_property('holo_reader_basic_src', 'shard_count', '3'); commit;
配置分区表
配置从内存产生的数据同步至Hologres分区表的子表。
说明请注意partition的配置。
{ "type":"job", "version":"2.0",//版本号。 "steps":[ { "stepType":"holo",//插件名。 "parameter":{ "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port", "accessId": "***************", //访问Hologres的accessId。 "accessKey": "*******************", //访问Hologres的accessKey。 "database": "postgres", "table": "holo_reader_basic_****", "partition": "tag=foo", "column" : [ "*" ], "fetchSize": "100" }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"//错误记录数。 }, "speed":{ "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。 "concurrent":1,//作业并发数。 "mbps":"12"//限流,此处1mbps = 1MB/s。 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Hologres表的DDL语句,如下所示。
begin; drop table if exists holo_reader_basic_part_src; create table holo_reader_basic_part_src( tag text not null, id int not null, title text not null, body text, primary key (tag, id)) partition by list( tag ); call set_table_property('holo_reader_basic_part_src', 'orientation', 'column'); call set_table_property('holo_reader_basic_part_src', 'shard_count', '3'); commit; create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo'); # 确保分区表子表已经创建且导入数据。 postgres=# \d+ holo_reader_basic_part_src Table "public.holo_reader_basic_part_src" Column | Type | Collation | Nullable | Default | Storage | Stats target | Description --------+---------+-----------+----------+---------+----------+--------------+------------- tag | text | | not null | | extended | | id | integer | | not null | | plain | | title | text | | not null | | extended | | body | text | | | | extended | | Partition key: LIST (tag) Indexes: "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id) Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')
Reader脚本参数
参数 | 描述 | 是否必选 | 默认值 |
endpoint | 目标交互式分析(Hologres)实例对应的endpoint,格式为 endpoint包括经典网络、公网和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的endpoint类型,否则会出现网络不通或者性能受限的情况:
通常建议数据集成资源组和Hologres实例配在同一个地域的同一个可用区,以保证网络端口连通,实现最大性能。 | 是 | 无 |
accessId | 访问Hologres的accessId。 | 是 | 无 |
accessKey | 访问Hologres的accessKey,请确保该密钥对目标表有写入权限。 | 是 | 无 |
database | Hologres实例内部数据库的名称。 | 是 | 无 |
table | Hologres的表名称,如果是分区表,请指定父表的名称。 | 是 | 无 |
column | 定义导入目标表的数据列,必须包含目标表的主键集合。例如 | 是 | 无 |
partition | 针对分区表,表示分区Column以及对应的Value,格式为 重要
| 否 | 空,表示非分区表。 |
fetchSize | 指定使用Select语句一次性读取数据的条数。 | 否 | 1,000 |
Writer脚本Demo
配置非分区表
配置从内存产生的数据导入至Hologres普通表,示例为通过JDBC模式导入的配置。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": "<mysql_source_name>", "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "connection": [ { "datasource": "<mysql_source_name>",//mysql数据源名 "table": [ "<mysql_table_name>" ] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "holo", "parameter": { "maxConnectionCount": 9, "datasource": "<holo_sink_name>",//Hologres数据源名称 "truncate":true,//清理规则。 "conflictMode": "ignore", "envType": 0, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "table": "<holo_table_name>" }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "errorLimit": { "record": "" }, "speed": { "concurrent": 2,//作业并发数 "throttle": false//限流 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Hologres表的DDL语句,如下所示。
begin; drop table if exists mysql_to_holo_test; create table mysql_to_holo_test( tag text not null, id int not null, body text not null, brrth date, primary key (tag, id)); call set_table_property('mysql_to_holo_test', 'orientation', 'column'); call set_table_property('mysql_to_holo_test', 'distribution_key', 'id'); call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth'); commit;
配置分区表
说明目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。
请确认该参数和表DDL的分区配置匹配。
配置从内存产生的数据同步至Hologres分区表的子表。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": "<mysql_source_name>", "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "connection": [ { "datasource": "<mysql_source_name>", "table": [ "<mysql_table_name>" ] } ], "where": "", "splitPk": "<mysql_pk>",//mysql的pk字段 "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "holo", "parameter": { "maxConnectionCount": 9, "partition": "<partition_key>",//Hologres分区键 "datasource": "<holo_sink_name>",//Hologres数据源名 "conflictMode": "ignore", "envType": 0, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "table": "<holo_table_name>" }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "errorLimit": { "record": "" }, "speed": { "concurrent": 2,//作业并发数 "throttle": false//限流 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Hologres表的DDL语句,如下所示。
BEGIN; CREATE TABLE public.hologres_parent_table( a text , b int, c timestamp, d text, ds text, primary key(ds,b) ) PARTITION BY LIST(ds); CALL set_table_property('public.hologres_parent_table', 'orientation', 'column'); CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215'); CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216'); CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217'); COMMIT;
Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
endpoint | 目标交互式分析(Hologres)实例对应的endpoint,格式为 endpoint包括公网、经典网络和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的endpoint类型,否则会出现网络不通或者性能受限的情况:
通常建议数据集成资源组和Hologres实例在同一个地域的同一个可用区,以确保网络连通,实现最大性能。 | 是 | 无 |
accessId | 访问Hologres的accessId。 | 是 | 无 |
accessKey | 访问Hologres的accessKey,请确保该密钥对目标表有写入权限。 | 是 | 无 |
database | Hologres实例内部数据库的名称。 | 是 | 无 |
table | Hologres的表名称,目前支持表名称中包含Schema,例如 | 是 | 无 |
conflictMode | conflictMode包括Replace、Update和Ignore,详情请参见实现原理。 | 是 | 无 |
column | 定义导入目标表的数据列,必须包含目标表的主键集合。例如 | 是 | 无 |
partition | 针对分区表,表示分区Column以及对应的Value,格式为 说明
| 否 | 空,表示非分区表 |
truncate | 写入Holo表之前是否需要清空目标表。
| 否 | false |