文档

最佳实践:数据开发工作流中配置数据推送节点

更新时间:

DataWorks数据开发支持将数据推送作为节点,结合数据开发已有的工作流,提供了简单推送、合并推送、脚本推送以及条件推送方式,本实践将为您带来这四种推送方式的实践。

背景信息

在DataWorks业务流程开发中,可以新增数据推送节点,该节点可根据业务流程处理完成数据后,通过简单的查询来获取到需要的数据,根据任务调度及时、快速的推送数据至钉钉群或飞书群中去。

实践思路

  1. 新增一个节点,用来准备测试数据,以便为后续不同推送流程提供数据支持。

  2. 新增MySQL节点、赋值节点以及数据处理节点等,实现对测试数据处理与查询。

    说明

    准备测试数据节点与数据处理查询节点,在实际业务中可以根据实际情况进行调整,本案例以MySQL节点为例进行数据的推送。

  3. 新增数据推送节点,接收SQL查询节点的参数,并将参数推送至钉钉群或飞书群。

实践方式介绍

本实践共有四种数据推送节点与工作流的结合方式,分别为简单推送合并推送脚本推送以及条件推送方式。

  • 简单推送:上游工作流完成SQL查询后,通过节点上下文参数将查询结果输出至数据推送节点进行推送。

  • 合并推送:上游有多个工作流,分别完成SQL查询后,通过节点上下文参数将查询结果输出至一个数据推送节点进行推送。

  • 脚本推送:上游节点为赋值节点,通过赋值节点代码对数据进行处理并通过节点上下文参数将数据输出至推送节点进行推送。赋值节点详情,请参见赋值节点

  • 条件推送:上游流程通过分支节点对数据按照配置的条件进行判断,查询输出符合判断条件的数据,通过节点上下文参数将查询结果输出至数据推送节点进行推送。分支节点详情,请参见分支节点

前提条件

在开启本实践前,您需要首先准备好以下内容:

使用限制

  • 数据推送功能推送至不同对象时的数据大小限制:

    • 推送对象为钉钉,推送数据大小不超过20KB。

    • 推送对象为飞书,推送数据大小不超过30KB,图片小于10MB。

  • 仅以下地域的DataWorks工作空间可使用数据推送功能:华东1(杭州)、华东2(上海)、华北2(北京)、华北3(张家口)、华南1(深圳)、西南1(成都)。

准备流程

新建业务流程

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据建模与开发 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 数据开发面板,右键单击业务流程,选择新建业务流程。并根据业务需要定义业务流程名称,本案例业务流程命名为数据推送Demo示例

创建流程节点

创建完成数据推送Demo实例业务流程后,双击流程名打开业务流程图页面,根据下表里您需要的推送方式,单击image 创建对应的节点。为方便完成该实践中的推送方式,请以表格内的节点名对创建的节点进行命名。

推送方式

节点名

节点类型

节点说明

条件推送

查询上月销售总额

MySQL节点

查询测试数据中上月销售总额,给分支节点提供参数。

达标条件判断

分支节点

接收MySQL节点传递的参数,按条件进行分支判断,将符合条件的数据提供给不同MySQL节点。

上月销售达标数据

MySQL节点

接收分支节点判断的达标数据,查询并向数据推送节点提供参数。

上月销售不达标数据

接收分支节点判断的不达标数据,查询并向数据推送节点提供参数。

推送销售达标前三名类别

数据推送

接收查询的达标数据,将数据推送至推送目标。

推送销售不达标倒数三名类别

接收查询的不达标数据,将数据推送至推送目标。

脚本推送

查询上一周销售数据

MySQL节点

查询测试数据中上周销售前三类总额,给赋值节点提供参数。

组织销售前三名列表

赋值节点

示例赋值语言为Python语言,接收MySQL节点传递的参数,并将其列表化后,再将结果提供给数据推送节点。

推送上周前三名类别

数据推送

接收赋值节点列表的数据,将数据推送至推送目标。

合并推送

查询昨日销售总额

MySQL节点

  • 查询测试数据中在昨日销售总额与昨日销售增长额,给数据推送节点提供参数。

  • 查询昨日销售总额节点可与简单推送中的昨日销售总额共用同一节点。

查询昨日销售增长额

推送昨日销售总额与增长额

数据推送

同时接收两个MySQL节点传递的参数,将数据推送至推送目标。

简单推送

查询昨日销售总额

MySQL节点

查询测试数据中的昨日销售总额,并为数据推送节点提供参数。

推送昨日销售总额

数据推送

接收MySQL节点传递的参数,并将数据推送至推送目标。

准备数据

在日常业务中存在多种多样的数据场景,本实践以简单的订单表为案例演示数据推送节点的使用,以下为创建并写入测试数据的步骤,若您无需测试数据,也可跳过该步骤。

创建测试表

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据建模与开发 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 在左侧导航栏,单击image进入临时查询页面后,鼠标悬浮在image按钮,新建 > MySQL新增临时查询。

    • 节点类型:MySQL。

    • 路径:临时查询。

    • 名称:创建测试数据表。

  3. 创建测试表orders,表格SQL如下:

CREATE TABLE orders (
     order_id INT NOT NULL AUTO_INCREMENT,
     category VARCHAR(100) NOT NULL, -- 类别
     sales DOUBLE NOT NULL, -- 订单销售额
     datetime DATETIME NOT NULL, -- 订单发生时间
     PRIMARY KEY (order_id),
     INDEX (category)
);

创建存储过程

以下存储过程将随机生成过去两个月的订单表数据。

说明

存储过程需要在MySQL客户端中执行创建。

DELIMITER $$

CREATE PROCEDURE InsertOrders(IN num_orders INT)
BEGIN
  DECLARE v_category VARCHAR(100);
  DECLARE v_sales DOUBLE;
  DECLARE v_datetime DATETIME;
  DECLARE v_category_list VARCHAR(255);
  DECLARE v_index INT;
  DECLARE i INT DEFAULT 0;
  
  -- 定义类别列表的字符串,以逗号分隔
  SET v_category_list = 'Electronics,Books,Home & Kitchen,Fashion,Toys,Baby,Computers,Electronics,Games,Garden,Clothing,Grocery,Health,Jewelry,Kids';
  -- 获得类别的总数
  SET v_index = ROUND((RAND() * (CHAR_LENGTH(v_category_list) - CHAR_LENGTH(REPLACE(v_category_list, ',', '')) + 1)));
  
  WHILE i < num_orders DO
    -- 生成随机索引以选择类别
    SET v_index = FLOOR(1 + (RAND() * (CHAR_LENGTH(v_category_list) - CHAR_LENGTH(REPLACE(v_category_list, ',', '')) + 1)));
    -- 从类别列表中提取随机类别
    SET v_category = SUBSTRING_INDEX(SUBSTRING_INDEX(v_category_list, ',', v_index), ',', -1);
    
    -- 生成1000至30000之间的随机销售额
    SET v_sales = 1000 + FLOOR(RAND() * 29000);
    
    -- 在过去两个月内生成随机日期时间
    SET v_datetime = NOW() - INTERVAL FLOOR(RAND() * 61) DAY;
    
    -- 将新的随机订单插入到订单表中
    INSERT INTO orders (category, sales, datetime) VALUES (v_category, v_sales, v_datetime);
    
    SET i = i + 1;
  END WHILE;
END$$

DELIMITER ;

写入测试数据

存储过程创建成功后,可通过CALL语句调用存储过程,向orders表内存储随机生成的数据。

-- 调用存储过程以插入特定数量的随机订单
CALL InsertOrders(1000); -- 这将插入1000条随机订单

配置推送流程

在DataWorks推送流程根据不同的推送逻辑可以进行条件推送脚本推送合并推送以及简单推送,本实践以MySQL为例进行四种推送方式进行演示。

条件推送

步骤一:搭建推送流程

双击打开数据推送Demo实例业务流程图页面,依次将查询上月销售总额通过达标条件判断分成达标上月销售达标数据推送销售达标前三名类别和不达标上月销售不达标数据推送销售不达标倒数三名类别,关联起来,生成一条如下图所示的条件推送的工作流。

image

步骤二:配置SQL查询节点

通过配置SQL查询节点对测试数据进行查询,通过节点上下文参数获取到SQL查询节点的输出参数outputs,从而实现将SQL的查询结果传递至分支节点。

  1. 双击打开查询上月销售总额节点,编写以下查询SQL代码。

    -- 统计上个月销售额
    SELECT SUM(sales) AS sales_amount
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
  2. 编辑完成SQL代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的调度资源组。

      说明

      初次使用数据推送节点,需先提工单升级调度资源组。

    • 依赖上游节点:勾选使用工作空间根节点

    • 节点上下文参数:单击本节点输出参数后的添加赋值参数,添加输出参数作为下游节点输入参数的取值。image

步骤三:配置分支节点

在分支节点内,通过节点上下文参数本节点输入参数获取上游SQL查询节点的输出参数outputs,然后再分支节点进行条件判断后通过本节点输出参数输出符合条件的数据。

  1. 双击打开达标条件判断节点,单击添加分支,新增两个分支,分别为达标和不达标,详细配置信息如下

    配置项

    达标分支

    不达标分支

    分支条件

    ${inputs[0][0]}>=500000。

    ${inputs[0][0]}<500000。

    关联到节点输出

    达标。

    不达标。

    分支描述

    上月销售总额达标。

    上月销售总额不达标。

    说明

    [0][0]为二维数组,用来定位获取到的参数中用于条件判断的数据。

    • 上游为SQL查询节点情况下,使用二位数组,定位参数中需要作为条件的数据。

    • 上游为Python节点情况下,使用一维数组,定位参数中需要作为条件的数据。

  2. 双击打开达标条件判断节点,单击右侧调度配置,在弹出的调度配置中配置如下内容。

    参数类型

    参数内容与说明

    时间属性

    调度周期

    定时调度时间

    08:00

    重跑属性

    运行成功或失败后皆可重跑。

    资源属性

    调度资源组

    选择已创建好的调度资源组。

    说明

    初次使用数据推送节点,需先提工单升级调度资源组。

    本节点输出名称

    当配置分支后,输出名中会自动解析并添加到此处。输出名与分支定义的关联到节点输出一致。

    节点上下文参数

    本节点输入参数

    参数名inputs

    取值来源:选择上游节点查询上月销售总额的输出参数outputs

    本节点输出参数

    系统默认添加为outputs

  3. 完成配置后,单击image保存达标条件判断节点

步骤四:配置分支SQL查询节点

分支SQL查询节点在本实践中分为上月销售达标数据上月销售不达标数据两个节点,需要将分支节点输出的达标不达标节点,配置为分支SQL查询节点的上游节点后,将分支节点SQL查询结果用节点上下文参数本节点输出参数分别输出至对应的达标不达标数据推送节点。

  1. 分别双击上月销售达标数据节点和上月销售不达标数据节点,进入编辑页面,编写以下SQL代码

    上月销售达标数据

    SET @all_cat_sales_volume_month := 0.0;
    SELECT SUM(sales) INTO @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
    
    -- 创建临时表
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
      category VARCHAR(255),
      sales DOUBLE,
      all_cat_sales_volume_month DOUBLE
    );
    --查询并写入临时表
    INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) as amount, @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category order by amount desc limit 3;
    --查询达标前三名数据
    SELECT category, sales, all_cat_sales_volume_month from temp_array;

    上月销售不达标数据

    SET @all_cat_sales_volume_month := 0.0;
    SELECT SUM(sales) INTO @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
    
    --创建临时表
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
      category VARCHAR(255),
      sales DOUBLE,
      all_cat_sales_volume_month DOUBLE
    );
    
    --查询并写入临时表
    INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) as amount, @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category order by amount asc limit 3;
    
    --查询不达标的后三名数据
    SELECT category, sales, all_cat_sales_volume_month from temp_array;
  2. 编辑完成SQL代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的调度资源组。

      说明

      初次使用数据推送节点,需先提工单升级调度资源组。

    • 依赖的上游节点:添加上游的分支节点,不同的分支SQL查询节点添加对应的上游输出节点。

      • 上月销售达标数据节点:添加上游节点输出名为达标的节点为上游节点。

      • 上月销售不达标数据节点:添加上游节点输出名为不达标的节点为上游节点。

    • 节点上下文参数:单击本节点输出参数后的添加赋值参数,添加输出参数作为下游节点输入参数的取值。image

  3. 单击image保存上月销售达标数据节点和上月销售不达标数据节点。

步骤五:配置分支数据推送节点

需要创建两个数据推送节点,分别配置节点上下文参数本节点输入参数,来获取上月销售达标数据上月销售不达标数据的分支SQL查询节点输出的outputs参数,在正文中使用这些参数并将其推送至各自的目标。

  1. 双击推送销售达标前三名类别节点和推送销售不达标倒数三名类别节点,进入节点后,单击调度配置,在调度配置面板中进行如下配置。

    参数类型

    参数内容

    图示

    调度参数

    参数名

    curdate

    image

    参数值

    $[yyyymmddhh:mi:ss]

    时间属性

    调度周期

    image

    定时调度时间

    08:00

    说明

    确保数据能在08:00时推送至目标渠道。

    重跑属性

    运行成功或失败后皆可重跑。

    资源属性

    调度资源组

    选择已创建好的调度资源组。

    说明

    初次使用数据推送节点,需先提工单升级调度资源组。

    image

    节点上下文参数

    本节点输入参数

    单击添加新增本节点输入参数:

    • 参数名:inputs

    • 取值来源:

      • 推送销售达标前三名类别节点选择上月销售达标数据节点的输出参数outputs

      • 推送销售不达标倒数三名类别节点选择上月销售不达标数据节点的输出参数outputs

    • 推送销售达标前三名类别节点image

    • 推送销售不达标倒数三名类别节点image

  2. 调度配置完成后,即可配置数据推送内容,详情如下。

    • 数据推送目标:下拉数据推送目标选择所需的数据推送目标,若不存在,可单击下拉框右下角的创建数据推送目标,新建推送目标。

      参数

      说明

      类型

      支持圈选钉钉与飞书。

      对象名称

      可按业务需求进行自定义。

      WebHook

      钉钉或飞书机器人的Webhook,需要在相应的目标平台上获取。

    • 标题推送销售达标前三名类别推送销售不达标倒数三名类别

    • 正文:按需求进行配置,详情可参见配置推送内容

      说明

      正文中可直接使用上游SQL查询节点查询的字段名作为占位符进行获取上游输入的参数。

      • 推送销售达标前三名类别示例image

      • 推送销售不达标倒数三名类别示例image

  • 配置完成后,单击image保存推送销售达标前三名类别节点和推送销售不达标倒数三名类别节点。

步骤六:测试条件推送流程

在完成条件推送流程配置后,需要对条件测试流程进行测试,以便后续提交发布。

  1. 双击打开数据推送Demo实例业务流程页面。

  2. 选中查询上月销售总额节点,右键选择运行节点及下游,等待运行完成即可。

    说明

    如果出现任务运行失败,选中需要查看的节点,单击右键选择查看日志即可查看日志。

    image

脚本推送

步骤一:搭建推送流程

双击打开数据推送Demo实例业务流程图页面,在业务流程图页面,可以看到已创建的多个节点,依次将查询上一周销售数据组织销售前三名列表推送上周前三名类别进行关联,生成一条如下图所示的脚本推送的工作流。

image

步骤二:配置SQL查询节点

通过配置SQL查询节点对测试数据进行查询,可以通过节点上下文参数本节点输出参数获取到SQL查询节点的输出参数outputs,从而实现将SQL的查询结果传递至赋值节点。

  1. 双击查询上一周销售数据节点,进入编辑页面,编写以下SQL代码。

    -- 统计上一周销售额
    SELECT category, SUM(sales) as amount
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 WEEK), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59')
    GROUP BY category ORDER BY amount DESC limit 3;
  2. 编辑完成SQL代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的调度资源组。

      说明

      初次使用数据推送节点,需先提工单升级调度资源组。

    • 依赖上游节点:勾选使用工作空间根节点

    • 节点上下文参数:单击本节点输出参数后的添加赋值参数,添加输出参数作为下游节点输入参数的取值。image

  3. 单击image保存查询上一周销售数据节点。

步骤三:配置赋值节点

赋值节点通过节点上下文参数下的本节点输入参数可以获取到上游SQL节点的输出参数outputs,将SQL节点的输出参数进行重新赋值调整,生成新的本节点输出参数,将新数据输出至数据推送节点。

  1. 双击组织销售前三名列表的赋值节点,进入节点编辑页面,编写以下Python代码。

    def main():
    
        from datetime import date
        today = date.today()
        formatted_date = today.strftime('%Y-%m-%d')
        
        msg = 'Stat date: ' + formatted_date + ' \\n\\n ' \
        '- 1: ${inputs[0][0]}, sales: ${inputs[0][1]} \\n\\n ' \
        '- 2: ${inputs[1][0]}, sales: ${inputs[1][1]} \\n\\n ' \
        '- 3: ${inputs[2][0]}, sales: ${inputs[2][1]} \\n\\n '
        
        print(msg)
    
    
    if __name__ == "__main__":
        import sys
        main()
  2. 编辑完成赋值代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的调度资源组。

    • 节点上下文参数

      • 本节点输入参数

        • 参数名inputs

        • 取值来源:上游查询上一周销售数据节点的输出参数outputs。

      • 本节点输出参数:由系统默认生成。

  3. 单击image保存组织销售前三名列表赋值节点。

步骤四:配置数据推送节点

通过配置节点上下文参数本节点输入参数,来获取上游赋值节点输出的outputs参数,在正文中使用这些参数并将其推送至目标。

  1. 双击名为推送上周前三名类别的数据推送节点,进入节点后,单击调度配置,在调度配置面板中进行如下配置。

    参数类型

    参数内容

    图示

    调度参数

    参数名

    curdate

    image

    参数值

    $[yyyymmddhh:mi:ss]

    时间属性

    调度周期

    image

    定时调度时间

    08:00

    说明

    确保数据能在08:00时推送至目标渠道。

    重跑属性

    运行成功或失败后皆可重跑。

    资源属性

    调度资源组

    选择已创建好的调度资源组。

    说明

    初次使用数据推送节点,需先提工单升级调度资源组。

    image

    节点上下文参数

    本节点输入参数

    单击添加新增本节点输入参数:

    参数名:inputs

    取值来源:选择上游节点上周销售前三类的输出参数outputs

    image

  2. 调度配置完成后,即可配置数据推送内容,详情如下。

    • 数据推送目标:下拉数据推送目标选择所需的数据推送目标,若不存在,可单击下拉框右下角的创建数据推送目标,新建推送目标。

      参数

      说明

      类型

      支持圈选钉钉与飞书。

      对象名称

      可按业务需求进行自定义。

      WebHook

      钉钉或飞书机器人的Webhook,需要在相应的目标平台上获取。

    • 标题推送上周前三名类别

    • 正文:按需求进行配置正文即可,详情可参见配置推送内容

      说明

      正文中可直接使用上游SQL查询节点查询的字段名作为占位符进行获取上游输入的参数。

      image

  3. 配置完成后,单击image保存推送上周前三名类别推送节点。

步骤五:测试脚本推送流程

在完成脚本推送流程配置后,需要对脚本测试流程进行测试,以便后续提交发布。

  1. 双击数据推送Demo实例打开业务流程图页面。

  2. 选中查询上一周销售数据节点,右键选择运行节点及下游,等待运行完成即可。

    说明

    如果出现任务运行失败,选中需要查看的节点,单击右键选择查看日志即可查看日志。

    image

合并推送

步骤一:搭建推送流程

双击打开数据推送Demo实例业务流程图页面,在业务流程图页面,可以看到已创建的多个节点,拖拽上游节点查询昨日销售总额查询昨日销售增长额与下游节点推送昨日销售总额与增长额关联起来,生成一条如下图所示的合并推送的工作流。

说明

查询昨日销售总额节点可与简单推送中的昨日销售总额共用同一节点

image

步骤二:配置SQL查询节点

通过配置SQL查询节点对测试数据进行查询,可以通过节点上下文参数本节点输出参数获取到SQL查询节点的输出参数outputs,将SQL的查询结果传递至数据推送节点。

  1. 单击查询昨日销售增长额节点,编写以下查询SQL代码。

    -- 统计前日数据
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array1 (
      category VARCHAR(255),
      sales DOUBLE
    );
    -- 写入前日数据
    INSERT INTO temp_array1 (category, sales) SELECT category, SUM(sales)
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 23:59:59')
    GROUP BY category;
    
    -- 统计昨日数据
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array2 (
      category VARCHAR(255),
      sales DOUBLE
    );
    -- 查询写入昨日数据
    INSERT INTO temp_array2 (category, sales) SELECT category, SUM(sales)
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59')
    GROUP BY category;
    
    -- 统计昨日销售增长额
    CREATE TEMPORARY TABLE IF NOT EXISTS result (
      category VARCHAR(255),
      diff DOUBLE
    );
    -- 查询写入增长额数据
    INSERT INTO result (category, diff) select temp_array2.category as category, temp_array2.sales - temp_array1.sales as diff from temp_array1 left join temp_array2 on temp_array1.category = temp_array2.category;
    
    -- 查询增长额临时表
    select category, diff from result;

  2. 编辑完成SQL代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的调度资源组。

      说明

      初次使用数据推送节点,需先提工单升级调度资源组。

    • 依赖上游节点:勾选使用工作空间根节点

    • 节点上下文参数:单击本节点输出参数后的添加赋值参数,添加输出参数作为下游节点输入参数的取值。image

  3. 单击image保存查询昨日销售增长额节点。

步骤三:配置数据推送节点

通过节点上下文参数本节点输入参数接收上游查询昨日销售总额查询昨日销售增长额的输出参数outputs,从而可以在正文中使用参数并将其推送至目标。

  1. 双击名为推送昨日销售总额与增长额的数据推送节点,进入节点后,单击调度配置,在调度配置面板中进行如下配置。

    参数类型

    参数内容

    图示

    调度参数

    参数名

    curdate

    image

    参数值

    $[yyyymmddhh:mi:ss]

    时间属性

    调度周期

    image

    定时调度时间

    08:00

    说明

    确保数据能在08:00时推送至目标渠道。

    重跑属性

    运行成功或失败后皆可重跑。

    资源属性

    调度资源组

    选择已创建好的调度资源组。

    说明

    初次使用数据推送节点,需先提工单升级调度资源组。

    image

    节点上下文参数

    本节点输入参数

    单击添加新增本节点输入参数:

    参数一

    • 参数名inputs1

    • 取值来源:选择上游节点查询昨日销售总额的输出参数outputs

    参数二

    • 参数名inputs2

    • 取值来源:选择上游节点查询昨日销售增长额的输出参数outputs

    image

  2. 调度配置完成后,即可配置数据推送内容,详情如下。

    • 数据推送目标:下拉数据推送目标选择所需的数据推送目标,若不存在,可单击下拉框右下角的创建数据推送目标,新建推送目标。

      参数

      说明

      类型

      支持圈选钉钉与飞书。

      对象名称

      可按业务需求进行自定义。

      WebHook

      钉钉或飞书机器人的Webhook,需要在相应的目标平台上获取。

    • 标题推送昨日销售总额与增长额

    • 正文:按需求进行配置正文即可,详情可参见配置推送内容

      说明

      正文中可直接使用上游SQL查询节点查询的字段名作为占位符进行获取上游输入的参数。

      image

  3. 配置完成后,单击image保存推送昨日销售总额与增长额推送节点。

步骤四:测试合并推送流程

在完成简单推送流程配置后,需要对合并推送测试流程进行测试,以方便后续提交发布。

  1. 双击打开数据推送Demo实例业务流程图页面。

  2. 选中推送昨日销售总额与增长额节点,右键选择运行到该节点,等待运行完成即可。

    说明

    如果出现任务运行失败,选中需要查看的节点,单击右键选择查看日志即可查看日志。

    image

简单推送

步骤一:搭建推送流程

双击数据推送Demo实例后,在业务流程图页面,可以看到已创建的多个节点,需拖动MySQL节点查询昨日销售总额作为上游节点,并将其与作为下游的数据推送节点推送昨日销售总额关联起来,生成一条如下图所示的简单推送的工作流。

image

步骤二:配置SQL查询节点

通过配置SQL查询节点对测试数据进行查询,可以通过节点上下文参数本节点输出参数获取到SQL查询节点的输出参数outputs,从而实现将SQL的查询结果传递至数据推送节点。

  1. 双击查询昨日销售总额节点,编写以下查询SQL代码。

    -- 创建临时表temp_array
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
      total_amount DOUBLE
    );
    
    -- 将查询到的昨日销售总额写入临时表temp_array中
    INSERT INTO temp_array (total_amount) 
    SELECT SUM(sales)
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59');
    
    -- 查询临时表temp_array
    select total_amount from temp_array;
  2. 编辑完成SQL代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的调度资源组。

      说明

      初次使用数据推送节点,需先提工单升级调度资源组。

    • 依赖上游节点:勾选使用工作空间根节点

    • 节点上下文参数:单击本节点输出参数后的添加赋值参数,添加输出参数作为下游节点输入参数的取值。image

  3. 单击image保存查询昨日销售总额节点。

步骤三:配置数据推送节点

通过配置节点上下文参数本节点输入参数,来获取上游SQL查询节点输出的outputs参数,在正文中使用这些参数并将其推送至目标。

  1. 双击名为推送昨日销售总额的数据推送节点,进入节点后,单击调度配置,在调度配置面板中进行如下配置。

    参数类型

    参数内容

    图示

    调度参数

    参数名

    curdate

    image

    参数值

    $[yyyymmddhh:mi:ss]

    时间属性。

    调度周期

    image

    定时调度时间

    08:00

    说明

    确保数据能在08:00时推送至目标渠道。

    重跑属性

    运行成功或失败后皆可重跑。

    资源属性

    调度资源组

    选择已创建好的调度资源组。

    说明

    初次使用数据推送节点,需先提工单升级调度资源组。

    image

    节点上下文参数

    本节点输入参数

    单击添加新增本节点输入参数:

    参数名 : inputs

    取值来源:选择上游节点查询昨日销售总额的输出参数outputs

    image

  2. 调度配置完成后,即可配置数据推送内容,详情如下。

    • 数据推送目标:下拉数据推送目标选择所需的数据推送目标,若不存在,可单击下拉框右下角的创建数据推送目标,新建推送目标。

      参数

      说明

      类型

      支持圈选钉钉与飞书。

      对象名称

      可按业务需求进行自定义。

      WebHook

      钉钉或飞书机器人的Webhook,需要在相应的目标平台上获取。

    • 标题推送昨日销售总额

    • 正文:按需求进行配置正文即可,详情可参见配置推送内容

      说明

      正文中可直接使用上游SQL查询节点查询的字段名作为占位符进行获取上游输入的参数。

      image

  3. 配置完成后,单击image保存推送昨日销售总额推送节点。

步骤四:测试简单推送流程

在完成简单推送流程配置后,需要简单测试流程进行测试,以便后续提交发布。

  1. 双击打开数据推送Demo实例业务流程图页面。

  2. 选中查询昨日销售总额节点,右键选择运行节点及下游,等待运行完成即可。

    说明

    如果出现任务运行失败,选中需要查看的节点,单击右键选择查看日志即可查看日志。

    image

提交与发布

在完成数据推送流程配置后,双击数据推送Demo示例业务流程图页面,测试所有的数据推送流程是否能正常运行,测试成功后,需要提交发布。

  1. 在数据推送流程的编辑页面,单击image,运行业务流程。

  2. 待数据推送流程中的所有节点后出现image,单击image提交运行成功的数据推送流程。

  3. 选择提交对话框中需要提交的节点,勾选忽略输入输出不一致的告警

  4. 单击提交

  5. 提交成功后,即可在发布页面发布流程节点,详情可参见发布任务

后续步骤

数据推送流程将会随着配置的调度周期而运行,我们在运维页面也可以对已发布的数据推送流程进行各种的运维操作,详情请参见周期任务基本运维操作