将DataHub数据同步至Hologres适用于需要实时数据分析、实时监控、实时报表和复杂数据结构处理的场景,通过结合DataHub和Hologres的能力,可以提升数据处理和分析的效率和准确性。本文介绍如何创建DataHub数据同步任务以及常见问题。
背景信息
DataHub提供数据Sink/Source功能,即数据同步功能,支持将对应Topic中的数据实时/准实时
的同步到第三方阿里云产品中,打通阿里云产品间的数据流通。关于DataHub数据同步功能介绍详情,请参见概述
DataHub与Hologres的映射关系如下表所示。
DataHub | Hologres |
Project | Database |
Topic | Table |
同步场景与策略介绍
同步场景
同步场景 | 说明 |
逐条插入 | 将源数据库中数据逐条插入到目标数据库中,适用于将DataHub数据全量同步至Hologres的场景。 |
回放 | 通过分析并重新执行Binlog数据库操作记录日志,将源数据库变更操作应用到目标数据库中,最终确保数据一致性。适用于DTS同步数据至DataHub,再将DataHub数据同步至Hologres的链路场景,其中DataHub相当于Binlog。 说明 DTS同步数据至DataHub,会在数据列的基础上添加附加列,用于记录数据的操作信息。DTS同步数据至DataHub,附加列的命名存在新旧差异,详情请参见修改数据同步的附加列规则。 |
同步策略
同步策略 | 说明 |
覆盖(replace) | 数据写入发生主键冲突,新的数据覆盖老数据并写入,确保目标数据库中的数据与源数据保持一致。 |
忽略(ignore) | 数据写入发生主键冲突,忽略新数据,即数据不更新,避免重复数据的导入和覆盖,保持目标数据库的数据完整性。 |
注意事项
仅支持将DataHub TUPLE类型数据同步到Hologres。
数据写入分区表必须先在Hologres中创建分区子表,详情请参见CREATE PARTITION TABLE。
每个同步任务都会占用部分连接数,每个任务占用的连接数等于DataHub Topic的Shard数。
准备工作
开通DataHub服务并准备DataHub数据。具体操作,请参见快速入门。
购买Hologres实例并建表,本文以表
lineitem
为示例。具体操作,请参见购买Hologres和通过HoloWeb连接Hologres创建表。DataHub与Hologres的数据类型映射如下表所示。
DataHub
Hologres
TINYINT
SMALLINT
SMALLINT
SMALLINT
INTEGER
INTEGER
BIGINT
BIGINT
FLOAT
REAL
DOUBLE
DOUBLE PRECISION
DECIMAL
DECIMAL
STRING
TEXT
BOOLEAN
BOOLEAN
TIMESTAMP
TIMESTAMPTZ
示例建表语句如下。
BEGIN; CREATE TABLE lineitem ( L_ORDERKEY BIGINT NOT NULL, L_PARTKEY BIGINT NOT NULL, L_SUPPKEY BIGINT NOT NULL, L_LINENUMBER BIGINT NOT NULL, L_QUANTITY DECIMAL(20,10), L_EXTENDEDPRICE DECIMAL(20,10), L_DISCOUNT DECIMAL(20,10), L_TAX DECIMAL(20,10), L_RETURNFLAG TEXT, L_LINESTATUS TEXT, L_SHIPDATE TIMESTAMPTZ, L_COMMITDATE TIMESTAMPTZ, L_RECEIPTDATE TIMESTAMPTZ, L_SHIPINSTRUCT TEXT, L_SHIPMODE TEXT, L_COMMENT TEXT ); caLL set_table_property('lineitem', 'orientation', 'column'); COMMIT;
创建同步任务
登录DataHub服务控制台,单击已创建的Topic,进入Topic详情页。
单击Topic详情页右上角的+同步。
单击Hologres,在新建Connector页面配置参数。
参数
说明
Instance
Hologres实例的ID。进入Hologres管理控制台,获取实例ID。
Database
用于接收DataHub数据的Hologres数据库名称。
Table
用于接收DataHub数据的Hologres表名称
lineitem
。主键冲突策略
主键冲突时的数据更新策略。取值说明:
replace(默认值):数据写入发生主键冲突时,新数据写入覆盖老数据。
ignore:数据写入发生主键冲突时,忽略新数据,即数据不更新,仍然使用老数据。
关于数据同步策略详情,请参见同步策略。
同步场景
数据同步的使用场景。取值说明:
default(默认值):逐条插入。
dts:通过DTS同步数据至DataHub时,若启用新附加列规则,选择此参数。
dts_old:通过DTS同步数据至DataHub时,若未启用新附加列规则,选择此参数。
关于数据同步场景详情,请参见同步场景。
导入字段
需要导入Hologres的字段。可以根据实际业务需求选择导入部分或全部字段。
鉴权模式
默认为AccessKey。
AccessKey ID
访问Hologres实例的AccessKey ID。您可以单击AccessKey 管理,获取用户的AccessKey ID。
AccessKey Secret
访问Hologres实例的AccessKey Secret。您可以单击AccessKey 管理,获取AccessKey Secret。
Timestamp Unit
同步时间单位,取值说明。
MICROSECOND:微秒,为默认值。
MILLISECOND:毫秒。
SECOND:秒。
单击创建,同步DataHub的数据至Hologres。
创建Connector后,您可以在Topic详情页的同步任务中查看实时同步数据的状态。
Hologres查询数据。
您可以连接Hologres实例开发工具,实时查询同步至Hologres中的数据。Hologres连接详情,请参见概述。执行以下示例查询语句。
SELECT COUNT(*) FROM lineitem;
常见问题
为您介绍在使用Hologres过程中的常见报错,以便于您能自行排查并解决问题。
问题1
报错信息
ErrorMessage: Import field not found in dest schema.
报错原因
Hologres表中未包含同步任务指定的导入字段。
任务的同步场景指定为default,但是导入字段包含DTS同步到DataHub产生的附加列。
解决方法
重新创建Hologres表,加上未包含的导入字段;修改同步任务的导入字段,去掉Hologres表中不存在的字段。
重新创建同步任务,并指定同步场景为dts或dts_old。
问题2
报错信息
ErrorMessage: Column type not match with Holo column.
报错原因
DataHub Topic中的字段类型与Hologres表中的字段类型不匹配。
解决方法
根据数据类型映射,重新创建Hologres表,设置正确的字段类型。
问题3
报错信息
ErrorMessage: Not import column xxx not allow null and no default value.
报错原因
Hologres表中的部分字段未包含在同步任务的导入字段中,但是这部分字段设置了not null属性,并且未设置default value。
解决方法
重新建Hologres表,对未包含在同步任务导入字段中的字段,不设置not null属性或设置default value。