本文为您介绍如何使用Connector同步DataHub中的数据至Hologres。
背景信息
阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布(Publish)、订阅(Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。
DataHub提供数据Sink/Source功能(数据同步),支持对Topic中的数据通过Hologres Connector实时同步到Hologres,对数据进行多维分析和实时探索。
DataHub与Hologres的概念映射如下表所示。
DataHub | Hologres |
---|---|
Project | Database |
Topic | Table |
使用限制
- Hologres实例开启白名单功能,则无法同步DataHub的数据至Hologres,请不要开启白名单功能。
- 数据写入分区表必须先在Hologres中创建分区子表,详情请参见CREATE PARTITION TABLE。
- 不支持写入带有Default值的表。
同步介绍
同步DataHub中的数据至Hologres,有两种同步模式和两种同步策略,同步模式与同步策略还可以分别进行组合,实现不同的效果。
说明
- 以下的两种同步模式和两种策略,不是DataHub的任务级别配置,而是在Hologres建表时的表属性,必须在建Hologres表时指定。
- 同步DataHub中的数据至Hologres与DataWorks数据集成批量同步至Hologres SDK写入模式冲突,关于SDK写入模式的介绍请参见Hologres Writer。
- 同步模式
- 逐条插入
逐条插入是指将DataHub数据逐条写入Hologres,需要在Hologres建表时指定表属性如下。
call set_table_property('<table_name>', 'datahub_sync_mode', 'none');
- 回放
回放是指回放上游数据库的DML操作,DataHub相当于是一个binlog,若是使用
dts-datahub-hologres
是否启用新的附加列规则,需要在Hologres建表时指定表属性如下。- 是否启用新的附加列规则选择为是时,需要在Hologres中建表时配置如下表属性。
call set_table_property('<table_name>', 'datahub_sync_mode', 'dts');
- 是否启用新的附加列规则选择为否时,需要在Hologres中建表时配置如下表属性。
call set_table_property('<table_name>', 'datahub_sync_mode', 'dts_old');
DTS在同步数据到DataHub时,会在数据列的基础上附加如下8列,用于描述回放数据信息(INSERT/UPDATE/DELETE),字段的主要说明如下。- 附加列命名方式
旧版数据列名称 新版数据列名称 dts_${原始列名} new_dts_sync_dts_${原始列名} - 附加列说明
旧版附加列名称 新版附加列名称 数据类型 说明 dts_record_id new_dts_sync_dts_record_id String 增量日志的记录ID,为该日志唯一标识。 dts_operation_flag new_dts_sync_dts_operation_flag String 操作类型,取值: - I:INSERT操作。
- D:DELETE操作。
- U:UPDATE操作。
dts_instance_id new_dts_sync_dts_instance_id String 数据库的server ID。暂不支持显示实际的值,目前固定为NULL。 dts_db_name new_dts_sync_dts_db_name String 数据库名称。 dts_table_name new_dts_sync_dts_table_name String 表名。 dts_utc_timestamp new_dts_sync_dts_utc_timestamp String 操作时间戳,即Binlog的时间戳(UTC时间)。 dts_before_flag new_dts_sync_dts_before_flag String 所有列的值是否更新前的值,取值:Y或N。 dts_after_flag new_dts_sync_dts_after_flag String 所有列的值是否更新后的值,取值:Y或N。
- 是否启用新的附加列规则选择为是时,需要在Hologres中建表时配置如下表属性。
- 逐条插入
- 同步策略(主键冲突策略)
当Hologres表设置主键时,从DataHub写入的数据有如下两种主键冲突策略。
- 覆盖
覆盖是指当写入发生主键冲突时,新的数据覆盖老数据,这个时候需要在Hologres建表时指定表属性如下。
call set_table_property('<table_name>', 'datahub_upsert_mode', 'insert_or_replace');
- 忽略
忽略是指写入时发生主键冲突,忽略新数据,即数据不更新,仍然使用老数据,这个时候需要在Hologres建表时指定表属性如下。
call set_table_property('<table_name>', 'datahub_upsert_mode', 'insert_or_ignore');
- 覆盖
- 同步模式与同步策略组合
以上通过DataHub写入Hologres的几种模式,不同组合之间实现的效果不同,具体请参见以下。
- 插入模式与覆盖策略组合
相当于在Hologres中执行以下SQL。
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
- 插入模式与忽略策略组合
相当于在Hologres中执行以下SQL。
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
- 回放模式与覆盖策略组合
- dts_operation_flag=I,相当于在Hologres中执行以下SQL。
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
- dts_operation_flag=D,相当于在Hologres中执行以下SQL。
DELETE FROM target_table where pk=?
- dts_operation_flag=U AND dts_before_flag=Y,相当于在Hologres中执行以下SQL。
DELETE FROM target_table where pk=?
- dts_operation_flag=U AND dts_after_flag=Y,相当于在Hologres中执行以下SQL。
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
- dts_operation_flag=I,相当于在Hologres中执行以下SQL。
- 回放模式与忽略策略组合
- dts_operation_flag=I,相当于在Hologres中执行以下SQL。
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
- dts_operation_flag=D,相当于在Hologres中执行以下SQL。
DELETE FROM target_table where pk=?
- dts_operation_flag=U AND dts_before_flag=Y,相当于在Hologres中执行以下SQL。
DELETE FROM target_table where pk=?
- dts_operation_flag=U AND dts_after_flag=Y,相当于在Hologres中执行以下SQL。
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
- dts_operation_flag=I,相当于在Hologres中执行以下SQL。
- 插入模式与覆盖策略组合
操作步骤
常见报错
为您介绍在使用Hologres过程中的常见报错,以便于您能自行排查并解决问题。
-
场景1:查询数据时,出现如下报错。
ErrorMessage [Import field not found in dest schema;
可能原因:未设置datahub_sync_mode参数值为
dts
。解决办法:重新创建Hologres表,并设置表属性datahub_sync_mode为
dts
。 -
场景2:查询数据时,出现如下报错。
ErrorCode=InternalServerError; ErrorMessage =Field already exists
可能原因:Hologres列datahub_sync_mode设置为
dts
,并且建表时包含了8列附加列。解决办法:重新创建Hologres表,设置datahub_sync_mode为
dts
时,字段只需要跟上游保持一致,无需多增加8列附加列。