Hologres Writer实现了导入数据至实时数仓Hologres的功能,您可以把多种数据源的数据导入Hologres进行实时分析。
使用限制
- Hologres Writer不支持写入数据至Hologres的外部表。
- Hologres Writer仅支持使用独享数据集成资源组,不支持使用其他类型的数据集成资源组。
支持的Hologres版本
Hologres支持的版本:0.7、0.8、0.9、0.10、1.1、1.2、1.3。
支持的字段类型
不支持UUID类型的字段。字段类型 | 离线读(Hologres Reader) | 离线写(Hologres Writer) | 实时写 |
---|---|---|---|
UUID | 不支持 | 不支持 | 不支持 |
CHAR | 支持 | 支持 | 支持 |
NCHAR | 支持 | 支持 | 支持 |
VARCHAR | 支持 | 支持 | 支持 |
LONGVARCHAR | 支持 | 支持 | 支持 |
NVARCHAR | 支持 | 支持 | 支持 |
LONGNVARCHAR | 支持 | 支持 | 支持 |
CLOB | 支持 | 支持 | 支持 |
NCLOB | 支持 | 支持 | 支持 |
SMALLINT | 支持 | 支持 | 支持 |
TINYINT | 支持 | 支持 | 支持 |
INTEGER | 支持 | 支持 | 支持 |
BIGINT | 支持 | 支持 | 支持 |
NUMERIC | 支持 | 支持 | 支持 |
DECIMAL | 支持 | 支持 | 支持 |
FLOAT | 支持 | 支持 | 支持 |
REAL | 支持 | 支持 | 支持 |
DOUBLE | 支持 | 支持 | 支持 |
TIME | 支持 | 支持 | 支持 |
DATE | 支持 | 支持 | 支持 |
TIMESTAMP | 支持 | 支持 | 支持 |
BINARY | 支持 | 支持 | 支持 |
VARBINARY | 支持 | 支持 | 支持 |
BLOB | 支持 | 支持 | 支持 |
LONGVARBINARY | 支持 | 支持 | 支持 |
BOOLEAN | 支持 | 支持 | 支持 |
BIT | 支持 | 支持 | 支持 |
JSON | 支持 | 支持 | 支持 |
JSONB | 支持 | 支持 | 支持 |
实现原理
Hologres Writer通过数据同步框架获取Reader生成的协议数据,根据writeMode(写入模式)的配置选择不同的写入方式,并根据conflictMode(冲突策略)的配置决定写入数据时的冲突解决策略:
- (推荐使用)writeMode为SQL模式时,通过PSQL的
INSERT INTO
命令(JDBC方式)写入数据,能提供更好的写入性能,建议您采用此写入模式同步数据。 - writeMode为SDK模式时,通过Hologres的写入接口来进行同步数据,后续该模式将逐步不再使用,建议使用SQL模式写入数据。
您可以通过配置conflictMode,决定新导入的数据和已有数据的主键发生冲突时,如何处理新导入的数据:
重要 conflictMode仅适用于有主键的表。具体写入原理和性能,详情请参考技术原理。
- conflictMode为Replace(整行更新)模式时,新数据覆盖旧数据,整行所有列全部覆盖,没有配置列映射的字段会强制写NULL。
- conflictMode为Update(更新)模式时,新数据覆盖旧数据,只覆盖配置有列映射的字段。
- conflictMode为Ignore(忽略)模式时,忽略新数据。
在不同的writeMode下,conflictMode的实现方式也不同。writeMode为SDK 模式时,通过设置Hologres Table属性来改变此次导入的冲突解决模式。
参数说明
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
endpoint | 目标交互式分析(Hologres)实例对应的endpoint,格式为instance-id-region-endpoint.hologres.aliyuncs.com:port 。您可以从交互式分析实例的管理页面获取。 endpoint包括公网、经典网络和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的endpoint类型,否则会出现网络不通或者性能受限的情况:
通常建议数据集成资源组和Hologres实例在同一个地域的同一个可用区,以确保网络连通,实现最大性能。 | 是 | 无 |
accessId | 访问Hologres的accessId。 | 是 | 无 |
accessKey | 访问Hologres的accessKey,请确保该密钥对目标表有写入权限。 | 是 | 无 |
database | Hologres实例内部数据库的名称。 | 是 | 无 |
table | Hologres的表名称,目前支持表名称中包含Schema,例如schema_name.table_name 。 | 是 | 无 |
writeMode | writeMode包括SDK和SQL(INSERT INTO),详情请参见实现原理。 在脚本模式中,SDK的可选配置如下:
| 是 | 无 |
conflictMode | conflictMode包括Replace、Update和Ignore,详情请参见实现原理。 | 是 | 无 |
column | 定义导入目标表的数据列,必须包含目标表的主键集合。例如["*"] 表示全部列。 | 是 | 无 |
partition | 针对分区表,表示分区Column以及对应的Value,格式为column=value 。 说明
| 否 | 空,表示非分区表 |
truncate | 写入Holo表之前是否需要清空目标表。
| 否 | false |
向导开发介绍
- 选择数据源。
配置同步任务的数据来源和数据去向,同步数据至Hologres时,数据去向配置参数如下。
参数 描述 数据源 通常输入您配置的数据源名称。第一次数据同步需要先创建Hologres数据源,详情请参见配置Hologres数据源。 表 需要同步的Hologres目标表,可以通过一键生成目标表自动创建,也可以提前在Hologres中创建。即上述参数说明中的table。 说明- 一键生成目标表创建的仅是基础表,请根据实际业务修改建表语句并添加相应的索引,以获得更好的性能,详情请参见建表概述。
- 支持自动路由分区,若是通过一键生成目标表创建表,需要将表创建为分区表,以实现分区自动路由的功能,若是无对应分区子表则会自动创建,详情请参见CREATE PARTITION TABLE。
分区信息 Hologres的分区键。 写入模式 即上述参数说明中的writeMode。 - (推荐)SQL(INSERT INTO),默认使用SQL模式,性能好。
- SDK,后续逐步不再使用,不推荐使用此模式。
写入冲突策略 即上述参数说明中的conflictMode。 同步前是否要清空Hologres表 即上述参数说明中的truncate。 最大连接数 JDBC使用的最大连接数,仅在SQL模式下使用,在开启任务时请确保实例有充足的空闲连接。一个任务最多使用9个连接,即一个任务使用9个连接。 - 字段映射,即上述参数说明中的column,左侧的源头表字段和右侧的目标表字段为一一对应关系。
参数 描述 同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。 同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。 取消映射 单击取消映射,可以取消建立的映射关系。 自动排版 可以根据相应的规律自动排版。 - 通道控制。
参数 描述 任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。 同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。 错误记录数 错误记录数,表示脏数据的最大容忍条数。 分布式处理能力 数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组和新增和使用独享数据集成资源组。
脚本开发介绍
使用脚本模式开发的详情请参见通过脚本模式配置离线同步任务。配置非分区表和分区表的示例如下:
- 配置非分区表
- 配置从内存产生的数据导入至Hologres普通表,示例为通过JDBC模式导入的配置。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": "<mysql_source_name>", "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "connection": [ { "datasource": "<mysql_source_name>",//mysql数据源名 "table": [ "<mysql_table_name>" ] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "holo", "parameter": { "maxConnectionCount": 9, "datasource": "<holo_sink_name>",//Hologres数据源名称 "truncate":true,//清理规则。 "conflictMode": "ignore", "envType": 0, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "writeMode": "insert", "table": "<holo_table_name>" }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "errorLimit": { "record": "" }, "speed": { "concurrent": 2,//作业并发数 "throttle": false//限流 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
- Hologres表的DDL语句,如下所示。
begin; drop table if exists mysql_to_holo_test; create table mysql_to_holo_test( tag text not null, id int not null, body text not null, brrth date, primary key (tag, id)); call set_table_property('mysql_to_holo_test', 'orientation', 'column'); call set_table_property('mysql_to_holo_test', 'distribution_key', 'id'); call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth'); commit;
- 配置从内存产生的数据导入至Hologres普通表,示例为通过JDBC模式导入的配置。
- 配置分区表说明
- 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。
- 请确认该参数和表DDL的分区配置匹配。
- 配置从内存产生的数据同步至Hologres分区表的子表,示例为通过SQL模式导入的配置。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": "<mysql_source_name>", "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "connection": [ { "datasource": "<mysql_source_name>", "table": [ "<mysql_table_name>" ] } ], "where": "", "splitPk": "<mysql_pk>",//mysql的pk字段 "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "holo", "parameter": { "maxConnectionCount": 9, "partition": "<partition_key>",//Hologres分区键 "datasource": "<holo_sink_name>",//Hologres数据源名 "conflictMode": "ignore", "envType": 0, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "writeMode": "insert", "table": "<holo_table_name>" }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "errorLimit": { "record": "" }, "speed": { "concurrent": 2,//作业并发数 "throttle": false//限流 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
- Hologres表的DDL语句,如下所示。
BEGIN; CREATE TABLE public.hologres_parent_table( a text , b int, c timestamp, d text, ds text, primary key(ds,b) ) PARTITION BY LIST(ds); CALL set_table_property('public.hologres_parent_table', 'orientation', 'column'); CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215'); CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216'); CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217'); COMMIT;