全部产品
云市场

基于消息的定时任务

更新时间:2019-11-15 10:43:32

基于消息的定时任务仅对存量老用户开放,新用户推荐使用基于 RPC 的定时任务。

定时任务服务旨在为业务系统提供统一通用的任务调度服务,提供定时任务的管理监控平台,减轻业务系统开发和后续线上运维的工作量,并通过任务拆分和负载均衡等方案提升大数据量任务的性能。

功能和目标

  • 提供统一的定时任务注册和管理监控平台
  • 提供集中的定时调度、消息通知
  • 通过任务拆分、调度数据托管提升大数据量任务的性能
  • 提供特殊时段(停机维护)的支持
  • 提供便捷的测试支持

核心场景
定时任务依赖于消息队列,定时任务服务端按照用户配置的定时任务信息,到了任务执行时间就向应用发送一个消息。应用接收到消息后,开始执行自己预设的任务逻辑。

部署图
task

  1. 开发人员和管理员在界面上配置管理定时任务。
  2. 调度系统将任务元数据固化到数据库,按照配置参数定时发送消息。
  3. 业务集群系统接收经由消息队列发送的消息后,执行实际的业务逻辑,完成定时触发的效果。

开始使用基于消息的定时任务

基于消息的定时任务依赖消息队列(Message Queue)。在使用基于消息的定时任务之前,您必须先开通消息队列服务。

使用定时任务中间件实现定时任务功能的流程为:

  1. 开发编码
  2. 发布应用
  3. 配置定时任务
  4. 配置消息类型及订阅关系

前置条件

  • 您已经准备好一个 SOFABoot 工程,推荐升级到最新版本。
  • 为保障中间件的安全性,所有的调用均需要验证访问者的身份,安全配置请参考 引入中间件

开发编码

定时任务服务依赖消息队列实现定时任务功能。本地编码主要是完成订阅端代码(监听消息完成任务逻辑)。具体步骤介绍参见 消息队列 > 快速开始

消息订阅端代码

  1. public class SchedulerDemo implements UniformEventMessageListener {
  2. /** logger */
  3. private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerDemo.class);
  4. @Override
  5. public void onUniformEvent(UniformEvent uniformEvent, UniformEventContext uniformEventContext)
  6. throws Exception {
  7. final String topic = uniformEvent.getTopic();
  8. final String eventCode = uniformEvent.getEventCode();
  9. // 接收触发后的定时业务处理
  10. LOGGER.info("[Receive an uniformEvent] topic {} eventcode {} eventId {} payload {}",
  11. new Object[] { topic, eventCode, uniformEvent.getId()});
  12. }
  13. }

消息订阅端配置

一条典型的消息队列消息包括 TOPIC 和 EVENTCODE。定时任务发送的消息 TOPIC 统一为 TP_F_SC,开发时需要确定一个 EventCode(以 EC_ 开头),以及消息订阅组 Group(格式为:S_appname_service)。

  1. <!-- consumer declaration, the id and group attribute are required and their value must be unique -->
  2. <sofa:consumer id="uniformEventSubscriber" group="S_schedulertutorial_demo">
  3. <sofa:listener ref="schedulerDemo"/>
  4. <sofa:channels>
  5. <!-- channel value is the involved topic -->
  6. <sofa:channel value="TP_F_SC">
  7. <!-- each event represents a subscription -->
  8. <sofa:event eventType="direct" eventCode="EC_TASK_SCHEDULERTUTORIAL_DEMO" persistence="false"/>
  9. </sofa:channel>
  10. </sofa:channels>
  11. <sofa:binding.msg_broker/>
  12. </sofa:consumer>
  13. <!-- messageListener listener bean declaration, implements com.alipay.common.event.UniformEventMessageListener -->
  14. <bean id="schedulerDemo" class="com.antcloud.tutorial.scheduler.SchedulerDemo"/>

消息超时

消息队列的消息超过 8 秒会重试,会存在部分定时任务执行时间超过 8 秒的情况。对于这种情况,需要使用线程池,启用异步线程执行任务处理,然后直接 return,告诉消息队列已经完成消息处理,避免超时重试。

示例如下:

  1. <!-- 异步任务使用的线程池 -->
  2. <bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
  3. <property name="corePoolSize" value="10"/>
  4. <property name="maxPoolSize" value="120"/>
  5. <property name="queueCapacity" value="200"/>
  6. </bean>
  7. <!-- 具体参数请根据业务特征自行配置 -->
  8. //然后在您的消息监听器中启用新线程去处理任务:
  9. threadPool.execute(new Runnable() {
  10. public void run() {
  11. //执行任务逻辑
  12. }
  13. });

发布应用

将应用发布至云端,SOFABoot 应用的发布参见 SOFABoot 快速开始

配置定时任务

配置入口

登录微服务平台后,选择 微服务 > 定时任务 > 基于消息的任务,即可开始使用定时任务服务。

配置任务

在定时任务界面中,点击页面上方的 新增定时任务 按钮,按提示输入任务信息,点击 确定

新增定时任务

  • 任务名称:命名格式为 EC_TASK_SYSTEM_FUNCTION,注意需要和实际代码中的配置保持一致。
  • 系统名:填写发布的应用名。
  • CRON 表达式:调度中心使用 Quartz 来实现定时执行,配置规则可参见 Quartz 官网

新增完成之后,定时任务就按照预期的频率开始定时执行了,所以建议开发联调初期,将 CRON 表达式设置为 0 0 0 * * ? 这种低频的形式,待开发完成后再调整为预期频率的自动执行。

新增完成后,还可以对任务进行其他的操作:

  • 开/关:界面显示为 时,任务会自动执行,点击后状态变更为 ,任务停止自动执行。
  • 触发:手工在界面进行一次触发,任务就会在后台触发一次。
  • 编辑:调整任务名称、CRON、系统。
  • 删除:删除某个定时任务。

另外,由于定时任务的开/关、修改,涉及到在后台内存中的实际生效,每次生效执行完成需要耗时 30 秒 ~ 60 秒,在每次这类操作生效之前,下次这类操作无法开始执行。

application.properties

每个工作空间需要连接的服务端不一样,参见 SOFARPC 进阶指南 > RPC 引用服务 完成对应工作空间的 application.properties 参数调整:

  • com.alipay.env
  • com.alipay.instanceid
  • com.antcloud.antvip.endpoint

配置消息类型及订阅关系

在云端控制台配置消息主题、消费组及订阅关系,参见 消息队列 > 快速开始

注意

  • 消息主题:对于来自定时任务的消息,统一填写:TP_F_SC
  • 消息分类码:对应定时任务中配置的任务名称,需要和实际代码中的配置一致。
  • 消息消费组:需要和实际代码中的配置一致。

配置消息类型、订阅关系后记得将其生效。到了设定的任务执行时间后,任务就会被执行。

注意事项

使用定时任务服务时,需要注意以下事项:

  • 组件依赖 MQ(Message Queue,消息队列):需要先开通消息队列;
  • 单台执行:正常情况下,一次任务触发只会有一台机器执行,不会全集群同时执行;
  • 秒级触发:CRON 表达式只能精确到秒级,无法完成毫秒级的触发;
  • 幂等性控制:使用消息完成交互,要考虑异常场景下消息重投的可能性,做好幂等控制;
  • 超时:消息会超时重试 8 s,如果业务处理耗时很长,建议在 UniformEventMessageListener 中接受消息后直接返回成功,使用异步线程池,异步完成实际业务处理。