将DataHub数据同步至Hologres适用于需要实时数据分析、实时监控、实时报表和复杂数据结构处理的场景,通过结合DataHub和Hologres的能力,可以提升数据处理和分析的效率和准确性。本文介绍如何创建DataHub数据同步任务以及常见问题。
背景信息
DataHub提供数据Sink/Source功能,即数据同步功能,支持将对应Topic中的数据实时/准实时的同步到第三方阿里云产品中,打通阿里云产品间的数据流通。关于DataHub数据同步功能介绍详情,请参见概述
DataHub与Hologres的映射关系如下表所示。
| DataHub | Hologres | 
| Project | Database | 
| Topic | Table | 
同步场景与策略介绍
同步场景
| 同步场景 | 说明 | 
| 逐条插入 | 将源数据库中数据逐条插入到目标数据库中,适用于将DataHub数据全量同步至Hologres的场景。 | 
| 回放 | 通过分析并重新执行Binlog数据库操作记录日志,将源数据库变更操作应用到目标数据库中,最终确保数据一致性。适用于DTS同步数据至DataHub,再将DataHub数据同步至Hologres的链路场景,其中DataHub相当于Binlog。 说明  DTS同步数据至DataHub,会在数据列的基础上添加附加列,用于记录数据的操作信息。DTS同步数据至DataHub,附加列的命名存在新旧差异,详情请参见修改数据同步的附加列规则。 | 
同步策略
| 同步策略 | 说明 | 
| 覆盖(replace) | 数据写入发生主键冲突,新的数据覆盖老数据并写入,确保目标数据库中的数据与源数据保持一致。 | 
| 忽略(ignore) | 数据写入发生主键冲突,忽略新数据,即数据不更新,避免重复数据的导入和覆盖,保持目标数据库的数据完整性。 | 
注意事项
- 仅支持将DataHub TUPLE类型数据同步到Hologres。 
- 数据写入分区表必须先在Hologres中创建分区子表,详情请参见CREATE PARTITION TABLE。 
- 每个同步任务都会占用部分连接数,每个任务占用的连接数等于DataHub Topic的Shard数。 
准备工作
- 开通DataHub服务并准备DataHub数据。具体操作,请参见快速入门。 
- 购买Hologres实例并建表,本文以表 - lineitem为示例。具体操作,请参见购买Hologres和通过HoloWeb连接Hologres创建表。- DataHub与Hologres的数据类型映射如下表所示。 - DataHub - Hologres - TINYINT - SMALLINT - SMALLINT - SMALLINT - INTEGER - INTEGER - BIGINT - BIGINT - FLOAT - REAL - DOUBLE - DOUBLE PRECISION - DECIMAL - DECIMAL - STRING - TEXT - BOOLEAN - BOOLEAN - TIMESTAMP - TIMESTAMPTZ - 示例建表语句如下。 - BEGIN; CREATE TABLE lineitem ( L_ORDERKEY BIGINT NOT NULL, L_PARTKEY BIGINT NOT NULL, L_SUPPKEY BIGINT NOT NULL, L_LINENUMBER BIGINT NOT NULL, L_QUANTITY DECIMAL(20,10), L_EXTENDEDPRICE DECIMAL(20,10), L_DISCOUNT DECIMAL(20,10), L_TAX DECIMAL(20,10), L_RETURNFLAG TEXT, L_LINESTATUS TEXT, L_SHIPDATE TIMESTAMPTZ, L_COMMITDATE TIMESTAMPTZ, L_RECEIPTDATE TIMESTAMPTZ, L_SHIPINSTRUCT TEXT, L_SHIPMODE TEXT, L_COMMENT TEXT ); caLL set_table_property('lineitem', 'orientation', 'column'); COMMIT;
创建同步任务
- 登录DataHub服务控制台,单击已创建的Topic,进入Topic详情页。 
- 单击Topic详情页右上角的+同步。 
- 单击Hologres,在新建Connector页面配置参数。 - 参数 - 说明 - Instance - Hologres实例的ID。进入Hologres管理控制台,获取实例ID。 - Database - 用于接收DataHub数据的Hologres数据库名称。 - Table - 用于接收DataHub数据的Hologres表名称 - lineitem。- 主键冲突策略 - 主键冲突时的数据更新策略。取值说明: - replace(默认值):数据写入发生主键冲突时,新数据写入覆盖老数据。 
- ignore:数据写入发生主键冲突时,忽略新数据,即数据不更新,仍然使用老数据。 
 - 关于数据同步策略详情,请参见同步策略。 - 同步场景 - 数据同步的使用场景。取值说明: - default(默认值):逐条插入。 
- dts:通过DTS同步数据至DataHub时,若启用新附加列规则,选择此参数。 
- dts_old:通过DTS同步数据至DataHub时,若未启用新附加列规则,选择此参数。 
 - 关于数据同步场景详情,请参见同步场景。 - 导入字段 - 需要导入Hologres的字段。可以根据实际业务需求选择导入部分或全部字段。 - 鉴权模式 - 默认为AccessKey。 - AccessKey ID - 访问Hologres实例的AccessKey ID。您可以单击AccessKey 管理,获取用户的AccessKey ID。 - AccessKey Secret - 访问Hologres实例的AccessKey Secret。您可以单击AccessKey 管理,获取AccessKey Secret。 - Timestamp Unit - 同步时间单位,取值说明。 - MICROSECOND:微秒,为默认值。 
- MILLISECOND:毫秒。 
- SECOND:秒。 
 
- 单击创建,同步DataHub的数据至Hologres。 - 创建Connector后,您可以在Topic详情页的同步任务中查看实时同步数据的状态。 
- Hologres查询数据。 - 您可以连接Hologres实例开发工具,实时查询同步至Hologres中的数据。Hologres连接详情,请参见概述。执行以下示例查询语句。 - SELECT COUNT(*) FROM lineitem;
常见问题
为您介绍在使用Hologres过程中的常见报错,以便于您能自行排查并解决问题。
- 问题1 - 报错信息 - ErrorMessage: Import field not found in dest schema.
- 报错原因 - Hologres表中未包含同步任务指定的导入字段。 
- 任务的同步场景指定为default,但是导入字段包含DTS同步到DataHub产生的附加列。 
 
- 解决方法 - 重新创建Hologres表,加上未包含的导入字段;修改同步任务的导入字段,去掉Hologres表中不存在的字段。 
- 重新创建同步任务,并指定同步场景为dts或dts_old。 
 
 
- 问题2 - 报错信息 - ErrorMessage: Column type not match with Holo column.
- 报错原因 - DataHub Topic中的字段类型与Hologres表中的字段类型不匹配。 
- 解决方法 - 根据数据类型映射,重新创建Hologres表,设置正确的字段类型。 
 
- 问题3 - 报错信息 - ErrorMessage: Not import column xxx not allow null and no default value.
- 报错原因 - Hologres表中的部分字段未包含在同步任务的导入字段中,但是这部分字段设置了not null属性,并且未设置default value。 
- 解决方法 - 重新建Hologres表,对未包含在同步任务导入字段中的字段,不设置not null属性或设置default value。