通过SchedulerX实现调度与管理AI 脚本任务

更新时间:

本文介绍如何通过任务调度SchedulerXLangChain脚本的管理和调度。

背景信息

  • LangChain是开源领域最流行的大模型编程开发框架,支持通过Python/JS语言快速构建AI 应用。

  • Dify是开源的图形化大模型应用开发平台,可以通过可视化的画布拖拽快速构建AI agent/工作流。

通过任务调度系统托管AI 任务,可以进行脚本版本管理、定时调度、提升资源利用率、限流控制、可运维、可观测。

image

脚本管理及调度

AI 任务涉及多个业务场景,需要进行定时调度。例如以下具体场景:

  • 风险监控:每分钟扫描风险数据,通过大模型分析潜在风险事件,并及时发出报警。

  • 数据分析:每日拉取金融数据,通过大模型进行数据分析,为投资者提供决策建议。

  • 内容生成:每天自动总结工作内容,生成日报。

LangChain任务通常以Python脚本的形式实现,可以通过任务调度SchedulerX脚本任务功能托管脚本,并进行定时配置。

image

任务调度SchedulerX支持脚本的历史版本,方便进行历史版本的对比和回滚。具体详情,请参见脚本历史版本

image

Prompt管理

Prompt(提示词)在AI 任务中具有极其重要的作用,为了获得良好的输出效果,需要频繁修改调整Prompt。如果将Prompt直接写入脚本中,不仅会增加维护成本,还会降低灵活性,可以通过SchedulerX的任务参数功能来有效管理Prompt。在LangChain脚本中,利用SchedulerX提供的系统参数(#{schedulerx.jobParameters})动态获取任务参数,代替Prompt或者PromptTemplate参数实现灵活配置。

任务调度SchedulerX提供定时调度获取PromptAPI调度动态传递Prompt的两种方式管理Prompt。

定时调度获取Prompt

  1. 登录MSE SchedulerX控制台,左侧导航栏选择任务管理,并在顶部菜单栏选择地域。

  2. 创建脚本任务时编写脚本,或者对已创建任务编辑脚本。脚本内容如下:

    Prompt写法

    from langchain_community.llms import Tongyi
    from langchain.prompts import PromptTemplate
    from langchain.chains import LLMChain
    
    llm = Tongyi(model="qwen-plus")
    question = "#{schedulerx.jobParameters}"
    print("question:" + question)
    
    results = llm.invoke(question)
    print(results)

    PromptTemplate写法

    from langchain_community.llms import Tongyi
    from langchain.prompts import PromptTemplate
    from langchain.chains import LLMChain
    
    llm = Tongyi(model="qwen-plus")
    prompt = PromptTemplate(template="请帮我解答这个问题:{question}")
    
    chain = LLMChain(llm=llm, prompt=prompt)
    question = "#{schedulerx.jobParameters}"
    print("question:" + question)
    
    results = chain.invoke(question)
    print(results)
  3. 创建任务面板的基本配置步骤中,配置相关参数,输入脚本任务参数

    image

API调度动态传递Prompt

SchedulerX也支持通过控制台手动运行或者API调度方式,动态设置新的Prompt。例如,以PromptTemplate写法为例,在通过控制台手动运行任务时,动态传递的任务参数将会覆盖任务配置中的静态任务参数。操作步骤如下:

  1. 在左侧导航栏选择任务管理,单击目标任务操作列的运行一次

  2. 在弹出的确定运行任务对话框中,选择指定机器,配置实例参数后,单击确定

    image

提升资源利用率

SchedulerX执行脚本,当前支持两种模式:

  • 脚本任务:在ECS上部署schedulerx-agent,每次执行fork一个子进程执行脚本,适合任务数比较多、调度频繁、资源消耗少的场景。

  • K8s任务:在K8s上部署schedulerx-agent,每次执行弹一个Pod执行脚本,适合任务数不多、调度不频繁、资源消耗大的场景。

两种运行脚本模式适合不同的场景,结合使用可以提高资源利用率。

image

如上图所示,通过ECS执行脚本和通过K8s执行脚本的主要区别总结如下表所示:

ECS执行脚本

K8s执行脚本

如何安装依赖

将依赖提前手动部署在ECS上。

将依赖构建成基础镜像,如果依赖修改,需要重新构建基础镜像。

脚本调度频繁

推荐。每次fork一个子进程运行脚本,速度快。

不推荐。每次拉镜像起Pod,速度比较慢。

脚本不频繁调度

不推荐。ECS需要提前准备好资源,如果仅在一天内运行一次,资源利用率不高。

推荐。使用Pod执行任务,任务完成后Pod自动销毁,对于任务少并且调度不频繁的场景来说,成本更低。

脚本资源消耗小

推荐。fork子进程运行脚本,复用ECS的资源,成本低。

不推荐。每次执行都需要拉一个Pod执行,要占用额外的资源,成本高。

脚本消耗资源大

不推荐。超大任务,可能会把ECS资源打满,影响任务执行。

推荐。通过K8s的负载均衡策略,每次弹一个Pod运行脚本,稳定性高。

限流控制

业务场景描述:例如有一堆离线任务,需要每天0点之后执行,处理前一天的数据,核心任务必须在早上9点上班前全部完成。业务同学可能会把任务的调度时间都设置成同一时刻,如每天00:30执行。

当大量任务同时调度时,会把ECS资源打满。虽然通过K8s执行脚本可以解决一部分问题,但是突增的流量一样会把下游(比如数据库)打满。所以针对这种突增流量的场景,最佳解决方案是使用限流。通过限流控制解决定时调度不均,特别是突发流量的场景,是一种提升资源利用率的解决方案。

image

如上图所示,任务调度SchedulerX支持应用级别的限流控制:

  1. 每个应用会有2个队列,一个是优先级排队队列,可以把任务按照优先级在队列中排队,保证核心任务优先跑完。任务的优先级仅在自己的应用下生效,不会和其他应用产生冲突。

  2. 另一个是并发数队列,控制应用的并发数,不同应用的并发数彼此不受干扰。

  3. 当并发队列中某个任务运行完成,有空闲槽位后,会从排队队列头部取出任务,放到并发队列中,开始执行任务。

失败自动重试

在当前大模型调用稳定性不足的背景下,用户在与大模型交互时,可能会频繁遇到诸如Token限流或者后端服务异常等问题。这类问题通常具有临时性,通过稍后重新尝试往往能够解决。

任务调度SchedulerX提供任务失败自动重试功能,并支持通过控制台动态配置。经过实际验证,使用失败自动重试功能,LangChain脚本因后端大模型限流或者服务不可用导致的失败率显著降低,任务成功率提升了至少一个9。

image

依赖编排

任务调度SchedulerX提供可视化任务编排能力,如果您的LangChain脚本之间存在依赖关系,可以通过该功能轻松实现任务编排。即使是不同任务类型的任务,也能够统一进行编排处理。

image

如上图所示:

  1. 先通过Shell脚本,拉取大数据平台数据。

  2. 通过Java代码实现商家数据和用户数据的清洗。

  3. 通过LangChain实现对清洗后的数据进行大模型数据分析。

  4. 最后再通过Python脚本生成报表。

企业级可观测

任务调度SchedulerX默认集成了各种云产品,提供企业级可观测能力,包括但不限于如下功能。

调度大盘

调度大盘能够查看任务执行的整体情况,并支持根据命名空间和应用过滤筛选展示。

image

监控报警

如果任务执行失败,需要迅速进行响应处理,以避免故障的发生。任务调度SchedulerX支持应用级别的报警,能够精细到每个任务级别,下图所示为任务级别报警的配置示例。

image

日志服务

当任务执行失败时,需要查看任务运行日志以分析问题。只需接入schedulerx-agent运行脚本,默认集成的日志服务将记录脚本运行的所有标准输出和异常。接入步骤内容如下:

  1. 左侧导航栏选择任务管理,并在顶部菜单栏选择地域。

  2. 单击创建任务,在创建任务面板的基本配置步骤中,配置相关参数。

    脚本内容如下:

    image

    任务参数配置如下:

    image

  3. 配置后续的定时配置通知配置流程,完成接入schedulerx-agent运行脚本。

  4. 在任务列表中,单击已创建任务操作列下的运行一次

  5. 在左侧导航栏流程管理中,单击目标执行记录操作列下的日志,查看日志详情。

    image

联系我们

如果您有AI 任务调度方面的其他诉求,欢迎联系我们,钉钉群号:23103656