本文以两个典型应用场景介绍跨节点参数的使用方法。
应用场景
跨节点参数有两个典型的应用场景:
某金融企业每天的周期任务涉及到币种转换,需要将每日的投资收益按照美元进行结算。每日的收盘汇率会从业务系统中提取并同步至一个离线表。多个周期任务均涉及币种换算,这需要使用业务日期相应的汇率。可以将读取汇率的程序编写为一个输出节点任务,将汇率作为跨节点参数输出,需要汇率的节点任务则作为输入节点来引用该跨节点参数。
某经销零售企业,需要每天定时查询供应商的某个服务,以确认产品目录是否有变更,如有变更,则需要全量拉取最新的目录数据(由于数据量较大,拉取一次的成本较高);若未发生变更,则继续使用上一次拉取的数据。可以将检测产品目录是否变更的程序写入至一个输出节点任务中,将产品目录的更新状态作为跨节点参数输出,同时将拉取并同步产品目录的节点设为输入节点,根据跨节点参数的取值进行调度(条件调度)。
使用示例一:汇率转换
数据结构
汇率表exchange_rate_table
,表结构和示例数据如下。
输出节点
创建MAX_COMPUTE_SQL任务
get_exchange_rate
,并定义跨节点输出参数usd2cny
。在任务编辑器中,单击鼠标右键,选择设置跨节点参数。
任务编辑器将自动提示已声明定义好的跨节点输出参数。
将某一字段的别名设置为跨节点输出参数的名称,系统将自动将查询结果第一行相应字段的取值赋值给跨节点参数。
说明对于SQL任务,如果有多个语句输出跨节点参数,则每一个语句前的
set
语句不可省略。本例有两种SQL写法:
每一个跨节点参数一个独立的 SQL。
使用一个语句。
提交任务。
输入节点
创建SQL任务
exchange_usd_to_cny
,示例代码如下。同时,将输出节点get_exchange_rate
添加为上游。-- 10000 美金换算为人民币,日元,欧元,澳元和港币 select 10000 * ${usd2cny_rate}, 10000 * ${usd2jpy_rate}, 10000 * ${usd2eur_rate}, 10000 * ${usd2aud_rate}, 10000 * ${usd2hkd_rate};
将识别出的变量参数的类型修改为跨节点变量。
在各个跨节点变量的参数值中选择
get_exchange_rate
的相应跨节点输出参数
补数据
确认表中当日的汇率。
进入运维中心,在当前任务的
get_exchange_rate
节点中单击补数据,选择补当前及上下游任务,选中下游exchange_usd_to_cny
一起补数据。说明对输入节点补数据时,必须连带补输出节点,否则输入节点将使用跨节点参数的默认值。
exchange_usd_to_cny
的运行日志如下,可以看到系统从表中将数据读取出来赋值给跨节点输出参数,并传递给了下游任务。
示例二:确认更新状态
输出节点
创建一个模拟的检测更新状态的Python任务,并添加跨节点输出参数
update_status
。
输入代码,使用随机函数来返回状态。单击鼠标右键,选择设置跨节点参数后,提交任务。
from random import randint def check_update(): return randint(0, 1) setv("update_status", check_update())
输入节点
创建一个离线管道任务
imp_product_catalog
,并将check_update
作为其上游任务添加。
为
imp_product_catalog
任务开启条件调度。
条件调度中添加条件
跨节点参数-check_update.update_status = 0
(即无更新)时,空跑调度,不符合该条件则命中默认条件(即正常调度)。
输入不同的跨节点参数值,预览调度运行计划。