DataHub实时同步

将DataHub数据同步至Hologres适用于需要实时数据分析、实时监控、实时报表和复杂数据结构处理的场景,通过结合DataHub和Hologres的能力,可以提升数据处理和分析的效率和准确性。本文介绍如何创建DataHub数据同步任务以及常见问题。

背景信息

DataHub提供数据Sink/Source功能,即数据同步功能,支持将对应Topic中的数据实时/准实时的同步到第三方阿里云产品中,打通阿里云产品间的数据流通。关于DataHub数据同步功能介绍详情,请参见概述

DataHub与Hologres的映射关系如下表所示。

DataHub

Hologres

Project

Database

Topic

Table

同步场景与策略介绍

同步场景

同步场景

说明

逐条插入

将源数据库中数据逐条插入到目标数据库中,适用于将DataHub数据全量同步至Hologres的场景。

回放

通过分析并重新执行Binlog数据库操作记录日志,将源数据库变更操作应用到目标数据库中,最终确保数据一致性。适用于DTS同步数据至DataHub,再将DataHub数据同步至Hologres的链路场景,其中DataHub相当于Binlog。

说明

DTS同步数据至DataHub,会在数据列的基础上添加附加列,用于记录数据的操作信息。DTS同步数据至DataHub,附加列的命名存在新旧差异,详情请参见修改数据同步的附加列规则

同步策略

同步策略

说明

覆盖(replace)

数据写入发生主键冲突,新的数据覆盖老数据并写入,确保目标数据库中的数据与源数据保持一致。

忽略(ignore)

数据写入发生主键冲突,忽略新数据,即数据不更新,避免重复数据的导入和覆盖,保持目标数据库的数据完整性。

注意事项

  • 仅支持将DataHub TUPLE类型数据同步到Hologres。

  • 数据写入分区表必须先在Hologres中创建分区子表,详情请参见CREATE PARTITION TABLE

  • 每个同步任务都会占用部分连接数,每个任务占用的连接数等于DataHub Topic的Shard数。

准备工作

  1. 开通DataHub服务并准备DataHub数据。具体操作,请参见快速入门

  2. 购买Hologres实例并建表,本文以表lineitem为示例。具体操作,请参见购买Hologres通过HoloWeb连接Hologres创建表

    DataHub与Hologres的数据类型映射如下表所示。

    DataHub

    Hologres

    TINYINT

    SMALLINT

    SMALLINT

    SMALLINT

    INTEGER

    INTEGER

    BIGINT

    BIGINT

    FLOAT

    REAL

    DOUBLE

    DOUBLE PRECISION

    DECIMAL

    DECIMAL

    STRING

    TEXT

    BOOLEAN

    BOOLEAN

    TIMESTAMP

    TIMESTAMPTZ

    示例建表语句如下。

    BEGIN;
    CREATE TABLE lineitem ( 
    L_ORDERKEY BIGINT NOT NULL,
    L_PARTKEY BIGINT NOT NULL,
    L_SUPPKEY BIGINT NOT NULL,
    L_LINENUMBER BIGINT NOT NULL,
    L_QUANTITY DECIMAL(20,10),
    L_EXTENDEDPRICE DECIMAL(20,10),
    L_DISCOUNT DECIMAL(20,10),
    L_TAX DECIMAL(20,10),
    L_RETURNFLAG TEXT,
    L_LINESTATUS TEXT,
    L_SHIPDATE TIMESTAMPTZ,
    L_COMMITDATE TIMESTAMPTZ,
    L_RECEIPTDATE TIMESTAMPTZ,
    L_SHIPINSTRUCT TEXT,
    L_SHIPMODE TEXT,
    L_COMMENT TEXT
    );
    caLL set_table_property('lineitem', 'orientation', 'column');
    COMMIT;

创建同步任务

  1. 登录DataHub服务控制台,单击已创建的Topic,进入Topic详情页。

  2. 单击Topic详情页右上角的+同步

  3. 单击Hologres,在新建Connector页面配置参数。

    参数

    说明

    Instance

    Hologres实例的ID。进入Hologres管理控制台,获取实例ID

    Database

    用于接收DataHub数据的Hologres数据库名称。

    Table

    用于接收DataHub数据的Hologres表名称lineitem

    主键冲突策略

    主键冲突时的数据更新策略。取值说明:

    • replace(默认值):数据写入发生主键冲突时,新数据写入覆盖老数据。

    • ignore:数据写入发生主键冲突时,忽略新数据,即数据不更新,仍然使用老数据。

    关于数据同步策略详情,请参见同步策略

    同步场景

    数据同步的使用场景。取值说明:

    • default(默认值):逐条插入。

    • dts:通过DTS同步数据至DataHub时,若启用新附加列规则,选择此参数。

    • dts_old:通过DTS同步数据至DataHub时,若未启用新附加列规则,选择此参数。

    关于数据同步场景详情,请参见同步场景

    导入字段

    需要导入Hologres的字段。可以根据实际业务需求选择导入部分或全部字段。

    鉴权模式

    默认为AccessKey。

    AccessKey ID

    访问Hologres实例的AccessKey ID。您可以单击AccessKey 管理,获取用户的AccessKey ID。

    AccessKey Secret

    访问Hologres实例的AccessKey Secret。您可以单击AccessKey 管理,获取AccessKey Secret。

    Timestamp Unit

    同步时间单位,取值说明。

    • MICROSECOND:微秒,为默认值。

    • MILLISECOND:毫秒。

    • SECOND:秒。

  4. 单击创建,同步DataHub的数据至Hologres。

    创建Connector后,您可以在Topic详情页的同步任务中查看实时同步数据的状态。

  5. Hologres查询数据。

    您可以连接Hologres实例开发工具,实时查询同步至Hologres中的数据。Hologres连接详情,请参见概述。执行以下示例查询语句。

    SELECT COUNT(*) FROM lineitem;

常见问题

为您介绍在使用Hologres过程中的常见报错,以便于您能自行排查并解决问题。

  • 问题1

    • 报错信息

      ErrorMessage: Import field not found in dest schema.
    • 报错原因

      • Hologres表中未包含同步任务指定的导入字段。

      • 任务的同步场景指定为default,但是导入字段包含DTS同步到DataHub产生的附加列。

    • 解决方法

      • 重新创建Hologres表,加上未包含的导入字段;修改同步任务的导入字段,去掉Hologres表中不存在的字段。

      • 重新创建同步任务,并指定同步场景为dtsdts_old

  • 问题2

    • 报错信息

      ErrorMessage: Column type not match with Holo column.
    • 报错原因

      DataHub Topic中的字段类型与Hologres表中的字段类型不匹配。

    • 解决方法

      根据数据类型映射,重新创建Hologres表,设置正确的字段类型。

  • 问题3

    • 报错信息

      ErrorMessage: Not import column xxx not allow null and no default value.
    • 报错原因

      Hologres表中的部分字段未包含在同步任务的导入字段中,但是这部分字段设置了not null属性,并且未设置default value。

    • 解决方法

      重新建Hologres表,对未包含在同步任务导入字段中的字段,不设置not null属性或设置default value。