订阅DataHub数据进行实时计算,插入到RDS实例的目标表中。

  1. 在RDS数据库中,按照源表结构创建相应的表。
    创建表格
  2. 参考数据存储配置白名单,配置RDS数据库白名单。

    由于本案例的RDS数据库位于华东1区,而流计算项目位于华东2区,所以需要在RDS数据库中配置流计算的白名单。

  3. 创建流计算开发作业,将DataHub中的数据实时同步到RDS的目标表中。
    1. 进入阿里云流计算开发平台,单击开发 > 作业开发 > 新建作业,创建一个名称为test的开发作业。
      创建开发作业
    2. 单击左侧菜单栏的数据存储,右键单击DataHub 数据存储,选择注册数据存储,填入相关信息,单击注册
      注册数据存储
      参数 说明
      数据存储类型 DataHub数据存储。
      EndPoint 通过DataHub访问域名获取。
      Project DataHub的项目名称,可在DataHub控制台中获取。
    3. 使用同样的方式注册RDS数据存储,参数说明如下。
      注册RDS存储
      参数 说明
      Instance RDS的实例ID,可在RDS实例的基本信息页面获取。
      DBName 数据库名称,可在RDS实例的数据库管理页面获取。
      Username 数据库绑定的账号名称,可通过RDS实例的数据库管理页面获取。
      Password 创建数据库时设置的密码。
      注册完成后,系统显示如下。
      系统显示情况
    4. 依次双击DataHub 数据存储 > 项目名称(datahub_test_datav) > 表名(mytable),选择右侧的作为输入表引用,在开发作业中引用数据源。
      在开发作业中引用数据源
    5. 依次双击RDS 数据存储 > 数据库名称(datav_test) > 表名(target_table),选择右侧的作为结果表引用,在开发作业中引用目标表。
      引用目标表
    6. 通过INSERT INTO语句,将实时计算后的源表数据插入目标表中。
      将源表数据插入目标表
      说明 如果数据格式不匹配,需要进行相应的数据格式转换工作,例如使用from_unixtime函数等。
      示例代码如下:
      INSERT INTO
      target_table
      SELECT
      t.dts_order_id,
      t.dts_buyer_id,
      t.dts_buyer_name,
      t.dts_product_id,
      t.dts_product_name,
      t.dts_create_time,
      t.dts_record_id,
      t.dts_operation_flag,
      t.dts_instance_id,
      t.dts_db_name,
      t.dts_table_name,
      t.dts_utc_timestamp,
      t.dts_before_flag,
      t.dts_after_flag
      FROM
      mytable as t;
    7. 单击上线,选择默认资源配置并进行上线前检查,检查无误后显示如下页面。
      上线检查
    8. 单击下一步,填入注释,单击上线
    9. 作业上线成功后,选择控制台上方的运维,单击作业右侧的启动
    10. 启动作业弹出框中,选择读取数据时间,并单击按以上配置启动

      启动成功后,可查看作业状态,按照需要停止或重启作业,并查看业务延迟。


      q
      注意 选择的读取数据时间必须在数据同步到DataHub之前,否则可能造成数据丢失,影响查询结果。