全部产品
云市场

如何在容器服务 K8s 集群中使用任务调度

更新时间:2018-11-28 20:42:04

本文档介绍如何在容器服务 Kubernetes 集群中使用分布式任务调度 SchedulerX。

在容器服务 Kubernetes 集群中使用分布式任务调度的大概流程如下:

  1. 在本地开发调度任务(适用于简单任务和并行任务,脚本任务不需要本地开发)。

  2. 部署到 EDAS 的容器服务 Kubernetes 集群

  3. 创建调度任务

  4. 执行调度任务

当前 SchedulerX 支持四种任务类型:

任务类型 说明
脚本任务 不需要执行器,可以直接在前端配置运行 Go/Shell/Python 脚本,需要确保脚本在执行机器中存在。
简单任务 需要实现 ScxSimpleJobProcessor 接口的执行器,有且仅有1台客户端执行。
简单任务多机版 需要实现 ScxSimpleJobProcessor 接口的执行器,所有客户端广播执行。
网格任务 需要实现 GridJobProcessor 接口的执行器,有且只有 1 台客户端执行根任务,进行 1 级子任务分发。所有客户端并行执行子任务及多级子任务分发。

前提条件

本地开发简单任务和并行任务

说明:脚本任务比较简单,无需本地开发,直接编写 Dockerfile 制作镜像并部署即可。

开发任务执行器

创建实现任务的类。

  • 简单任务实例

    1. public class TestSimpleProcessor implements ScxSimpleJobProcessor{
    2. @Override
    3. public ProcessResult process(ScxSimpleJobContext context) {
    4. // 每次执行的时候打印“simpleJob_<执行时刻>”
    5. String fileName = Constants.SIMPLE_JOB_PREFIX + new Date().toString();
    6. System.out.println(fileName);
    7. try {
    8. // 每次执行的时候创建“/tmp/simpleJob_<执行时刻>”文件
    9. Files.createFile(Paths.get(Constants.DIR, fileName));
    10. } catch (IOException e) {
    11. System.out.println(e.toString());
    12. return new ProcessResult(false);
    13. }
    14. return new ProcessResult(true);
    15. }
    16. }
    • 网格任务实例

      1. public class TestGridProcessor implements GridJobProcessor, Serializable {
      2. private static final long serialVersionUID = -33431929501333765L;
      3. @Override
      4. public ProcessResult process(GridJobContext gridJobContext) throws Exception {
      5. //获取要处理的任务
      6. Object task = gridJobContext.getTask();
      7. //获取任务名称
      8. String taskName = gridJobContext.getTaskName();
      9. if(com.alibaba.dts.common.constants.Constants.DEFAULT_ROOT_LEVEL_TASK_NAME.equals(taskName)) {
      10. //任务名称为DEFAULT_ROOT_LEVEL_TASK_NAME, 表明该任务是根子任务
      11. System.out.println("parallel root task starting...");
      12. List<String> subTasks = new ArrayList<>(4);
      13. subTasks.add("1");
      14. subTasks.add("2");
      15. subTasks.add("3");
      16. //把子任务分发出去并行处理
      17. Result<Boolean> result = gridJobContext.dispatchTaskList(subTasks, "子任务");
      18. System.out.println("parallel root task end...");
      19. //true表示执行成功,false表示失败
      20. return new ProcessResult(true);
      21. } else if("子任务".equals(taskName)) {
      22. //任务名称为子任务,表明该任务是根子任务分发的子任务
      23. String subTask = (String)task;
      24. String fileName = Constants.GRID_JOB_PREFIX + "subTask_" + subTask + new Date().toString();
      25. System.out.println(fileName);
      26. try {
      27. Files.createFile(Paths.get(Constants.DIR, fileName));
      28. } catch (IOException e) {
      29. System.out.println(e.toString());
      30. return new ProcessResult(false);
      31. }
      32. //true表示执行成功,false表示失败
      33. return new ProcessResult(true);
      34. }
      35. //默认返回true, 最好不要返回null, 返回null就会判定为任务执行失败
      36. return new ProcessResult(true);
      37. }
      38. }

开发任务启动器

创建一个类,启动任务执行器。

  1. public class ClientMain {
  2. public static void main(String[] args) {
  3. SchedulerXClient schedulerXClient = new SchedulerXClient();
  4. /*
  5. * get basic environment variables
  6. */
  7. //获取该客户端的groupId
  8. String groupId = System.getenv("SCX_GROUP_ID");
  9. //获取该客户端运行的region,EDAS概念
  10. String regionName = System.getenv("SCX_REGION");
  11. //获取该客户端要访问的Console damain (注:SchedulerX使用Console做server的服务发现)
  12. String domainName = System.getenv("SCX_DOMAIN");
  13. if (groupId == null || groupId.trim().isEmpty()) {
  14. throw new IllegalArgumentException("please make sure 'SCX_GROUP_ID' env variable not empty.");
  15. }
  16. if (regionName == null || regionName.trim().isEmpty()) {
  17. throw new IllegalArgumentException("please make sure 'SCX_REGION' env variable not empty.");
  18. }
  19. schedulerXClient.setGroupId(groupId);
  20. schedulerXClient.setRegionName(regionName);
  21. schedulerXClient.setNewVersion(true);
  22. if (domainName != null && !domainName.trim().isEmpty()) {
  23. schedulerXClient.setDomainName(domainName);
  24. }
  25. //正式启动客户端
  26. try {
  27. schedulerXClient.init();
  28. } catch (Throwable e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }

编译本地程序

  1. 执行 mvn install 编译本地的程序代码及相关依赖。

  2. 进入本地编译输出的路径,执行 tar cf schedulerx-client-script.tar target/将本地程序打包为 schedulerx-client-script.tar

您也可以将 schedulerx-client-script.tar 上传到 OSS,以便后续使用。

部署到 EDAS

制作镜像并上传到镜像仓库

制作镜像的步骤请参考制作基于容器服务的 Kubernetes 应用镜像,不再赘述。本节主要介绍如何编写任务调度镜像的 Dockerfile。

  • 简单任务和并行任务 Dockerfile

    1. FROM centos:7
    2. # 安装打包必备软件
    3. RUN yum -y install wget
    4. # 准备运行时环境变量
    5. ENV JAVA_HOME /usr/java/latest
    6. ENV PATH $PATH:$JAVA_HOME/bin
    7. # 准备SchedulerX环境变量
    8. ENV SCX_HOME /home/admin/schedulerx
    9. ENV SCX_GROUP_ID 101-1-2-5289 //此前创建的任务分组
    10. ENV SCX_REGION cn-hangzhou //任务集群要部署的地域
    11. # 设置 EDAS-Container 版本
    12. ENV EDAS_CONTAINER_VERSION V3.5.1
    13. LABEL pandora V3.5.0
    14. # 下载安装 JDK 8
    15. RUN wget http://edas-hz.oss-cn-hangzhou.aliyuncs.com/agent/prod/files/jdk-8u191-linux-x64.rpm -O /tmp/jdk-8u191-linux-x64.rpm && \
    16. yum -y install /tmp/jdk-8u191-linux-x64.rpm && \
    17. rm -rf /tmp/jdk-8u191-linux-x64.rpm
    18. # 下载安装schedulerx-client运行包
    19. RUN mkdir -p ${SCX_HOME}
    20. RUN wget http://schedulerx.oss-cn-hangzhou.aliyuncs.com/Yanxun/schedulerx-client-script.tar -O ${SCX_HOME}/schedulerx-client-edas.tar && \
    21. tar -xvf ${SCX_HOME}/schedulerx-client-edas.tar -C ${SCX_HOME} && \
    22. mv ${SCX_HOME}/target/* ${SCX_HOME}/ && \
    23. rm -rf ${SCX_HOME}/schedulerx-client-edas.tar ${SCX_HOME}/target
    24. # 生成执行命令
    25. RUN echo 'java -cp lib/*:schedulerx-client-edas-1.0-SNAPSHOT.jar ClientMain' > ${SCX_HOME}/start.sh && chmod +x ${SCX_HOME}/start.sh
    26. RUN echo 'echo 'hello world' && touch /tmp/helloword.txt' > ${SCX_HOME}/hello.sh && chmod +x ${SCX_HOME}/hello.sh
    27. # 设定运行目录
    28. WORKDIR ${SCX_HOME}
    29. # 运行命令
    30. CMD ["/bin/bash", "/home/admin/schedulerx/start.sh"]
  • 脚本任务 Dockerfile

    1. FROM centos:7
    2. # 安装打包必备软件
    3. RUN yum -y install wget
    4. # 准备运行时环境变量
    5. ENV JAVA_HOME /usr/java/latest
    6. ENV PATH $PATH:$JAVA_HOME/bin
    7. # 准备SchedulerX环境变量
    8. ENV SCX_HOME /home/admin/schedulerx
    9. # 设置 EDAS-Container 版本
    10. ENV EDAS_CONTAINER_VERSION V3.5.1
    11. LABEL pandora V3.5.0
    12. # 下载安装 JDK 8
    13. RUN wget http://edas-hz.oss-cn-hangzhou.aliyuncs.com/agent/prod/files/jdk-8u191-linux-x64.rpm -O /tmp/jdk-8u191-linux-x64.rpm && \
    14. yum -y install /tmp/jdk-8u191-linux-x64.rpm && \
    15. rm -rf /tmp/jdk-8u191-linux-x64.rpm
    16. # 下载安装脚本任务运行Agent
    17. RUN mkdir -p ${SCX_HOME}
    18. RUN wget http://edas.oss-cn-hangzhou.aliyuncs.com/SchedulerX/schedulerx-client-2.1.4.tar.gz -O ${SCX_HOME}/schedulerx-client-script.tar && \
    19. tar -xvf ${SCX_HOME}/schedulerx-client-script.tar -C ${SCX_HOME} && \
    20. mv ${SCX_HOME}/SchedulerX-Agent/* ${SCX_HOME}/ && \
    21. rm -rf ${SCX_HOME}/schedulerx-client-script.tar ${SCX_HOME}/SchedulerX-Agent
    22. # 配置Agent
    23. RUN rm -rf ${SCX_HOME}/conf/agent.ini && \
    24. touch ${SCX_HOME}/conf/agent.ini && \
    25. echo '[baseSection]' >> ${SCX_HOME}/conf/agent.ini && \
    26. echo 'groupId=101-1-2-5291' >> ${SCX_HOME}/conf/agent.ini && \ //此前创建的任务分组
    27. echo 'regionName=cn-hangzhou' >> ${SCX_HOME}/conf/agent.ini && \ //任务镜像要部署到的地域
    28. echo 'newVersion=true' >> ${SCX_HOME}/conf/agent.ini
    29. # 生成执行命令
    30. RUN echo 'echo 'hello world'' > ${SCX_HOME}/hello.sh && chmod +x ${SCX_HOME}/hello.sh
    31. # 设定运行目录
    32. WORKDIR ${SCX_HOME}
    33. # 运行命令
    34. CMD ["/bin/bash", "/home/admin/schedulerx/bin/start-1g.sh]

将镜像部署到 EDAS 的容器服务 Kubernetes 集群中

参考发布容器服务 Kubernetes 版集群应用,将本地的任务镜像部署到 EDAS 的容器服务 Kubernetes 集群中。

验证镜像部署结果

  1. 登录 EDAS 控制台,在左侧单击应用管理,在应用管理页面单击部署的应用名称。

  2. 在应用详情页面单击实例部署信息,查看部署结果及 Pod 状态,确认部署结果。

创建调度任务

  1. 参考创建任务,创建您需要的调度任务。

  2. 确认调度任务的连接机器。

    1. 在分布式任务管理的任务列表页面中,在您创建的任务的操作列单击单击更多,在弹出的菜单中单击连接机器

    2. 连接机器对话框中,查看该任务连接的机器的 IP 地址是否与您部署 Pod 一致。

执行调度任务

  1. 任务列表页面中您刚才创建的任务的操作列单击触发一次或根据规则自动触发。

  2. 登录您部署镜像的 Pod,查看任务是否被正常执行。