天任务依赖分钟任务最佳实践

每天00:00执行的SQL任务为天任务,依赖于每5分钟抽取一次数据的分钟任务。天任务会对当天同步任务抽取的所有数据进行计算。

前提条件

开始本实验前,您需要首先准备好以下内容:

  • 请确保已拥有阿里云账号并进行实名认证,详情请参见开通DataWorks服务

  • 开通MaxCompute并创建工作空间,详情请参见创建工作空间

  • 通过RDS创建MySQL实例,获取RDS实例ID,并在RDS控制台添加白名单。详情请参见快速创建RDS MySQL实例添加白名单

    说明
    • 如果是通过自定义资源组调度RDS的数据同步任务,必须把自定义资源组的机器IP也加入RDS的白名单中。

    • 本文以MySQL数据源为例,您可以根据自身需求新增不同类型的数据源。

背景信息

在DataWorks调度系统中,下游对上游的依赖需要遵循的原则为:下游任务生成的实例会找到当天离自己最近结束的一个上游实例作为上游依赖,如果上游依赖实例运行成功,才会触发本节点实例运行。如果上游节点每天生成多个实例,则下游无法识别是哪一个实例离它最近结束,导致必须等上游当天生成的所有实例运行完成后才会运行。

因此,上游节点必须配置自依赖,SQL任务在00:00的实例才会准确依赖00:00生成的同步任务实例结束后再运行。

本实验的实现思路如下:

  1. 创建一个同步节点作为上游的分钟任务,一个SQL节点作为下游的天任务。

  2. 设置同步节点的调度时间为每5分钟调度一次(开始时间00:00,结束时间23:59,时间间隔5分钟)。

  3. 配置同步节点依赖上一周期 > 本节点,以形成自依赖。

  4. 设置SQL任务每天00:00调度一次。

说明

由于天任务依赖分钟任务,如果分钟级任务失败,会影响天任务的执行。

操作步骤

  1. 新增数据源。

    1. 进入数据源页面。

      1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 管理中心,在下拉框中选择对应工作空间后单击进入管理中心

      2. 进入工作空间管理中心页面后,单击左侧导航栏的数据源 > 数据源列表,进入数据源页面。

    2. 单击右上角的新增数据源,添加MySQL数据源,详情请参见配置MySQL数据源

  2. 新建业务流程。

    1. 单击当前页面左上角的图标图标,选中全部产品 > 数据开发与运维 > DataStudio(数据开发)

    2. 鼠标悬停至image.png图标,单击新建业务流程

    3. 新建业务流程对话框中,输入业务名称描述

    4. 单击新建

    5. 右键单击您所创建的业务流程,然后单击新建节点 > 离线同步进入新建节点对话框,输入新建节点名称信息。

      image

    6. 单击确认

      以同样的方式再新建一个ODPS SQL节点。

    7. 通过拖拽连线,设置离线同步节点为ODPS SQL节点的上游。

      节点依赖

      • 数据同步节点用来同步每5分钟调度一次的MySQL数据至MaxCompute。

      • ODPS SQL节点用来汇总MaxCompute接收的数据。

    8. 单击工具栏中的保存图标。

  3. 配置作为分钟任务的离线同步节点。

    1. 双击离线同步节点,进入该节点的编辑页面。

    2. 选择数据来源数据去向

      本示例为同步MySQL的数据至MaxCompute。根据过滤条件过滤每5分钟更新的数据,目标端的分区根据定时时间的前5分钟创建,以保证所有数据都写入同一天的分区中。选择数据来源

      输入数据过滤insert_time>=${startTime} and insert_time<${endTime}

    3. 配置字段映射。

      数据来源数据去向均创建了idnameinsert_time三列字段。insert_time为时间列,数据可以根据时间来过滤。字段映射

    4. 单击右侧的调度配置

      进入调度配置页面,在调度参数中,为过滤条件中的参数赋值:startTime=[yyyymmddhh24miss−5/24/60] endTime=[yyyymmddhh24miss]

      image

      调度配置页面,找到时间属性,设置调度周期分钟开始时间00:00,每间隔5分钟调度一次。为保证一天的实例都运行完,您需要设置自依赖。自依赖

      • 上述配置可以保证您一天产生的实例能够依次运行完成,并保存至MaxCompute表同一天的分区中。

      • 如果您的分钟任务中有一个调度任务出错,则设置自依赖后面的实例都不会运行,需要您手动进行处理。

      • 为避免出现上述问题,您可以设置数据过滤为insert_time<${endTime},每次都进行全量同步,只要有成功的便可同步endTime数据。您无需设置自依赖,但会增加数据库的负担。

  4. 配置作为天任务的ODPS SQL节点。

    1. 双击ODPS SQL节点,进入该节点的编辑页面。

    2. 过滤workshop_odps_mi一天分区中的数据并插入至workshop_odps_dd表中。

      insert overwrite table workshop_odps_dd partition (ds=${yestoday})
      select id, name,insert_time from workshop_odps_mi where ${startTime}<=ds and ds<${endTime};

      本文以ds=20190320为例,在节点编辑页面,输入如下语句。

      insert overwrite table workshop_odps_dd partition (ds=20190320)
      select id, name,insert_time from workshop_odps_mi where 20190320000000<=ds and ds<20190321000000;

      因为天运行定时时间在分钟任务运行结束后,所以插入workshop_odps_dd分区(ds)的时间需要和分钟任务时间在同一天,您将ds时间减1即可。

    3. 单击右侧的调度配置,参数赋值为startTime=$[yyyymmddhh24miss-1]endTime=$[yyyymmddhh24miss]yesterday=$[yyyymmdd-1]

      image

    4. 单击工具栏中的保存图标。

  5. 单击业务流程工具栏中的运行图标,查看运行结果。

    插入数据