本文为您介绍如何使用Connector同步DataHub中的数据至Hologres。

前提条件

  • 开通Hologres并连接开发工具,详情请参见PSQL客户端
  • 开通DataHub,详情请参见快速入门
  • 目前仅支持同步DataHub的TUPLE数据至Hologres中。

背景信息

阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布(Publish)、订阅(Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。

DataHub提供数据Sink/Source功能(数据同步),支持对Topic中的数据通过Hologres Connector实时同步到Hologres,对数据进行多维分析和实时探索。

DataHub与Hologres的概念映射如下表所示。
DataHub Hologres
Project Database
Topic Table

使用限制

  • Hologres实例开启白名单功能,则无法同步DataHub的数据至Hologres,请不要开启白名单功能。
  • 数据写入分区表必须先在Hologres中创建分区子表,详情请参见CREATE PARTITION TABLE
  • 不支持写入带有Default值的表。

同步介绍

同步DataHub中的数据至Hologres,有两种同步模式和两种同步策略,同步模式与同步策略还可以分别进行组合,实现不同的效果。
说明
  • 以下的两种同步模式和两种策略,不是DataHub的任务级别配置,而是在Hologres建表时的表属性,必须在建Hologres表时指定。
  • 同步DataHub中的数据至Hologres与DataWorks数据集成批量同步至Hologres SDK写入模式冲突,关于SDK写入模式的介绍请参见Hologres Writer
  • 同步模式
    • 逐条插入
      逐条插入是指将DataHub数据逐条写入Hologres,需要在Hologres建表时指定表属性如下。
      call set_table_property('<table_name>', 'datahub_sync_mode', 'none');
    • 回放
      回放是指回放上游数据库的DML操作,DataHub相当于是一个binlog,若是使用dts-datahub-hologres是否启用新的附加列规则,需要在Hologres建表时指定表属性如下。
      • 是否启用新的附加列规则选择为时,需要在Hologres中建表时配置如下表属性。
        call set_table_property('<table_name>', 'datahub_sync_mode', 'dts');
      • 是否启用新的附加列规则选择为时,需要在Hologres中建表时配置如下表属性。
        call set_table_property('<table_name>', 'datahub_sync_mode', 'dts_old');
      DTS在同步数据到DataHub时,会在数据列的基础上附加如下8列,用于描述回放数据信息(INSERT/UPDATE/DELETE),字段的主要说明如下。
      • 附加列命名方式
        旧版数据列名称 新版数据列名称
        dts_${原始列名} new_dts_sync_dts_${原始列名}
      • 附加列说明
        旧版附加列名称 新版附加列名称 数据类型 说明
        dts_record_id new_dts_sync_dts_record_id String 增量日志的记录ID,为该日志唯一标识。
        dts_operation_flag new_dts_sync_dts_operation_flag String 操作类型,取值:
        • I:INSERT操作。
        • D:DELETE操作。
        • U:UPDATE操作。
        dts_instance_id new_dts_sync_dts_instance_id String 数据库的server ID。暂不支持显示实际的值,目前固定为NULL。
        dts_db_name new_dts_sync_dts_db_name String 数据库名称。
        dts_table_name new_dts_sync_dts_table_name String 表名。
        dts_utc_timestamp new_dts_sync_dts_utc_timestamp String 操作时间戳,即Binlog的时间戳(UTC时间)。
        dts_before_flag new_dts_sync_dts_before_flag String 所有列的值是否更新前的值,取值:Y或N。
        dts_after_flag new_dts_sync_dts_after_flag String 所有列的值是否更新后的值,取值:Y或N。
  • 同步策略(主键冲突策略)
    当Hologres表设置主键时,从DataHub写入的数据有如下两种主键冲突策略。
    • 覆盖
      覆盖是指当写入发生主键冲突时,新的数据覆盖老数据,这个时候需要在Hologres建表时指定表属性如下。
      call set_table_property('<table_name>', 'datahub_upsert_mode', 'insert_or_replace');
    • 忽略
      忽略是指写入时发生主键冲突,忽略新数据,即数据不更新,仍然使用老数据,这个时候需要在Hologres建表时指定表属性如下。
      call set_table_property('<table_name>', 'datahub_upsert_mode', 'insert_or_ignore');
  • 同步模式与同步策略组合
    以上通过DataHub写入Hologres的几种模式,不同组合之间实现的效果不同,具体请参见以下。
    • 插入模式与覆盖策略组合
      相当于在Hologres中执行以下SQL。
      INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
    • 插入模式与忽略策略组合
      相当于在Hologres中执行以下SQL。
      INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
    • 回放模式与覆盖策略组合
      • dts_operation_flag=I,相当于在Hologres中执行以下SQL。
        INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
      • dts_operation_flag=D,相当于在Hologres中执行以下SQL。
        DELETE FROM target_table where pk=?
      • dts_operation_flag=U AND dts_before_flag=Y,相当于在Hologres中执行以下SQL。
        DELETE FROM target_table where pk=?
      • dts_operation_flag=U AND dts_after_flag=Y,相当于在Hologres中执行以下SQL。
        INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
    • 回放模式与忽略策略组合
      • dts_operation_flag=I,相当于在Hologres中执行以下SQL。
        INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
      • dts_operation_flag=D,相当于在Hologres中执行以下SQL。
        DELETE FROM target_table where pk=?
      • dts_operation_flag=U AND dts_before_flag=Y,相当于在Hologres中执行以下SQL。
        DELETE FROM target_table where pk=?
      • dts_operation_flag=U AND dts_after_flag=Y,相当于在Hologres中执行以下SQL。
        INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING

操作步骤

  1. 准备DataHub数据源。
    1. 创建项目。
      1. 登录DataHub控制台,单击左侧导航栏的项目管理
      2. 项目列表页面单击新建项目
      3. 新建项目弹框,配置参数后,单击创建
      a
    2. 新建Topic。
      1. 成功创建项目后,在项目列表页面单击项目名称,进入项目详情页。
      2. 单击项目详情页右上角的新建Topic,进入新建Topic页面,填写配置参数。
      吧
      参数 描述
      创建方式
      • 直接创建:创建新的Topic。
      • 导入MaxCompute表结构:选择MaxCompute中已有的表结构创建Topic。
      名称 自定义Topic名称。
      类型 Topic类型,分为以下两种:
      • TUPLE:结构化数据。
      • BLOB:非结构化数据。
      Schema详情 选择TUPLE类型会出现Schema详情,根据自己需求创建字段,允许为NULL代表如果上游没有该字段值自动置为NULL;不允许为NULL则会严格检验,字段类型不匹配写入报错。
      Shard数量 Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态:Opening - 启动中,Active - 启动完成可服务。每个Shard启用以后会占用一定的服务端资源,建议按需申请Shard数量。
      生命周期 Topic中写入数据在系统中可以保存的最长时间,以天为单位,最小值为1,最大值为7。
      描述 Topic的描述信息。
    3. 写入数据。
      成功创建Topic后,您需要使用工具(例如Blink)或者程序写入数据至Topic中。
  2. Hologres创建数据接收表。
    在Hologres中创建一张用于接收数据的表,表的字段类型与DataHub中Topic的字段类型相互映射。
    DataHub与Hologres的数据类型映射如下表所示。
    DataHub Hologres
    BIGINT BIGINT
    STRING TEXT
    BOOLEAN BOOLEAN
    DOUBLE DOUBLE PRECISION
    TIMESTAMP TIMESTAMPTZ
    DECIMAL DECIMAL
    示例建表语句如下。
    BEGIN;
    CREATE TABLE lineitem ( 
    L_ORDERKEY BIGINT NOT NULL,
    L_PARTKEY BIGINT NOT NULL,
    L_SUPPKEY BIGINT NOT NULL,
    L_LINENUMBER BIGINT NOT NULL,
    L_QUANTITY DECIMAL(20,10),
    L_EXTENDEDPRICE DECIMAL(20,10),
    L_DISCOUNT DECIMAL(20,10),
    L_TAX DECIMAL(20,10),
    L_RETURNFLAG TEXT,
    L_LINESTATUS TEXT,
    L_SHIPDATE TIMESTAMPTZ,
    L_COMMITDATE TIMESTAMPTZ,
    L_RECEIPTDATE TIMESTAMPTZ,
    L_SHIPINSTRUCT TEXT,
    L_SHIPMODE TEXT,
    L_COMMENT TEXT
    );
    
    CALL set_table_property('lineitem', 'orientation', 'column');
    CALL set_table_property('lineitem', 'datahub_sync_mode', 'none');
    CALL set_table_property('lineitem', 'datahub_upsert_mode', 'insert_or_ignore');
    
    COMMIT;
  3. 在DataHub中创建Hologres Connector。
    1. 单击DataHub中已创建的Topic,进入Topic详情页。
    2. 单击Topic详情页右上角的+同步
    3. 新建Connector界面单击Hologres,在新建Connector页面配置参数后,单击创建新建connector
      参数 说明
      Instance Hologres实例的ID。进入Hologres管理控制台,获取实例ID
      Project Hologres的数据库名称。
      Topic Hologres用于接收数据的表名称。
      导入字段 需要导入Hologres的字段。可以根据实际业务需求选择导入部分或全部字段。
      鉴权模式 默认为AccessKey。
      AccessKey ID 访问Hologres实例的AccessKey ID。您可以单击AccessKey 管理,获取用户的AccessKey ID。
      AccessKey Secret 访问Hologres实例的AccessKey Secret。您可以单击AccessKey 管理,获取AccessKey Secret。
      Timestamp Unit 同步时间单位,可选择如下。
      • MICROSECOND:微秒,为默认值。
      • MILLISECOND:毫秒。
      • SECOND:秒。
  4. 同步DataHub的数据至Hologres。
    成功创建Connector后,您可以在Topic详情页的数据同步中查看实时同步数据的状态。chakan
  5. Hologres查询数据。
    您可以连接Hologres实例至开发工具,实时查询同步至Hologres中的数据,详情请参见概述。示例查询语句如下。
    SELECT COUNT(*) FROM lineitem;

常见报错

为您介绍在使用Hologres过程中的常见报错,以便于您能自行排查并解决问题。

  • 场景1:查询数据时,出现如下报错。
    ErrorMessage [Import field not found in dest schema;

    可能原因:未设置datahub_sync_mode参数值为dts

    解决办法:重新创建Hologres表,并设置表属性datahub_sync_modedts

  • 场景2:查询数据时,出现如下报错。
    ErrorCode=InternalServerError; ErrorMessage =Field already exists 

    可能原因:Hologres列datahub_sync_mode设置为dts,并且建表时包含了8列附加列。

    解决办法:重新创建Hologres表,设置datahub_sync_modedts时,字段只需要跟上游保持一致,无需多增加8列附加列。