订阅DataHub数据进行实时计算,插入到RDS实例的目标表中。

  1. 在RDS数据库中,按照源表结构创建相应的表。创建表格
  2. 参考数据存储配置白名单,配置RDS数据库白名单。

    由于本案例的RDS数据库位于华东1区,而流计算项目位于华东2区,所以需要在RDS数据库中配置流计算的白名单。

  3. 创建流计算开发作业,将DataHub中的数据实时同步到RDS的目标表中。
    1. 进入阿里云流计算开发平台,单击开发 > 作业开发 > 新建作业,创建一个名称为test的开发作业。创建开发作业
    2. 单击左侧菜单栏的数据存储,右键单击DataHub 数据存储,选择注册数据存储,填入相关信息,单击注册注册数据存储
      参数 说明
      数据存储类型 DataHub数据存储。
      EndPoint 通过DataHub访问域名获取。
      Project DataHub的项目名称,可在DataHub控制台中获取。
    3. 使用同样的方式注册RDS数据存储,参数说明如下。注册RDS存储
      参数 说明
      Instance RDS的实例ID,可在RDS实例的基本信息页面获取。
      DBName 数据库名称,可在RDS实例的数据库管理页面获取。
      Username 数据库绑定的账号名称,可通过RDS实例的数据库管理页面获取。
      Password 创建数据库时设置的密码。
      注册完成后,系统显示如下。 系统显示情况
    4. 依次双击DataHub 数据存储 > 项目名称(datahub_test_datav) > 表名(mytable),选择右侧的作为输入表引用,在开发作业中引用数据源。在开发作业中引用数据源
    5. 依次双击RDS 数据存储 > 数据库名称(datav_test) > 表名(target_table),选择右侧的作为结果表引用,在开发作业中引用目标表。引用目标表
    6. 通过INSERT INTO语句,将实时计算后的源表数据插入目标表中。将源表数据插入目标表
      说明 如果数据格式不匹配,需要进行相应的 数据格式转换工作,例如使用 from_unixtime函数等。
      示例代码如下:
      INSERT INTO
      target_table
      SELECT
      t.dts_order_id,
      t.dts_buyer_id,
      t.dts_buyer_name,
      t.dts_product_id,
      t.dts_product_name,
      t.dts_create_time,
      t.dts_record_id,
      t.dts_operation_flag,
      t.dts_instance_id,
      t.dts_db_name,
      t.dts_table_name,
      t.dts_utc_timestamp,
      t.dts_before_flag,
      t.dts_after_flag
      FROM
      mytable as t;
    7. 单击上线,选择默认资源配置并进行上线前检查,检查无误后显示如下页面。上线检查
    8. 单击下一步,填入注释,单击上线
    9. 作业上线成功后,选择控制台上方的运维,单击作业右侧的启动
    10. 启动作业弹出框中,选择读取数据时间,并单击按以上配置启动

      启动成功后,可查看作业状态,按照需要停止或重启作业,并查看业务延迟。

      q
      注意 选择的读取数据时间必须在数据同步到DataHub之前,否则可能造成数据丢失,影响查询结果。