场景
某经销零售企业需要每天定时查询供应商的某项服务,来确认产品目录是否存在变更。如果存在变更,则需全量拉取最新的目录数据(数据量较大,拉取一次的成本相对较高);若无变更,则继续使用上一次拉取的数据。在Dataphin中应如何实现这一过程?
解决方案及功能
可以将检测产品目录是否变更的程序写入一个输出节点任务,将产品目录更新状态作为跨节点参数输出,拉取同步产品目录的节点作为输入节点,基于跨节点参数的取值来调度(条件调度)。
输出节点
前往研发 > 数据开发 > 计算任务中,创建检测产品目录是否变更的Python任务,并添加跨节点输出参数
update_status。
输入Python任务代码,使用随机函数来返回状态(仅作针对此示例,实际请根据产品目录是否变更来返回状态)。代码编写完成后,提交任务。
from random import randint def check_update(): return randint(0,1) stev("update_status",check_update()) #跨节点参数
输入节点
创建一个离线管道任务
imp_product_catalog,并将check_update任务添加为其上游任务。
开启条件调度,调度条件为check_update.update_status = 0 (无更新)时,空跑调度。配置完成后可预览调度运行计划,若预览值为0,则均为空跑调度;若预览值为1,则均为正常调度。
此时即可根据上游节点提供的跨节点参数的状态结合条件调度,动态调整下游节点的调度运行计划。

该文章对您有帮助吗?