阿里云流式数据服务DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布、订阅和分发功能,让您可以轻松构建基于流式数据的分析和应用。通过数据传输服务DTS(Data Transmission Service),您可以将RDS MySQL同步至阿里云流式数据服务DataHub,帮助您快速实现使用流计算等大数据产品对数据实时分析。
前提条件
- 已创建源实例RDS MySQL。
说明
- 源实例RDS MySQL的创建方式,请参见创建RDS MySQL实例。
- 支持的版本,请参见同步方案概览。
- 已开通DataHub服务,并且已创建用作接收同步数据的Project。具体的操作请参见快速入门、Project操作。
注意事项
类型 | 说明 |
---|---|
源库限制 |
|
其他限制 |
|
特殊情况 |
当源库为自建MySQL时
|
支持的同步架构
- 一对一单向同步。
- 一对多单向同步。
- 多对一单向同步。
支持同步的SQL操作
操作类型 | SQL操作语句 |
---|---|
DML | INSERT、UPDATE、DELETE |
操作步骤
Topic结构定义说明
id
、name
、address
,由于在配置数据同步时选用的是旧版附加列规则,DTS会为业务字段添加dts_
的前缀。

结构定义说明:
旧版附加列名称 | 新版附加列名称 | 数据类型 | 说明 |
---|---|---|---|
dts_record_id |
new_dts_sync_dts_record_id |
String | 增量日志的记录ID,为该日志唯一标识。
说明
|
dts_operation_flag |
new_dts_sync_dts_operation_flag |
String | 操作类型,取值:
|
dts_instance_id |
new_dts_sync_dts_instance_id |
String | 数据库的server ID。暂不支持显示实际的值,目前固定为null 。
|
dts_db_name |
new_dts_sync_dts_db_name |
String | 数据库名称。 |
dts_table_name |
new_dts_sync_dts_table_name |
String | 表名。 |
dts_utc_timestamp |
new_dts_sync_dts_utc_timestamp |
String | 操作时间戳,即binlog的时间戳(UTC 时间)。 |
dts_before_flag |
new_dts_sync_dts_before_flag |
String | 所有列的值是否更新前的值,取值:Y或N。 |
dts_after_flag |
new_dts_sync_dts_after_flag |
String | 所有列的值是否更新后的值,取值:Y或N。 |
关于dts_before_flag和dts_after_flag的补充说明
对于不同的操作类型,增量日志中的dts_before_flag
和dts_after_flag
定义如下:
- INSERT
当操作类型为INSERT时,所有列的值为新插入的值,即为更新后的值,所以
dts_before_flag
取值为N,dts_after_flag
取值为Y,示例如下。 - UPDATE
当操作类型为UPDATE时,DTS会将UPDATE操作拆为两条增量日志。这两条增量日志的
dts_record_id
、dts_operation_flag
及dts_utc_timestamp
对应的值相同。第一条增量日志记录了更新前的值,所以
dts_before_flag
取值为Y,dts_after_flag
取值为N。第二条增量日志记录了更新后的值,所以dts_before_flag
取值为N,dts_after_flag
取值为Y,示例如下。 - DELETE
当操作类型为DELETE时,增量日志中所有的列值为被删除的值,即列值不变,所以
dts_before_flag
取值为Y,dts_after_flag
取值为N,示例如下。