执行器管理
创建完应用后,需要接入执行器才能执行任务。
普通应用接入执行器
自动注册
自动注册需要业务方依赖XXLJOB的sdk,进行接入配置,应用启动成功就会自动注册。
1.查看接入配置
在页面左上角选择目标地域。
单击目标实例ID,在左侧导航栏选择应用管理。
单击目标应用执行器数量列下的接入按钮。
接入方式选择自动接入,并根据接入方式修改配置。
2.引入SDK接入执行器
Java SDK
在pom中引入“xxl-job-core”的maven依赖。
初始化执行器。
@Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
Go SDK
执行以下命令,使用最新的tag拉取Go版本的XXLJOB SDK。
go get github.com/xxl-job/xxl-job-executor-go@{最新的tag}
编写业务代码。
package main import ( "context" "fmt" xxl "github.com/xxl-job/xxl-job-executor-go" "github.com/xxl-job/xxl-job-executor-go/example/task" "log" ) func main() { exec := xxl.NewExecutor( xxl.ServerAddr("xxxxxx"), //请求地址,控制台应用管理接入配置获取 xxl.AccessToken("xxxxxxx"), //请求令牌,控制台应用管理接入配置获取 xxl.ExecutorPort("9999"), //默认9999(非必填) xxl.RegistryKey("golang-jobs"), //执行器名称 xxl.SetLogger(&logger{}), //自定义日志 ) exec.Init() exec.Use(customMiddleware) //设置日志查看handler exec.LogHandler(customLogHandle) //注册任务handler exec.RegTask("task.test", task.Test) exec.RegTask("task.test2", task.Test2) exec.RegTask("task.panic", task.Panic) log.Fatal(exec.Run()) } // 自定义日志处理器 func customLogHandle(req *xxl.LogReq) *xxl.LogRes { return &xxl.LogRes{Code: xxl.SuccessCode, Msg: "", Content: xxl.LogResContent{ FromLineNum: req.FromLineNum, ToLineNum: 2, LogContent: "这个是自定义日志handler", IsEnd: true, }} } // xxl.Logger接口实现 type logger struct{} func (l *logger) Info(format string, a ...interface{}) { fmt.Println(fmt.Sprintf("自定义日志 - "+format, a...)) } func (l *logger) Error(format string, a ...interface{}) { log.Println(fmt.Sprintf("自定义日志 - "+format, a...)) } // 自定义中间件 func customMiddleware(tf xxl.TaskFunc) xxl.TaskFunc { return func(cxt context.Context, param *xxl.RunReq) string { log.Println("I am a middleware start") res := tf(cxt, param) log.Println("I am a middleware end") return res } }
Python SDK
拉取依赖。
pip install pyxxl # 如果日志需要写入redis pip install "pyxxl[redis]" # 如果需要从.env加载配置 pip install "pyxxl[dotenv]" # 安装所有功能 pip install "pyxxl[all]"
编写业务代码。
import asyncio import time from pyxxl import ExecutorConfig, PyxxlRunner from pyxxl.ctx import g config = ExecutorConfig( xxl_admin_baseurl="http://xxljob-1b3fd8196eb.schedulerx.mse.aliyuncs.com/api/", executor_app_name="xueren-test", access_token="default_token", # executor_listen_host="0.0.0.0", # 如果xxl-admin可以直连executor的ip,可以不填写executor_listen_host ) app = PyxxlRunner(config) @app.register(name="demoJobHandler") async def test_task(): # you can get task params with "g" g.logger.info("get executor params: %s" % g.xxl_run_data.executorParams) for i in range(10): g.logger.warning("test logger %s" % i) await asyncio.sleep(5) return "成功..." @app.register(name="sync_func") def test_task4(): # 如果要在xxl-admin上看到执行日志,打印日志的时候务必用g.logger来打印,默认只打印info及以上的日志 n = 1 g.logger.info("Job %s get executor params: %s" % (g.xxl_run_data.jobId, g.xxl_run_data.executorParams)) # 如果同步任务里面有循环,为了支持cancel操作,必须每次都判断g.cancel_event. while n <= 10 and not g.cancel_event.is_set(): # 如果不需要从xxl-admin中查看日志,可以用自己的logger g.logger.info( "log to {} logger test_task4.{},params:{}".format( g.xxl_run_data.jobId, n, g.xxl_run_data.executorParams, ) ) time.sleep(2) n += 1 return "成功3" if __name__ == "__main__": app.run_executor()
手动录入
手动录入可以人工维护执行器的地址信息,地址格式参考http://192.168.0.1:9999/
。
在页面左上角选择目标地域。
单击目标实例ID,在左侧导航栏选择应用管理。
单击目标应用执行器数量列下的接入按钮。
接入方式选择手动录入,并输入执行器地址。
HTTP应用接入执行器
不需要SDK接入,通过配置域名或K8s服务自动发现后端节点,使用HTTP协议调度。
接入K8s服务
如果HTTP应用部署在阿里云容器服务ACK中,推荐使用“接入K8s服务”方式。
(若已完成可跳过)将应用部署在阿里云容器服务ACK上,支持如下集群类型:
ACK托管集群,网络插件为Terway
ACK Serverless集群
ACS集群
(若已完成可跳过)在ACK中为应用创建Service,支持服务类型:
ClusterIP
LoadBalancer
在页面左上角选择目标地域。
单击目标实例ID,在左侧导航栏选择应用管理。
单击目标应用执行器数量列下的接入按钮。
在接入执行器页面,接入方式选择接入K8s服务,并完成接入K8s服务参数配置,单击确定完成接入。
接入成功后,执行期数量会发生改变,点击查看,可以看到后端Pod列表。
手动录入域名
如果应用没有部署在ACK上(比如使用ECS部署),也可以通过内网域名调度HTTP任务。
(若已完成可跳过)为HTTP应用创建一个网关,会自动生成一个内网域名,比如使用NLB。
在页面左上角选择目标地域。
单击目标实例ID,在左侧导航栏选择应用管理。
单击目标应用执行器数量列下的接入按钮。
在接入执行器页面,接入方式选择录入域名,并配置内网域名。