执行器管理

更新时间:

创建完应用后,需要接入执行器才能执行任务。

普通应用接入执行器

自动注册

自动注册需要业务方依赖XXLJOBsdk,进行接入配置,应用启动成功就会自动注册。

1.查看接入配置

  1. 登录MSE XXL-JOB控制台

  2. 在页面左上角选择目标地域

  3. 单击目标实例ID,在左侧导航栏选择应用管理

  4. 单击目标应用执行器数量列下的接入按钮。

  5. 接入方式选择自动接入,并根据接入方式修改配置。

2.引入SDK接入执行器

Java SDK

  1. pom中引入“xxl-job-core”的maven依赖。

  2. 初始化执行器。

    @Configuration
    public class XxlJobConfig {
        private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    
        @Value("${xxl.job.admin.addresses}")
        private String adminAddresses;
    
        @Value("${xxl.job.accessToken}")
        private String accessToken;
    
        @Value("${xxl.job.executor.appname}")
        private String appname;
    
        @Value("${xxl.job.executor.address}")
        private String address;
    
        @Value("${xxl.job.executor.ip}")
        private String ip;
    
        @Value("${xxl.job.executor.port}")
        private int port;
    
        @Value("${xxl.job.executor.logpath}")
        private String logPath;
    
        @Value("${xxl.job.executor.logretentiondays}")
        private int logRetentionDays;
    
        @Bean
        public XxlJobSpringExecutor xxlJobExecutor() {
            logger.info(">>>>>>>>>>> xxl-job config init.");
            XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
            xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
            xxlJobSpringExecutor.setAppname(appname);
            xxlJobSpringExecutor.setAddress(address);
            xxlJobSpringExecutor.setIp(ip);
            xxlJobSpringExecutor.setPort(port);
            xxlJobSpringExecutor.setAccessToken(accessToken);
            xxlJobSpringExecutor.setLogPath(logPath);
            xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    
            return xxlJobSpringExecutor;
        }
    
    }

Go SDK

  1. 执行以下命令,使用最新的tag拉取Go版本的XXLJOB SDK。

    go get github.com/xxl-job/xxl-job-executor-go@{最新的tag}
  2. 编写业务代码。

    package main
    
    import (
        "context"
        "fmt"
        xxl "github.com/xxl-job/xxl-job-executor-go"
        "github.com/xxl-job/xxl-job-executor-go/example/task"
        "log"
    )
    
    func main() {
        exec := xxl.NewExecutor(
            xxl.ServerAddr("xxxxxx"),       //请求地址,控制台应用管理接入配置获取
            xxl.AccessToken("xxxxxxx"),     //请求令牌,控制台应用管理接入配置获取
            xxl.ExecutorPort("9999"),       //默认9999(非必填)
            xxl.RegistryKey("golang-jobs"), //执行器名称
            xxl.SetLogger(&logger{}),       //自定义日志
        )
        exec.Init()
        exec.Use(customMiddleware)
        //设置日志查看handler
        exec.LogHandler(customLogHandle)
        //注册任务handler
        exec.RegTask("task.test", task.Test)
        exec.RegTask("task.test2", task.Test2)
        exec.RegTask("task.panic", task.Panic)
        log.Fatal(exec.Run())
    }
    
    // 自定义日志处理器
    func customLogHandle(req *xxl.LogReq) *xxl.LogRes {
        return &xxl.LogRes{Code: xxl.SuccessCode, Msg: "", Content: xxl.LogResContent{
            FromLineNum: req.FromLineNum,
            ToLineNum:   2,
            LogContent:  "这个是自定义日志handler",
            IsEnd:       true,
        }}
    }
    
    // xxl.Logger接口实现
    type logger struct{}
    
    func (l *logger) Info(format string, a ...interface{}) {
        fmt.Println(fmt.Sprintf("自定义日志 - "+format, a...))
    }
    
    func (l *logger) Error(format string, a ...interface{}) {
        log.Println(fmt.Sprintf("自定义日志 - "+format, a...))
    }
    
    // 自定义中间件
    func customMiddleware(tf xxl.TaskFunc) xxl.TaskFunc {
        return func(cxt context.Context, param *xxl.RunReq) string {
            log.Println("I am a middleware start")
            res := tf(cxt, param)
            log.Println("I am a middleware end")
            return res
        }
    }
    

Python SDK

  1. 拉取依赖。

    pip install pyxxl
    
    # 如果日志需要写入redis
    pip install "pyxxl[redis]"
    
    # 如果需要从.env加载配置
    pip install "pyxxl[dotenv]"
    
    # 安装所有功能
    pip install "pyxxl[all]"
  2. 编写业务代码。

    import asyncio
    import time
    
    from pyxxl import ExecutorConfig, PyxxlRunner
    from pyxxl.ctx import g
    
    config = ExecutorConfig(
        xxl_admin_baseurl="http://xxljob-1b3fd8196eb.schedulerx.mse.aliyuncs.com/api/",
        executor_app_name="xueren-test",
        access_token="default_token",
    #    executor_listen_host="0.0.0.0",  # 如果xxl-admin可以直连executorip,可以不填写executor_listen_host
    )
    
    app = PyxxlRunner(config)
    
    
    @app.register(name="demoJobHandler")
    async def test_task():
        # you can get task params with "g"
        g.logger.info("get executor params: %s" % g.xxl_run_data.executorParams)
        for i in range(10):
            g.logger.warning("test logger %s" % i)
        await asyncio.sleep(5)
        return "成功..."
    
    @app.register(name="sync_func")
    def test_task4():
        # 如果要在xxl-admin上看到执行日志,打印日志的时候务必用g.logger来打印,默认只打印info及以上的日志
        n = 1
        g.logger.info("Job %s get executor params: %s" % (g.xxl_run_data.jobId, g.xxl_run_data.executorParams))
        # 如果同步任务里面有循环,为了支持cancel操作,必须每次都判断g.cancel_event.
        while n <= 10 and not g.cancel_event.is_set():
            # 如果不需要从xxl-admin中查看日志,可以用自己的logger
            g.logger.info(
                "log to {} logger test_task4.{},params:{}".format(
                    g.xxl_run_data.jobId,
                    n,
                    g.xxl_run_data.executorParams,
                )
            )
            time.sleep(2)
            n += 1
        return "成功3"
    
    
    if __name__ == "__main__":
        app.run_executor()

手动录入

手动录入可以人工维护执行器的地址信息,地址格式参考http://192.168.0.1:9999/

  1. 登录MSE XXL-JOB控制台

  2. 在页面左上角选择目标地域

  3. 单击目标实例ID,在左侧导航栏选择应用管理

  4. 单击目标应用执行器数量列下的接入按钮。

  5. 接入方式选择手动录入,并输入执行器地址

HTTP应用接入执行器

不需要SDK接入,通过配置域名或K8s服务自动发现后端节点,使用HTTP协议调度。

接入K8s服务

如果HTTP应用部署在阿里云容器服务ACK中,推荐使用“接入K8s服务”方式。

  1. (若已完成可跳过)将应用部署在阿里云容器服务ACK上,支持如下集群类型:

    • ACK托管集群,网络插件为Terway

    • ACK Serverless集群

    • ACS集群

  2. (若已完成可跳过)在ACK中为应用创建Service,支持服务类型:

    • ClusterIP

    • LoadBalancer

  3. 登录MSE XXL-JOB控制台

  4. 在页面左上角选择目标地域

  5. 单击目标实例ID,在左侧导航栏选择应用管理

  6. 单击目标应用执行器数量列下的接入按钮。

  7. 接入执行器页面,接入方式选择接入K8s服务,并完成接入K8s服务参数配置,单击确定完成接入。

接入成功后,执行期数量会发生改变,点击查看,可以看到后端Pod列表。

手动录入域名

如果应用没有部署在ACK上(比如使用ECS部署),也可以通过内网域名调度HTTP任务。

  1. (若已完成可跳过)为HTTP应用创建一个网关,会自动生成一个内网域名,比如使用NLB。

  2. 在页面左上角选择目标地域

  3. 单击目标实例ID,在左侧导航栏选择应用管理

  4. 单击目标应用执行器数量列下的接入按钮。

  5. 接入执行器页面,接入方式选择录入域名,并配置内网域名。