订阅DataHub数据进行实时计算,插入到RDS实例的目标表中。
- 在RDS数据库中,按照源表结构创建相应的表。
- 参考数据存储配置白名单,配置RDS数据库白名单。
由于本案例的RDS数据库位于华东1区,而流计算项目位于华东2区,所以需要在RDS数据库中配置流计算的白名单。
- 创建流计算开发作业,将DataHub中的数据实时同步到RDS的目标表中。
- 进入阿里云流计算开发平台,单击 ,创建一个名称为test的开发作业。
- 单击左侧菜单栏的数据存储,右键单击DataHub 数据存储,选择注册数据存储,填入相关信息,单击注册。
参数 说明 数据存储类型 DataHub数据存储。 EndPoint 通过DataHub访问域名获取。 Project DataHub的项目名称,可在DataHub控制台中获取。 - 使用同样的方式注册RDS数据存储,参数说明如下。
参数 说明 Instance RDS的实例ID,可在RDS实例的基本信息页面获取。 DBName 数据库名称,可在RDS实例的数据库管理页面获取。 Username 数据库绑定的账号名称,可通过RDS实例的数据库管理页面获取。 Password 创建数据库时设置的密码。 注册完成后,系统显示如下。 - 依次双击作为输入表引用,在开发作业中引用数据源。 ,选择右侧的
- 依次双击作为结果表引用,在开发作业中引用目标表。 ,选择右侧的
- 通过
INSERT INTO
语句,将实时计算后的源表数据插入目标表中。说明 如果数据格式不匹配,需要进行相应的数据格式转换工作,例如使用from_unixtime
函数等。示例代码如下:INSERT INTO target_table SELECT t.dts_order_id, t.dts_buyer_id, t.dts_buyer_name, t.dts_product_id, t.dts_product_name, t.dts_create_time, t.dts_record_id, t.dts_operation_flag, t.dts_instance_id, t.dts_db_name, t.dts_table_name, t.dts_utc_timestamp, t.dts_before_flag, t.dts_after_flag FROM mytable as t;
- 单击上线,选择默认资源配置并进行上线前检查,检查无误后显示如下页面。
- 单击下一步,填入注释,单击上线。
- 作业上线成功后,选择控制台上方的运维,单击作业右侧的启动。
- 在启动作业弹出框中,选择读取数据时间,并单击按以上配置启动。
启动成功后,可查看作业状态,按照需要停止或重启作业,并查看业务延迟。
注意 选择的读取数据时间必须在数据同步到DataHub之前,否则可能造成数据丢失,影响查询结果。