通过SchedulerX实现调度与管理AI 脚本任务
本文介绍如何通过任务调度SchedulerX对LangChain脚本的管理和调度。
背景信息
LangChain是开源领域最流行的大模型编程开发框架,支持通过Python/JS语言快速构建AI 应用。
Dify是开源的图形化大模型应用开发平台,可以通过可视化的画布拖拽快速构建AI agent/工作流。
通过任务调度系统托管AI 任务,可以进行脚本版本管理、定时调度、提升资源利用率、限流控制、可运维、可观测。
脚本管理及调度
AI 任务涉及多个业务场景,需要进行定时调度。例如以下具体场景:
风险监控:每分钟扫描风险数据,通过大模型分析潜在风险事件,并及时发出报警。
数据分析:每日拉取金融数据,通过大模型进行数据分析,为投资者提供决策建议。
内容生成:每天自动总结工作内容,生成日报。
LangChain任务通常以Python脚本的形式实现,可以通过任务调度SchedulerX的脚本任务功能托管脚本,并进行定时配置。
任务调度SchedulerX支持脚本的历史版本,方便进行历史版本的对比和回滚。具体详情,请参见脚本历史版本。
Prompt管理
Prompt(提示词)在AI 任务中具有极其重要的作用,为了获得良好的输出效果,需要频繁修改调整Prompt。如果将Prompt直接写入脚本中,不仅会增加维护成本,还会降低灵活性,可以通过SchedulerX的任务参数功能来有效管理Prompt。在LangChain脚本中,利用SchedulerX提供的系统参数(#{schedulerx.jobParameters})
动态获取任务参数,代替Prompt或者PromptTemplate
参数实现灵活配置。
任务调度SchedulerX提供定时调度获取Prompt和API调度动态传递Prompt的两种方式管理Prompt。
定时调度获取Prompt
登录MSE SchedulerX控制台,左侧导航栏选择任务管理,并在顶部菜单栏选择地域。
创建脚本任务时编写脚本,或者对已创建任务编辑脚本。脚本内容如下:
Prompt写法
from langchain_community.llms import Tongyi from langchain.prompts import PromptTemplate from langchain.chains import LLMChain llm = Tongyi(model="qwen-plus") question = "#{schedulerx.jobParameters}" print("question:" + question) results = llm.invoke(question) print(results)
PromptTemplate写法
from langchain_community.llms import Tongyi from langchain.prompts import PromptTemplate from langchain.chains import LLMChain llm = Tongyi(model="qwen-plus") prompt = PromptTemplate(template="请帮我解答这个问题:{question}") chain = LLMChain(llm=llm, prompt=prompt) question = "#{schedulerx.jobParameters}" print("question:" + question) results = chain.invoke(question) print(results)
在创建任务面板的基本配置步骤中,配置相关参数,输入脚本任务参数。
API调度动态传递Prompt
SchedulerX也支持通过控制台手动运行或者API调度方式,动态设置新的Prompt。例如,以PromptTemplate写法为例,在通过控制台手动运行任务时,动态传递的任务参数将会覆盖任务配置中的静态任务参数。操作步骤如下:
在左侧导航栏选择任务管理,单击目标任务操作列的运行一次。
在弹出的确定运行任务对话框中,选择指定机器,配置实例参数后,单击确定。
提升资源利用率
SchedulerX执行脚本,当前支持两种模式:
脚本任务:在ECS上部署
schedulerx-agent
,每次执行fork一个子进程执行脚本,适合任务数比较多、调度频繁、资源消耗少的场景。K8s任务:在K8s上部署
schedulerx-agent
,每次执行弹一个Pod执行脚本,适合任务数不多、调度不频繁、资源消耗大的场景。
两种运行脚本模式适合不同的场景,结合使用可以提高资源利用率。
如上图所示,通过ECS执行脚本和通过K8s执行脚本的主要区别总结如下表所示:
ECS执行脚本 | K8s执行脚本 | |
如何安装依赖 | 将依赖提前手动部署在ECS上。 | 将依赖构建成基础镜像,如果依赖修改,需要重新构建基础镜像。 |
脚本调度频繁 | 推荐。每次fork一个子进程运行脚本,速度快。 | 不推荐。每次拉镜像起Pod,速度比较慢。 |
脚本不频繁调度 | 不推荐。ECS需要提前准备好资源,如果仅在一天内运行一次,资源利用率不高。 | 推荐。使用Pod执行任务,任务完成后Pod自动销毁,对于任务少并且调度不频繁的场景来说,成本更低。 |
脚本资源消耗小 | 推荐。fork子进程运行脚本,复用ECS的资源,成本低。 | 不推荐。每次执行都需要拉一个Pod执行,要占用额外的资源,成本高。 |
脚本消耗资源大 | 不推荐。超大任务,可能会把ECS资源打满,影响任务执行。 | 推荐。通过K8s的负载均衡策略,每次弹一个Pod运行脚本,稳定性高。 |
限流控制
业务场景描述:例如有一堆离线任务,需要每天0点之后执行,处理前一天的数据,核心任务必须在早上9点上班前全部完成。业务同学可能会把任务的调度时间都设置成同一时刻,如每天00:30执行。
当大量任务同时调度时,会把ECS资源打满。虽然通过K8s执行脚本可以解决一部分问题,但是突增的流量一样会把下游(比如数据库)打满。所以针对这种突增流量的场景,最佳解决方案是使用限流。通过限流控制解决定时调度不均,特别是突发流量的场景,是一种提升资源利用率的解决方案。
如上图所示,任务调度SchedulerX支持应用级别的限流控制:
每个应用会有2个队列,一个是优先级排队队列,可以把任务按照优先级在队列中排队,保证核心任务优先跑完。任务的优先级仅在自己的应用下生效,不会和其他应用产生冲突。
另一个是并发数队列,控制应用的并发数,不同应用的并发数彼此不受干扰。
当并发队列中某个任务运行完成,有空闲槽位后,会从排队队列头部取出任务,放到并发队列中,开始执行任务。
失败自动重试
在当前大模型调用稳定性不足的背景下,用户在与大模型交互时,可能会频繁遇到诸如Token限流或者后端服务异常等问题。这类问题通常具有临时性,通过稍后重新尝试往往能够解决。
任务调度SchedulerX提供任务失败自动重试功能,并支持通过控制台动态配置。经过实际验证,使用失败自动重试功能,LangChain脚本因后端大模型限流或者服务不可用导致的失败率显著降低,任务成功率提升了至少一个9。
依赖编排
任务调度SchedulerX提供可视化任务编排能力,如果您的LangChain脚本之间存在依赖关系,可以通过该功能轻松实现任务编排。即使是不同任务类型的任务,也能够统一进行编排处理。
如上图所示:
先通过Shell脚本,拉取大数据平台数据。
通过Java代码实现商家数据和用户数据的清洗。
通过LangChain实现对清洗后的数据进行大模型数据分析。
最后再通过Python脚本生成报表。
企业级可观测
任务调度SchedulerX默认集成了各种云产品,提供企业级可观测能力,包括但不限于如下功能。
调度大盘
调度大盘能够查看任务执行的整体情况,并支持根据命名空间和应用过滤筛选展示。
监控报警
如果任务执行失败,需要迅速进行响应处理,以避免故障的发生。任务调度SchedulerX支持应用级别的报警,能够精细到每个任务级别,下图所示为任务级别报警的配置示例。
日志服务
当任务执行失败时,需要查看任务运行日志以分析问题。只需接入schedulerx-agent
运行脚本,默认集成的日志服务将记录脚本运行的所有标准输出和异常。接入步骤内容如下:
左侧导航栏选择任务管理,并在顶部菜单栏选择地域。
单击创建任务,在创建任务面板的基本配置步骤中,配置相关参数。
脚本内容如下:
任务参数配置如下:
配置后续的定时配置和通知配置流程,完成接入
schedulerx-agent
运行脚本。在任务列表中,单击已创建任务操作列下的运行一次。
在左侧导航栏流程管理中,单击目标执行记录操作列下的日志,查看日志详情。
联系我们
如果您有AI 任务调度方面的其他诉求,欢迎联系我们,钉钉群号:23103656
。