全部产品
云市场

网格任务示例

更新时间:2018-03-15 20:34:14

网格任务简介

网格任务的使用场景针对的是需要进行大量数据处理或较大运算量的定时任务。在任务触发后,用户可以自己定义业务逻辑,对任务进行拆分。在用户集群中,应用机器会在配置文件中添加 SchedulerX-Client 的依赖,并连接到某个特定分组。拆分后的多个子任务可以派发到所有这些应用机器中去并行执行。从而达到利用集群处理能力,快速处理大任务的目的。

网格任务支持用户自定义子任务派发逻辑、路由逻辑、处理逻辑,并且支持子任务的多级派发。

网格任务特性

  • 客户端机器自组集群:任务数据都在客户端之间流转。

  • 任务分发负载均衡:能够智能感知客户端集群的扩容缩容。

  • 故障转移:不用担心客户端宕机的问题,当接收任务的客户端宕机后,分发给它执行但还没执行完毕的任务,会动态转移到其他可用客户端机器执行。

  • 任务分发路由:能通过配置路由来动态决定任务分发到哪台机器执行,具体见路由分发

  • 支持百万级甚至千万级任务。大规模子任务场景请参考最佳实践

  • 失败补偿:比较完善的失败补偿机制。当业务层面任务执行失败时,你可以通过返回特定对象,告诉 SchedulerX 这次任务需要补偿执行。

  • 支持只分发不执行的模式:分发任务的客户端机器不会执行自己分发的任务,除非只有一台客户端机器可用。

网格任务实现

注意:

  • 使用网格任务需要 schedulerx-client-2.1.5 及以上版本。

  • dispatchTaskList 派发的子任务对象,必须实现 Java 的 Serializable 接口。

  • 序列化后的单个子任务大小不能超过64KB。

  1. 实现 Job 处理器接口。

    1. import com.alibaba.dts.client.executor.grid.processor.GridJobContext;
    2. import com.alibaba.dts.client.executor.job.processor.GridJobProcessor;
    3. import com.alibaba.dts.common.constants.Constants;
    4. import com.alibaba.dts.common.domain.result.ProcessResult;
    5. import java.util.ArrayList;
    6. import java.util.List;
    7. public class HelloGridJobProcessor implements GridJobProcessor {
    8. @Override
    9. public ProcessResult process(GridJobContext context) throws Exception {
    10. String taskName = context.getTaskName();
    11. if (Constants.DEFAULT_ROOT_LEVEL_TASK_NAME.equals(taskName)) {
    12. context.dispatchTaskList(buildDataSmall(context), "first-level-task");
    13. return new ProcessResult(true);
    14. } else if ("first-level-task".equals(taskName)) {
    15. System.out.println("business process!")
    16. return new ProcessResult(true);
    17. }
    18. return new ProcessResult(true);
    19. }
    20. private List<? extends Object> buildDataSmall(GridJobContext context) {
    21. int count = 10000;
    22. List<Integer> data = new ArrayList<Integer>();
    23. for (int i = 0; i < count; i++) {
    24. data.add(i);
    25. }
    26. return data;
    27. }
    28. }
  2. 在控制台新建 Job,任务类型选择网格 Job

  3. 在应用中依赖 SchedulerX-Client 并进行配置。

    客户端配置可参考快速入门

限流

限流的作用是为了保证任务平缓稳定的分发,防止大量子任务的积压对系统性能造成影响。

网格任务支持全局范围的限流和 Job 级别的限流。全局范围会对分组下所有 Job 起作用;Job 级别只会对特定 Job 起作用。目前支持两种 Job 级别的策略:NONE 和 H2_COUNT。

对于限流,不建议使用全局配置,推荐只配置有一级分发的 Job,并且当 Job子 任务达到100万以上时配置。设置为 H2_COUNT。

限流配置示例如下:

  1. <bean id="schedulerxClient1" class="com.alibaba.edas.schedulerx.SchedulerXClient">
  2. <property name="groupId">
  3. <value>111-1-1-11</value>
  4. </property>
  5. <property name="regionName">
  6. <value>cn-hangzhou</value>
  7. </property>
  8. <!--如果使用的是上海 Region 集群,需要设置 domainName 属性,同时指定 RegionName 为 cn-shanghai
  9. <property name="regionName">
  10. <value>cn-shanghai</value>
  11. </property>
  12. <property name="domainName">
  13. <value>schedulerx-shanghai.console.aliyun.com</value>
  14. </property>-->
  15. <property name="flowControlStrategyMap">
  16. <map>
  17. <entry key="com.alibaba.schedulerx.grid.FibonacciGridJobProcessor" <!-- key 替换为要限流的 Job 的全限定类名 -->
  18. value-ref="flowControlStrategyH2Count"></entry>
  19. </map>
  20. </property>
  21. </bean>
  22. <bean id="flowControlStrategyH2Count" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">
  23. <property name="staticField"
  24. value="com.alibaba.dts.client.executor.grid.flowcontrol.FlowControlStrategy.H2_COUNT"/>
  25. </bean>

路由分发

注意:路由分发是可选配置项,不配置将默认使用 RoundRobinRule

路由分发是指可以通过代码方式计算路由条件,将任务发送到计算出来的客户端机器执行。

默认实现了两种路由策略:RandomRuleRoundRobinRule(默认的路由策略)。用户可以通过 RoundRobinRule 扩展实现自己的路由策略,只需要实现 com.alibaba.dts.client.route.RouteRule 接口,覆盖其 RemoteMachine rule(JobContext jobContext, List machines) 方法。

  1. public class RandomRule implements RouteRule {
  2. /** 随机生成数字 */
  3. private Random random = new Random();
  4. @Override
  5. public RemoteMachine rule(JobContext jobContext, List<RemoteMachine> machines) {
  6. if(null == machines || machines.isEmpty()) {
  7. return null;
  8. }
  9. //生成随机下标
  10. int index = random.nextInt() % machines.size();
  11. return machines.get(Math.abs(index));
  12. }
  13. }

Spring bean 配置

  1. <bean id="schedulerxClient" class="com.alibaba.edas.schedulerx.SchedulerXClient">
  2. <property name="groupId">
  3. <value>111-1-1-11</value>
  4. </property>
  5. <property name="regionName">
  6. <value>cn-hangzhou</value>
  7. </property>
  8. <!--如果使用的是上海 Region 集群,需要设置 domainName 属性,同时指定 RegionName 为 cn-shanghai
  9. <property name="regionName">
  10. <value>cn-shanghai</value>
  11. </property>
  12. <property name="domainName">
  13. <value>schedulerx-shanghai.console.aliyun.com</value>
  14. </property>-->
  15. <property name="routeMap">
  16. <map>
  17. <entry key="com.taobao.spring.TestGridJobProcessor" value-ref="randomRule"></entry>
  18. </map>
  19. </property>
  20. </bean>
  21. <bean id="randomRule" class="com.taobao.xxx.RandomRule"></bean>

最佳实践

  • 对于大规模任务,建议开启限流,以保证系统能够平稳运行。限流参数为 flowControlStrategy,默认是值是 FlowControlStrategy.NONE。

  • 任务分发路由不要使用复杂耗时的计算操作。

  • 只分发不执行模式下,dispatchOnly 参数设置为 true 即可,默认为 false。当分发的子任务数特别多的时候,比如100W以上,建议开启,保证分发效率。但是此设置在只有一台客户端机器的时候不起作用。

  • 大规模子任务或子任务体较大的场景下,建议先评估下一次分发的所有子任务所占内存大小,再设置合理的分发子任务数,以免内存溢出。所占内存在最大堆内存的20%以内比较合理。如果你的子任务占用内存很小,可以忽略。

高级参数配置-Client 消费线程数配置

单机执行子任务线程数量,默认配置是5。可以通过下面的配置修改线程数量,key 是 JobProcessor 实现类全类名,value 是线程数量。

  1. <bean id="dtsClientA" class="com.alibaba.edas.schedulerx.SchedulerXClient">
  2. <property name="groupId">
  3. <value>111-1-1-11</value>
  4. </property>
  5. <property name="regionName">
  6. <value>cn-hangzhou</value>
  7. </property>
  8. <!--如果使用的是上海 Region 集群,需要设置 domainName 属性,同时指定 RegionName cn-shanghai
  9. <property name="regionName">
  10. <value>cn-shanghai</value>
  11. </property>
  12. <property name="domainName">
  13. <value>schedulerx-shanghai.console.aliyun.com</value>
  14. </property>-->
  15. <property name="consumerThreadsMap">
  16. <map>
  17. <entry key="com.taobao.spring.TestParallelJobProcessor"><!--key JobProcessor 实现类的全类名-->
  18. <value>64</value> <!--value consumerThreads 的值,也就是单机线程数量-->
  19. </entry>
  20. </map>
  21. </property>
  22. </bean>