全部产品
云市场

集群任务

更新时间:2020-01-08 16:47:16

集群任务支持用户按业务的要求,通过多层的拆分将一个任务拆分到多个客户端上并发执行。

集群任务的开发可以分成两个阶段:拆分阶段和执行阶段。

  • 拆分阶段:对数据进行分片,不限制拆分层数,将拆分结果上报给服务端,由服务端根据拆分的 chunk 通知客户端来拉取数据进行处理。(chunk:一批待处理数据的索引集合)
  • 执行阶段:客户端接收到通知后拉取数据进行处理,处理完后继续拉取新的数据,直到数据都处理完成。

应用场景案例

为了便于理解,本文使用一个场景示例来介绍集群任务的开发过程。

假设某基金公司每天需要进行一次用户清算,由于用户规模较大,因此将用户分为 100 张表,每张表有 10 万左右的用户数据。该公司选择使用两层拆分的集群任务,通过集群任务并行处理能力提高数据处理效率。

具体的实现步骤如下:

  1. 任务拆分阶段:将用户数据进行拆分,详见 集群任务拆分阶段
    • 第一层拆分:按用户表维度进行数据拆分。
    • 第二层拆分:按分页维度进行数据拆分。
  2. 任务执行阶段:对每个用户数据进行处理,详见 集群任务执行阶段
    • 执行模式:支持本地执行模式以及远程执行模式,详见 执行模式
    • 线程池配置:支持使用默认线程池配置或自定义线程池配置,详见 线程池配置
  3. 控制台配置集群任务:参见 新建任务集群任务拆分

任务拆分阶段

任务调度提供了 IClusterJobSplitHandler 接口进行任务拆分,目前支持两种拆分方式:

ShardingChunkData 拆分

ShardingChunkData 拆分是指定索引的拆分方式。通过索引对这个子任务(chunk)进行唯一标识,客户端会根据指定的索引拉取数据进行处理。要求填充分片规则。

  1. public class ShardingChunkData implements IChunkData {
  2. /**
  3. * 分片号
  4. */
  5. private String shardingRule;
  6. ......
  7. }

其中,shardingRule 是一个分片规则标识,例如可以使用 user_00、user_01 … user_99 作为分片标识。

代码示例

结合上述场景案例,第一层任务拆分可以使用 shardingChunkData 对 100 张用户表进行拆分,具体代码如下:

  1. public class UserSplitterByTable implements IClusterJobSplitHandler<ShardingChunkData> {
  2. @Override
  3. public SplitChunkDataResult<ShardingChunkData> handle(ClusterJobSplitContext context) {
  4. SplitChunkDataResult<ShardingChunkData> splitChunkDataResult = new SplitChunkDataResult<>();
  5. ArrayList<ShardingChunkData> rangeChunkDatas = new ArrayList<>();
  6. // user_00 ~ user_99
  7. for (int i = 0; i < 100; i++) {
  8. String shardingRule = "user_";
  9. if (i < 10) {
  10. shardingRule = "0";
  11. }
  12. shardingRule = shardingRule + i;
  13. // 根据表维度拆分分片
  14. ShardingChunkData shardingChunkData = new ShardingChunkData(shardingRule);
  15. rangeChunkDatas.add(shardingChunkData);
  16. }
  17. splitChunkDataResult.setChunkDatum(rangeChunkDatas);
  18. splitChunkDataResult.setSuccess(true);
  19. return splitChunkDataResult;
  20. }
  21. @Override
  22. public String getName() {
  23. // 处理器名称
  24. return "USER_SPLITTER_BY_TABLE";
  25. }
  26. @Override
  27. public ThreadPoolExecutor getThreadPool() {
  28. // 线程池,建议使用自定义线程池。如果返回null则使用 sdk 自带的线程池。
  29. return null;
  30. }
  31. }

RangeChunkData 拆分

RangeChunkData 拆分是指定范围的拆分方式。指定每个子任务处理特定范围内的数据,类似于一个分页效果。要求填充起始索引、结束索引和分片规则。

  1. public class RangeChunkData implements IChunkData {
  2. /**
  3. * 分片规则
  4. */
  5. private String shardingRule;
  6. /**
  7. * 起始索引
  8. */
  9. private String start;
  10. /**
  11. * 结束索引
  12. */
  13. private String end;
  14. ......
  15. }
  • shardingRule:分片规则,例如指定表 user_01。
  • start:开始索引,记录开始位置,例如 1000。
  • end:结束索引,记录结束位置(包含),例如 2000。

例如,以下配置表示处理 user_01 表中 1000 到 2000 的数据。

  1. RangeChunkData chunk = new RangeChunkData();
  2. chunk.setShardingRule("user_01");
  3. chunk.setStart("1000");
  4. chunk.setEnd("2000");

代码示例

在场景案例中,第二层可以使用 RangeChunkData 将用户数据按分页维度进行拆分,具体代码如下:

  1. public class UserSplitterByPage implements IClusterJobSplitHandler<RangeChunkData> {
  2. @Override
  3. public SplitChunkDataResult<RangeChunkData> handle(ClusterJobSplitContext context) {
  4. // 第一层拆分的分片
  5. ShardingChunkData shardingChunkData = (ShardingChunkData) context.getChunkData();
  6. String shardingRule = shardingChunkData.getShardingRule();
  7. // 1. 根据分片查询数量
  8. int count = queryCountByTable(shardingChunkData.getShardingRule());
  9. SplitChunkDataResult<RangeChunkData> splitChunkDataResult = new SplitChunkDataResult<>();
  10. ArrayList<RangeChunkData> rangeChunkDatas = new ArrayList<>();
  11. // 2. 做分页处理,每页处理 1000 条
  12. int pageCount = 1000;
  13. for (int page = 0; page < count / pageCount; page++) {
  14. String startRows = String.valueOf(page * pageCount);
  15. // 包含
  16. String endRows = String.valueOf((page + 1) * pageCount - 1);
  17. RangeChunkData rangeChunkData = new RangeChunkData(shardingRule, startRows, endRows);
  18. rangeChunkDatas.add(rangeChunkData);
  19. }
  20. splitChunkDataResult.setChunkDatum(rangeChunkDatas);
  21. splitChunkDataResult.setSuccess(true);
  22. return splitChunkDataResult;
  23. }
  24. // mock
  25. private int queryCountByTable(String shardingRule) {
  26. if ("user_00".equals(shardingRule)) {
  27. // user_00 表 10 万用户
  28. return 100000;
  29. } else {
  30. // 其他表 9 万用户
  31. return 90000;
  32. }
  33. }
  34. @Override
  35. public String getName() {
  36. return "USER_SPLITTER_BY_PAGE";
  37. }
  38. @Override
  39. public ThreadPoolExecutor getThreadPool() {
  40. return null;
  41. }
  42. }

任务执行阶段

集群任务执行阶段分为三个子阶段:

  • Read 阶段:从数据源读取数据。
  • Process 阶段:将读取的数据对象转换为要写入数据源的对象,该阶段不是必须阶段,可以不设置。
  • Write 阶段:将数据写入数据源。
说明:集群任务拆分阶段的相关代码,可参考 任务调度工程示例 中的 ClusterExecuteHandler

Read 阶段

读取数据服务需要实现接口 IReader

  1. public interface IReader<T> {
  2. /**
  3. * 读取数据
  4. *
  5. * @param context
  6. * @return
  7. */
  8. LoadData<T> read(ClusterJobExecuteContext context) throws Exception;
  9. }

LoadData 对象数据结构如下:

  1. public class LoadData<T> extends MultiDataItem<T> {
  2. /**
  3. * 是否还有未捞取的待处理数据,值为false时,处理完本次捞取出的数据就回调服务端;当值为true时,认为还有数据待处理,处理完本次捞取数据后继续捞取数据。
  4. */
  5. private boolean hasMore;
  6. public LoadData(List<T> itemList, boolean hasMore) {
  7. super(itemList);
  8. this.hasMore = hasMore;
  9. }
  10. public static boolean isEmpty(LoadData loadData) {
  11. return loadData == null || loadData.isEmpty();
  12. }
  13. public boolean isHasMore() {
  14. return hasMore;
  15. }
  16. public void setHasMore(boolean hasMore) {
  17. this.hasMore = hasMore;
  18. }
  19. }

使用 read 方法分批读取数据时,需要注意设置 hasMore 的值,当值为 true 时,处理完本次读取的数据后会再次进行读取数据。

如果需要打开关闭文件,可以使用 IStreamReader 接口:

  1. public interface IStream {
  2. /**
  3. * 打开流
  4. *
  5. * @param context
  6. * @throws Exception
  7. */
  8. void open(ClusterJobExecuteContext context) throws Exception;
  9. /**
  10. * 关闭流
  11. *
  12. * @param context
  13. * @throws Exception
  14. */
  15. void close(ClusterJobExecuteContext context) throws Exception;
  16. }
  17. public interface IStreamReader<T> extends IStream, IReader<T> {
  18. }

该接口继承自 IStream 接口,提供了 open 和 close 方法,这两个方法分别会在执行分片前和执行分片后调用。

Process 阶段

处理数据服务需要实现接口 IProcessor

  1. public interface IProcessor<I, O> {
  2. /**
  3. * 数据处理,将读取出的对象加工转换为要处理的对象
  4. *
  5. * @param r
  6. * @return
  7. */
  8. DataProcessResult<O> process(ClusterJobExecuteContext context, I i) throws Exception;
  9. /**
  10. * 数据处理线程池,没有配置时使用handler的线程池
  11. *
  12. * @return
  13. *
  14. */
  15. ThreadPoolExecutor getProcessThreadPool();
  16. }

当不需要对读取的数据进行转换时,可以不设置该服务,这样会直接把读取的数据交给写服务。该服务可以单独指定线程池,当没有设置时默认使用任务 handler 的线程池。

Write 阶段

写数据服务需要实现接口 IWriter

  1. public interface IWriter<T> {
  2. /**
  3. * 每次执行的数据块包含的数据量最大值
  4. *
  5. * @return
  6. */
  7. int getCountPerWrite();
  8. /**
  9. * 写数据
  10. *
  11. * @param context
  12. * @param dataItem
  13. * @return
  14. * @throws Exception
  15. */
  16. ClientCommonResult write(ClusterJobExecuteContext context,
  17. IDataItem<T> dataItem) throws Exception;
  18. /**
  19. * 写数据线程池,没有配置时使用handler的线程池
  20. *
  21. * @return
  22. */
  23. ThreadPoolExecutor getWriteThreadPool();
  24. }

getCountPerWrite 方法用来设置每次批量 write 的数据量,值小于等于 0 时认为一次只写入一条数据。该服务可以单独设置线程池,当没有设置时默认使用任务 handler 的线程池。

IDataItem 就是待处理的数据对象,有两种类型:

  • SingleDataItem:只包含一条数据,当 getCountPerWrite 方法的返回值小于或等于 1 时是该类型。
  • MultiDataItem:包含多条数据,当 getCountPerWrite 方法的返回值大于 1 时是该类型。

可根据实际情况对 dataItem 进行类型转换,例如:

  1. switch (dataItem.getType()) {
  2. case SINGLE:
  3. ((SingleDataItem<Integer>) dataItem).getItem();
  4. break;
  5. case MULTIPLE:
  6. (MultiDataItem<Integer>) dataItem).getItemList();
  7. break;
  8. default:
  9. break;
  10. }

如果需要打开关闭文件,可以使用接口 IStreamWriter

  1. public interface IStreamWriter<T> extends IStream, IWriter<T> {
  2. }

该接口继承了 IStream 接口,提供了 open 和 close 方法,这两个方法分别会在执行分片前和执行分片后调用。

代码示例

结合上述应用场景案例,在执行阶段,需要根据拆分的分片从数据库或其他数据源中拉取数据进行处理,然后落库。

  • Read 阶段:根据分片信息(shardingRule、start、end)到指定表里拉取指定范围的用户数据。
  • Process 阶段:对拉取到的用户数据进行处理。
  • Write 阶段:将处理后的数据落库。

具体代码如下:

  1. public class UserProcessHandler implements IClusterJobExecuteHandler {
  2. private final Logger LOGGER = LoggerFactory.getLogger(ClusterJobExecutor.class);
  3. private ThreadPoolExecutor threadPool;
  4. @Override
  5. public void preExecute(ClusterJobExecuteContext context) {
  6. // 前置处理
  7. LOGGER.info(String.format("preExecute chunkId:%s, param:%s",
  8. context.getChunkId(), context.getCustomParams()));
  9. }
  10. @Override
  11. public IReader getReader() {
  12. return new IReader() {
  13. @Override
  14. public LoadData<String> read(ClusterJobExecuteContext context) throws Exception {
  15. IChunkData chunkData = context.getChunk();
  16. RangeChunkData rangeChunkData = (RangeChunkData) chunkData;
  17. // 根据索引去查询数据
  18. String shardingRule = rangeChunkData.getShardingRule();
  19. int start = Integer.parseInt(rangeChunkData.getStart());
  20. int end = Integer.parseInt(rangeChunkData.getEnd());
  21. // 模拟查询数据
  22. List<String> stringList = queryUserInfo(shardingRule, start, end);
  23. // 一次查询,后面没有数据
  24. boolean hasMore = false;
  25. return new LoadData<String>(stringList, hasMore);
  26. }
  27. private List<String> queryUserInfo(String shardingRule, int start, int end) {
  28. List<String> stringList = Lists.newArrayList();
  29. for (int userId = start; userId <= end; userId++) {
  30. stringList.add(shardingRule + "" + userId);
  31. }
  32. return stringList;
  33. }
  34. };
  35. }
  36. @Override
  37. public IProcessor getProcessor() {
  38. return new IProcessor() {
  39. @Override
  40. public DataProcessResult process(ClusterJobExecuteContext context,
  41. Object data) throws Exception {
  42. // 处理数据
  43. System.out.println("process user" + data);
  44. return new DataProcessResult(true, "", data);
  45. }
  46. @Override
  47. public ThreadPoolExecutor getProcessThreadPool() {
  48. return null;
  49. }
  50. };
  51. }
  52. @Override
  53. public IWriter getWriter() {
  54. return new IWriter<String>() {
  55. @Override
  56. public int getCountPerWrite() {
  57. return 1;
  58. }
  59. @Override
  60. public ClientCommonResult write(ClusterJobExecuteContext context,
  61. IDataItem<String> dataItem) throws Exception {
  62. // 数据存储
  63. switch (dataItem.getType()) {
  64. case SINGLE:
  65. // 单个数据块,只包含一条数据
  66. SingleDataItem<String> singleDataItem = (SingleDataItem<String>) dataItem;
  67. LOGGER.info(String.format("getWriter write single data:%s", singleDataItem.getItem()));
  68. System.out.println(String.format("write single data:%s", singleDataItem.getItem()));
  69. break;
  70. case MULTIPLE:
  71. // 复合数据块,包含多条数据。比如任务停止时会将多条数据传入
  72. MultiDataItem<String> multiDataItem = (MultiDataItem<String>) dataItem;
  73. LOGGER.info(String.format("getWriter write multi data:%s", multiDataItem.getItemList()));
  74. break;
  75. default:
  76. break;
  77. }
  78. return ClientCommonResult.buildSuccessResult();
  79. }
  80. @Override
  81. public ThreadPoolExecutor getWriteThreadPool() {
  82. return BlockingThreadPool.getThreadPool();
  83. }
  84. };
  85. }
  86. @Override
  87. public ILimiter getLimiter() {
  88. return new DefaultLimiter(10);
  89. }
  90. @Override
  91. public void postExecute(ClusterJobExecuteContext context) {
  92. // 后置处理
  93. LOGGER.info(String.format("JobExecuteHandler postExecute chunkId:%s, param:%s",
  94. context.getChunkId(), context.getCustomParams()));
  95. }
  96. @Override
  97. public Progress calProgress(ClusterJobExecuteContext context) {
  98. return new Progress();
  99. }
  100. @Override
  101. public boolean isProcessAsync() {
  102. return true;
  103. }
  104. @Override
  105. public String getName() {
  106. return "USER_PROCESS_HANDLER";
  107. }
  108. @Override
  109. public ThreadPoolExecutor getThreadPool() {
  110. return this.threadPool;
  111. }
  112. public void setThreadPool(ThreadPoolExecutor threadPool) {
  113. this.threadPool = CommonThreadPool.getThreadPool("REMOTE_EXECUTE");
  114. }
  115. }

执行模式

集群任务的执行阶段支持两种模式:

本机执行

本机执行时需要实现接口 IClusterJobExecuteHandler,如下:

  1. public interface IJobHandler {
  2. /**
  3. * handler的名字
  4. *
  5. * @return
  6. */
  7. String getName();
  8. /**
  9. * 可以留空, 使用默认执行线程池
  10. *
  11. * @return
  12. */
  13. ThreadPoolExecutor getThreadPool();
  14. }
  15. public interface IClusterJobExecuteHandler<I, O> extends IJobHandler {
  16. /**
  17. * 预处理
  18. *
  19. * @param context
  20. */
  21. void preExecute(ClusterJobExecuteContext context);
  22. /**
  23. * 获取数据读取服务
  24. *
  25. * @return
  26. */
  27. IReader<I> getReader();
  28. /**
  29. * 获取数据清洗服务,返回NULL时会跳过数据清洗步骤,直接处理读取出的数据
  30. *
  31. * @return
  32. */
  33. IProcessor<I, O> getProcessor();
  34. /**
  35. *
  36. * @return
  37. */
  38. IWriter<O> getWriter();
  39. /**
  40. * 获取限流器,返回NULL时使用默认限流器
  41. *
  42. * @return
  43. */
  44. ILimiter getLimiter();
  45. /**
  46. * 本次执行完的后置处理
  47. *
  48. * @param context
  49. * @return
  50. */
  51. void postExecute(ClusterJobExecuteContext context);
  52. /**
  53. * 计算处理进度
  54. *
  55. * @param context
  56. * @return
  57. */
  58. Progress calProgress(ClusterJobExecuteContext context);
  59. /**
  60. * 是否异步处理,开启异步执行后,reader-process-write阶段全部是异步处理
  61. *
  62. * @return
  63. */
  64. boolean isProcessAsync();
  65. }

实现 IClusterJobExecuteHandler 的类负责处理任务。实现时需要设置:

  • IReader
  • IWriter
  • IProcessor:非必须设置,当没有设置 IProcessor 时会把读取出的数据直接交给 IWriter 服务。
  • ILimiter:没有设置时使用默认的限流服务。

当 isProcessAsync 返回 true 时,reader-process-write 阶段全部是异步处理,即读取的数据放到队列给 process 或 write 消费后会立即开始下一次读取。当返回 false 时,读取的数据需要等待被 write 消费完后开始下一次读取。

注意:IReader 服务如果配置成了 bean,需要确保服务是无状态的,否则多线程场景下会相互干扰。如果需要存储一些处理过程中的数据,那么在需要在 getReader 方法里 new 一个 IReader 服务实例。IWriter 服务也是如此。

远程执行

远程执行时需要实现接口 IRemoteClusterJobExecuteHandler,如下:

  1. public interface IRemoteClusterJobExecuteHandler<T> extends IClusterJobExecuteHandler<T> {
  2. /**
  3. * 设置路由策略,没有设置时默认为轮询
  4. *
  5. * @return
  6. */
  7. IClusterRouter getClusterRouter();
  8. /**
  9. * 分发完一次load出来的数据后的休眠时间,防止分发出去的数据还没有被处理完导致的重复捞取,单位是ms,返回值<=0时认为不休眠
  10. *
  11. * @return
  12. */
  13. int getSleepAfterPerLoad();
  14. }

该接口继承自 IClusterJobExecuteHandler,需要额外两个方法:getClusterRouter 和 getSleepAfterPerLoad。

  • 当远程处理数据时,客户端会把捞取的数据分发给业务集群内的其它机器进行处理,getClusterRouter 方法用来设置分发时的路由规则,目前支持随机(RandomClusterRouter)和轮询(RoundRobinClusterRouter)两种方式,当方法返回 null 时默认为轮询。
  • getSleepAfterPerLoad 方法用来设置处理完读取的数据后的休眠时间,防止分发出去的数据还没有被处理完导致的重复捞取,单位时ms,仅当 isProcessAsync 返回 false 时生效。

集群内的远程 RPC 调用使用 oneway 模式,所以无法获取数据的处理结果,需要业务方自己记录处理结果。如需帮助,请 提交工单 咨询。

可以通过实现接口 IRemoteProcessorExecutor 来设置接收远程分发请求的线程池,将实现类发布成 bean 即可,接口如下:

  1. public interface IRemoteProcessorExecutor {
  2. /**
  3. * 获取线程池
  4. *
  5. * @return
  6. */
  7. Executor getExecutor();
  8. }

另外,远程调用需要在客户端启动一个 RpcServer:

  • 对于 SOFABoot 项目,在工程的 application.properties 文件中使用配置项:
    1. com.alipay.sofa.antscheduler.remote.execute.enable=true
    2. com.alipay.sofa.antscheduler.remote.execute.port=xxx
  • 对于非 SOFABoot 项目,需要在初始化客户端的时候设置下面两个参数:

    1. public class Config {
    2. ......
    3. /**
    4. * 是否支持远程执行
    5. */
    6. private boolean isEnableRemoteExecute = false;
    7. /**
    8. * 远程服务端口,没有设置时使用默认端口9989
    9. */
    10. private int remoteServerPort;
    11. ......

线程池默认配置

IProcessor 和 IWriter 的线程池没有设置时,默认使用 IJobHandler 里设置的线程池。当 IJobHandler 也没有设置时,使用的是客户端默认指定的,配置参数为:

  1. minPoolSize20
  2. maxPoolSize300
  3. queueSize100
  4. keepAliveTime1小时

当没有设置 IRemoteProcessExecutor 时,默认使用 bolt 协议提供的默认线程池,配置参数为:

  1. minPoolSize20
  2. maxPoolSize400
  3. queueSize600
  4. keepAliveTime60

您也可以对 IProcessor,IWriter,IJobHandler 以及 IRemoteProcessExecutor 自定义设置线程池。