执行器管理

更新时间:
复制为 MD 格式

创建完应用后,需要接入执行器才能执行任务。

普通应用接入执行器

高代码方式通过SDK接入

高代码方式通过集成XXL-JOB SDK,开发者需在代码中实现任务处理逻辑,并注册对应的Bean名称,控制台仅需配置该Bean名称即可触发执行,适用于复杂业务逻辑场景,支持Java、GoPython。

支持自动注册与手动录入。

自动注册

自动注册需要业务方依赖XXL-JOB SDK,进行接入配置,应用启动成功将自动注册。

查看接入配置
  1. 登录MSE XXL-JOB控制台

  2. 在页面左上角选择目标地域

  3. 单击目标实例ID,在左侧导航栏选择应用管理

  4. 单击目标应用执行器数量列下的接入按钮。

  5. 接入方式选择自动接入,并根据接入方式修改配置。

引入SDK接入执行器
Java SDK
  1. pom中引入“xxl-job-core”的maven依赖。

  2. 初始化执行器。

    @Configuration
    public class XxlJobConfig {
        private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    
        @Value("${xxl.job.admin.addresses}")
        private String adminAddresses;
    
        @Value("${xxl.job.accessToken}")
        private String accessToken;
    
        @Value("${xxl.job.executor.appname}")
        private String appname;
    
        @Value("${xxl.job.executor.address}")
        private String address;
    
        @Value("${xxl.job.executor.ip}")
        private String ip;
    
        @Value("${xxl.job.executor.port}")
        private int port;
    
        @Value("${xxl.job.executor.logpath}")
        private String logPath;
    
        @Value("${xxl.job.executor.logretentiondays}")
        private int logRetentionDays;
    
        @Bean
        public XxlJobSpringExecutor xxlJobExecutor() {
            logger.info(">>>>>>>>>>> xxl-job config init.");
            XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
            xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
            xxlJobSpringExecutor.setAppname(appname);
            xxlJobSpringExecutor.setAddress(address);
            xxlJobSpringExecutor.setIp(ip);
            xxlJobSpringExecutor.setPort(port);
            xxlJobSpringExecutor.setAccessToken(accessToken);
            xxlJobSpringExecutor.setLogPath(logPath);
            xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    
            return xxlJobSpringExecutor;
        }
    
    }
Go SDK
  1. 执行以下命令,使用最新的tag拉取Go版本的XXLJOB SDK。

    go get github.com/xxl-job/xxl-job-executor-go@{最新的tag}
  2. 编写业务代码。

    package main
    
    import (
        "context"
        "fmt"
        xxl "github.com/xxl-job/xxl-job-executor-go"
        "github.com/xxl-job/xxl-job-executor-go/example/task"
        "log"
    )
    
    func main() {
        exec := xxl.NewExecutor(
            xxl.ServerAddr("xxxxxx"),       //请求地址,控制台应用管理接入配置获取
            xxl.AccessToken("xxxxxxx"),     //请求令牌,控制台应用管理接入配置获取
            xxl.ExecutorPort("9999"),       //默认9999(非必填)
            xxl.RegistryKey("golang-jobs"), //执行器名称
            xxl.SetLogger(&logger{}),       //自定义日志
        )
        exec.Init()
        exec.Use(customMiddleware)
        //设置日志查看handler
        exec.LogHandler(customLogHandle)
        //注册任务handler
        exec.RegTask("task.test", task.Test)
        exec.RegTask("task.test2", task.Test2)
        exec.RegTask("task.panic", task.Panic)
        log.Fatal(exec.Run())
    }
    
    // 自定义日志处理器
    func customLogHandle(req *xxl.LogReq) *xxl.LogRes {
        return &xxl.LogRes{Code: xxl.SuccessCode, Msg: "", Content: xxl.LogResContent{
            FromLineNum: req.FromLineNum,
            ToLineNum:   2,
            LogContent:  "这个是自定义日志handler",
            IsEnd:       true,
        }}
    }
    
    // xxl.Logger接口实现
    type logger struct{}
    
    func (l *logger) Info(format string, a ...interface{}) {
        fmt.Println(fmt.Sprintf("自定义日志 - "+format, a...))
    }
    
    func (l *logger) Error(format string, a ...interface{}) {
        log.Println(fmt.Sprintf("自定义日志 - "+format, a...))
    }
    
    // 自定义中间件
    func customMiddleware(tf xxl.TaskFunc) xxl.TaskFunc {
        return func(cxt context.Context, param *xxl.RunReq) string {
            log.Println("I am a middleware start")
            res := tf(cxt, param)
            log.Println("I am a middleware end")
            return res
        }
    }
    
Python SDK
  1. 拉取依赖。

    pip install pyxxl
    
    # 如果日志需要写入redis
    pip install "pyxxl[redis]"
    
    # 如果需要从.env加载配置
    pip install "pyxxl[dotenv]"
    
    # 安装所有功能
    pip install "pyxxl[all]"
  2. 编写业务代码。

    import asyncio
    import time
    
    from pyxxl import ExecutorConfig, PyxxlRunner
    from pyxxl.ctx import g
    
    config = ExecutorConfig(
        xxl_admin_baseurl="http://xxljob-1b3fd8196eb.schedulerx.mse.aliyuncs.com/api/",
        executor_app_name="xueren-test",
        access_token="default_token",
    #    executor_listen_host="0.0.0.0",  # 如果xxl-admin可以直连executorip,可以不填写executor_listen_host
    )
    
    app = PyxxlRunner(config)
    
    
    @app.register(name="demoJobHandler")
    async def test_task():
        # you can get task params with "g"
        g.logger.info("get executor params: %s" % g.xxl_run_data.executorParams)
        for i in range(10):
            g.logger.warning("test logger %s" % i)
        await asyncio.sleep(5)
        return "成功..."
    
    @app.register(name="sync_func")
    def test_task4():
        # 如果要在xxl-admin上看到执行日志,打印日志的时候务必用g.logger来打印,默认只打印info及以上的日志
        n = 1
        g.logger.info("Job %s get executor params: %s" % (g.xxl_run_data.jobId, g.xxl_run_data.executorParams))
        # 如果同步任务里面有循环,为了支持cancel操作,必须每次都判断g.cancel_event.
        while n <= 10 and not g.cancel_event.is_set():
            # 如果不需要从xxl-admin中查看日志,可以用自己的logger
            g.logger.info(
                "log to {} logger test_task4.{},params:{}".format(
                    g.xxl_run_data.jobId,
                    n,
                    g.xxl_run_data.executorParams,
                )
            )
            time.sleep(2)
            n += 1
        return "成功3"
    
    
    if __name__ == "__main__":
        app.run_executor()

手动录入

手动录入可以人工维护执行器的地址信息,地址格式参考http://192.168.0.1:9999/

  1. 登录MSE XXL-JOB控制台

  2. 在页面左上角选择目标地域

  3. 单击目标实例ID,在左侧导航栏选择应用管理

  4. 单击目标应用执行器数量列下的接入按钮。

  5. 接入方式选择手动录入,并输入执行器地址

低代码方式通过Agent接入

低代码方式通过手动部署Agent接入执行器,用户可在控制台直接编写业务逻辑,适用于脚本任务、大数据任务、SQL任务及AI任务等场景。

支持通过安装包部署、通过Docker启动以及通过Kubernetes启动。

安装包部署

部署前提

已安装JDK17及以上版本。

  1. 下载安装包

    wget https://schedulerx3.oss-cn-hangzhou.aliyuncs.com/xxljob/schedulerx3-agent-1.0.0-bin.tar.gz
  2. 解压与配置:

    # 解压
    tar -zxvf schedulerx3-agent-1.0.0-bin.tar.gz
    cd schedulerx3-agent-1.0.0-bin

    解压后的目录结构:

    schedulerx3-agent-1.0.0-bin/
    ├── bin/              # 启动脚本目录
    ├── conf/             # 配置文件目录
    │   ├── application.yml      # 应用配置
    │   └── logback-spring.xml   # 日志配置
    ├── lib/              # 依赖 jar 包目录
    └── logs/             # 日志目录(运行时自动创建)
        ├── stdout.log    # 标准输出日志
        ├── stderr.log    # 标准错误日志
        ├── worker.log    # 应用日志
        ├── error.log     # 错误日志
        ├── gc.log        # GC 日志
        └── archive/      # 归档日志目录

    编辑配置文件conf/application.yml,根据XXL-JOB实例配置如下参数:

    xxl:
      job:
        admin-addresses: {服务接入地址}
        access-token: {应用AccessToken}
        executor:
          appname: {应用AppName}
  3. 启动服务

    • Linux/Mac

      # 后台启动
      ./bin/start.sh
      
      # 前台启动(调试)
      ./bin/start.sh -f
      
      # 停止
      ./bin/stop.sh
      
      # 重启
      ./bin/restart.sh
      
      # 查看状态
      ./bin/status.sh
      
      # 查看日志
      tail -f logs/worker.log
    • Windows

      REM 后台启动
      .\bin\start.cmd
      
      REM 前台启动(调试)
      .\bin\start.cmd -f
      
      REM 停止
      .\bin\stop.cmd
      
      REM 重启
      .\bin\restart.cmd
      
      REM 查看状态
      .\bin\status.cmd
      
      REM 查看日志
      type logs\worker.log
  4. (可选)日志配置

    • 主要日志文件位于logs/目录,任务执行日志默认位于${user.home}/applogs/xxl-job/jobhandler

      日志文件

      说明

      滚动策略

      stdout.log

      标准输出日志(启动日志)

      脚本重定向

      stderr.log

      标准错误日志(异常堆栈)

      脚本重定向

      worker.log

      应用日志(INFO 及以上)

      100MB/文件,保留 30 天

      error.log

      错误日志(ERROR 级别)

      50MB/文件,保留 60 天

      gc.log

      GC 日志

      JVM 参数配置

      heap_dump.hprof

      堆转储文件(OOM 时生成)

      -

      archive/

      归档日志目录(自动压缩 .gz)

      -

    • 编辑conf/logback-spring.xml可调整日志的打印输出。

      <!-- 修改根日志级别 -->
      <root level="INFO">
          <appender-ref ref="STDOUT" />
          <appender-ref ref="FILE" />
      </root>
      <!-- 修改特定包的日志级别 -->
      <logger name="com.aliyun.schedulerx" level="DEBUG" />
      <logger name="com.xxl.job" level="DEBUG" />
  5. (可选)JVM参数配置,根据实际负载调整 JVM 内存大小。

    # Linux/Mac - 临时指定
    JAVA_OPTS="-Xms2g -Xmx4g" ./bin/start.sh
    
    # Linux/Mac - 永久修改
    vim bin/start.sh  # 编辑 JAVA_OPTS 变量
    
    # Windows - 临时指定
    set JAVA_OPTS=-Xms2g -Xmx4g
    .\bin\start.cmd
    
    # Windows - 永久修改
    notepad bin\start.cmd  # 编辑 JAVA_OPTS 变量

通过Docker启动

方式一:使用公开镜像部署

公共镜像提供了常规脚本的运行能力,系统预装了python、nodejs、go环境。可直接从镜像仓库拉取并运行,无需构建。

# 拉取镜像
docker pull schedulerx-registry.cn-hangzhou.cr.aliyuncs.com/schedulerx3/schedulerx3-agent:1.0.0

# 自定义配置运行
docker run -d \
  --name schedulerx3-agent \
  -p 9999:9999 \
  // 按需配置JVM参数
  -e JAVA_OPTS="-Xms1g -Xmx2g" \
  -e SCHEDULERX3_ADMIN_ADDRESSES="{服务接入地址}" \
  -e SCHEDULERX3_EXECUTOR_APPNAME="{应用AppName}" \
  -e SCHEDULERX3_ACCESS_TOKEN="{应用AccessToken}" \
  -v $(pwd)/logs:/opt/schedulerx3-agent/logs \
  --restart unless-stopped \
  schedulerx-registry.cn-hangzhou.cr.aliyuncs.com/schedulerx3/schedulerx3-agent:1.0.0
方式二:基于tar包自行构建镜像

业务有额外的外部组件依赖或自定义基础镜像,可基于下载的 tar 包自行构建镜像,并发布到自有的镜像仓库。

# 下载安装包
wget https://schedulerx3.oss-cn-hangzhou.aliyuncs.com/xxljob/schedulerx3-agent-1.0.0-bin.tar.gz
# Docker 镜像构建
docker build -t schedulerx3-agent:1.0.0 -f Dockerfile .

对应Dockerfile参考如下:

############################################
### 当前Dockerfile根据各自业务需要安装所需的组件
############################################

# 自行配置基础镜像
FROM hub.docker.xxx.com/library/openjdk:17.0.1-jdk-bullseye

LABEL maintainer="SchedulerX Team"
LABEL description="SchedulerX3 Agent - XXL-Job Executor"
LABEL version="2.4.2"

# 配置阿里云镜像源
RUN sed -i 's/deb.debian.org/mirrors.aliyun.com/g' /etc/apt/sources.list && \
    sed -i 's|security.debian.org/debian-security|mirrors.aliyun.com/debian-security|g' /etc/apt/sources.list

# 安装基础工具、Python3、Node.js 和 Go
RUN apt-get update && \
    apt-get install -y python3 python3-distutils curl wget ca-certificates nodejs npm golang-go && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# 使用官方脚本安装 pip
RUN curl https://bootstrap.pypa.io/get-pip.py -o /tmp/get-pip.py && \
    python3 /tmp/get-pip.py && \
    rm -f /tmp/get-pip.py && \
    ln -sf /usr/bin/python3 /usr/bin/python

# 设置 Go 环境变量
ENV GOPATH=/root/go
ENV PATH=$GOPATH/bin:$PATH
ENV GO111MODULE=on

# 复制 tar 包到镜像
COPY schedulerx3-agent-*-bin.tar.gz /tmp/schedulerx3-agent.tar.gz

# 解压 tar 包到指定目录(去掉顶层目录)
RUN mkdir -p /opt/schedulerx3-agent && \
    tar -xzf /tmp/schedulerx3-agent.tar.gz --strip-components=1 -C /opt/schedulerx3-worker && \
    chmod +x /opt/schedulerx3-agent/bin/*.sh && \
    mkdir -p /opt/schedulerx3-agent/logs && \
    rm -f /tmp/schedulerx3-agent.tar.gz

# 设置工作目录
WORKDIR /opt/schedulerx3-agent

# 暴露端口
EXPOSE 9999

# 启动命令(使用 start.sh 的前台模式)
CMD ["bin/start.sh", "-f"]

通过Kubernetes启动

  1. 创建 schedulerx3-agent.yaml 文件通过Deployment部署。

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: schedulerx3-agent
      labels:
        app: schedulerx3-agent
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: schedulerx3-agent
      template:
        metadata:
          labels:
            app: schedulerx3-agent
        spec:
          containers:
            - name: schedulerx3-agent
              image: schedulerx-registry.cn-hangzhou.cr.aliyuncs.com/schedulerx3/schedulerx3-agent:1.0.0
              imagePullPolicy: Always
              ports:
                - containerPort: 9999
              env:
                - name: "SCHEDULERX3_ADMIN_ADDRESSES"
                  value: "{服务接入地址}"
                - name: "SCHEDULERX3_EXECUTOR_APPNAME"
                  value: "{应用AppName}"
                - name: "SCHEDULERX3_ACCESS_TOKEN"
                  value: "{应用AccessToken}"
              livenessProbe:
                tcpSocket:
                  port: 9999
                timeoutSeconds: 30
                initialDelaySeconds: 30
  2. 部署至Kubernetes。

    # 部署
    kubectl apply -f schedulerx3-agent.yaml

HTTP应用接入执行器

不需要SDK接入,通过配置域名或K8s服务自动发现后端节点,使用HTTP协议调度。

接入K8s服务

如果HTTP应用部署在阿里云容器服务ACK中,推荐使用“接入K8s服务”方式。

  1. (若已完成可跳过)将应用部署在阿里云容器服务ACK上,支持如下集群类型:

    • ACK托管集群,网络插件为Terway

    • ACK Serverless集群

    • ACS集群

  2. (若已完成可跳过)在ACK中为应用创建Service,支持服务类型:

    • ClusterIP

    • LoadBalancer

  3. 登录MSE XXL-JOB控制台

  4. 在页面左上角选择目标地域

  5. 单击目标实例ID,在左侧导航栏选择应用管理

  6. 单击目标应用执行器数量列下的接入按钮。

  7. 接入执行器页面,接入方式选择接入K8s服务,并完成接入K8s服务参数配置,单击确定完成接入。

接入成功后,执行期数量会发生改变,点击查看,可以看到后端Pod列表。

手动录入域名

如果应用没有部署在ACK上(比如使用ECS部署),也可以通过内网域名调度HTTP任务。

  1. (若已完成可跳过)为HTTP应用创建一个网关,会自动生成一个内网域名,比如使用NLB。

  2. 在页面左上角选择目标地域

  3. 单击目标实例ID,在左侧导航栏选择应用管理

  4. 单击目标应用执行器数量列下的接入按钮。

  5. 接入执行器页面,接入方式选择录入域名,并配置内网域名。