集群任务支持用户按业务的要求,通过多层的拆分将一个任务拆分到多个客户端上并发执行。
集群任务的开发可以分成两个阶段:拆分阶段和执行阶段。
拆分阶段:对数据进行分片,不限制拆分层数,将拆分结果上报给服务端,由服务端根据拆分的 chunk 通知客户端来拉取数据进行处理。(chunk:一批待处理数据的索引集合)
执行阶段:客户端接收到通知后拉取数据进行处理,处理完后继续拉取新的数据,直到数据都处理完成。
应用场景案例
为了便于理解,本文使用一个场景示例来介绍集群任务的开发过程。
假设某基金公司每天需要进行一次用户清算,由于用户规模较大,因此将用户分为 100 张表,每张表有 10 万左右的用户数据。该公司选择使用两层拆分的集群任务,通过集群任务并行处理能力提高数据处理效率。
具体的实现步骤如下:
任务拆分阶段
任务调度提供了 IClusterJobSplitHandler
接口进行任务拆分,目前支持两种拆分方式:
ShardingChunkData 拆分:指定索引的拆分方式。具体使用代码,可参考 工程示例 中的
ClusterFstSplitHandler
。RangeChunkData 拆分:指定范围的拆分方式。具体使用代码,可参考 工程示例 中的
ClusterSecSplitHandler
。
ShardingChunkData 拆分
ShardingChunkData 拆分是指定索引的拆分方式。通过索引对这个子任务(chunk)进行唯一标识,客户端会根据指定的索引拉取数据进行处理。要求填充分片规则。
public class ShardingChunkData implements IChunkData {
/**
* 分片号
*/
private String shardingRule;
......
}
其中,shardingRule
是一个分片规则标识,例如可以使用 user_00、user_01 … user_99 作为分片标识。
代码示例
结合上述场景案例,第一层任务拆分可以使用 shardingChunkData 对 100 张用户表进行拆分,具体代码如下:
public class UserSplitterByTable implements IClusterJobSplitHandler<ShardingChunkData> {
@Override
public SplitChunkDataResult<ShardingChunkData> handle(ClusterJobSplitContext context) {
SplitChunkDataResult<ShardingChunkData> splitChunkDataResult = new SplitChunkDataResult<>();
ArrayList<ShardingChunkData> rangeChunkDatas = new ArrayList<>();
// user_00 ~ user_99
for (int i = 0; i < 100; i++) {
String shardingRule = "user_";
if (i < 10) {
shardingRule = "0";
}
shardingRule = shardingRule + i;
// 根据表维度拆分分片
ShardingChunkData shardingChunkData = new ShardingChunkData(shardingRule);
rangeChunkDatas.add(shardingChunkData);
}
splitChunkDataResult.setChunkDatum(rangeChunkDatas);
splitChunkDataResult.setSuccess(true);
return splitChunkDataResult;
}
@Override
public String getName() {
// 处理器名称
return "USER_SPLITTER_BY_TABLE";
}
@Override
public ThreadPoolExecutor getThreadPool() {
// 线程池,建议使用自定义线程池。如果返回null则使用 sdk 自带的线程池。
return null;
}
}
RangeChunkData 拆分
RangeChunkData 拆分是指定范围的拆分方式。指定每个子任务处理特定范围内的数据,类似于一个分页效果。要求填充起始索引、结束索引和分片规则。
public class RangeChunkData implements IChunkData {
/**
* 分片规则
*/
private String shardingRule;
/**
* 起始索引
*/
private String start;
/**
* 结束索引
*/
private String end;
......
}
shardingRule
:分片规则,例如指定表 user_01。start
:开始索引,记录开始位置,例如 1000。end
:结束索引,记录结束位置(包含),例如 2000。
例如,以下配置表示处理 user_01 表中 1000 到 2000 的数据。
RangeChunkData chunk =new RangeChunkData ();
chunk.setShardingRule("user_01");
chunk.setStart("1000");
chunk.setEnd("2000");
代码示例
在场景案例中,第二层可以使用 RangeChunkData 将用户数据按分页维度进行拆分,具体代码如下:
public class UserSplitterByPage implements IClusterJobSplitHandler<RangeChunkData> {
@Override
public SplitChunkDataResult<RangeChunkData> handle(ClusterJobSplitContext context) {
// 第一层拆分的分片
ShardingChunkData shardingChunkData = (ShardingChunkData) context.getChunkData();
String shardingRule = shardingChunkData.getShardingRule();
// 1. 根据分片查询数量
int count = queryCountByTable(shardingChunkData.getShardingRule());
SplitChunkDataResult<RangeChunkData> splitChunkDataResult = new SplitChunkDataResult<>();
ArrayList<RangeChunkData> rangeChunkDatas = new ArrayList<>();
// 2. 做分页处理,每页处理 1000 条
int pageCount = 1000;
for (int page = 0; page < count / pageCount; page++) {
String startRows = String.valueOf(page * pageCount);
// 包含
String endRows = String.valueOf((page + 1) * pageCount - 1);
RangeChunkData rangeChunkData = new RangeChunkData(shardingRule, startRows, endRows);
rangeChunkDatas.add(rangeChunkData);
}
splitChunkDataResult.setChunkDatum(rangeChunkDatas);
splitChunkDataResult.setSuccess(true);
return splitChunkDataResult;
}
// mock
private int queryCountByTable(String shardingRule) {
if ("user_00".equals(shardingRule)) {
// user_00 表 10 万用户
return 100000;
} else {
// 其他表 9 万用户
return 90000;
}
}
@Override
public String getName() {
return "USER_SPLITTER_BY_PAGE";
}
@Override
public ThreadPoolExecutor getThreadPool() {
return null;
}
}
任务执行阶段
集群任务执行阶段分为三个子阶段:
Read 阶段:从数据源读取数据。
Process 阶段:将读取的数据对象转换为要写入数据源的对象,该阶段不是必须阶段,可以不设置。
Write 阶段:将数据写入数据源。
说明集群任务拆分阶段的相关代码,可参考 任务调度工程示例 中的
ClusterExecuteHandler
。
Read 阶段
读取数据服务需要实现接口 IReader
:
public interface IReader<T>{
/**
* 读取数据
*
* @param context
* @return
*/
LoadData<T> read(ClusterJobExecuteContext context) throws Exception;
}
LoadData 对象数据结构如下:
public class LoadData<T> extends MultiDataItem<T>{
/**
* 是否还有未捞取的待处理数据,值为false时,处理完本次捞取出的数据就回调服务端;当值为true时,认为还有数据待处理,处理完本次捞取数据后继续捞取数据。
*/
private boolean hasMore;
public LoadData(List<T> itemList,boolean hasMore){
super(itemList);
this.hasMore=hasMore;
}
public static boolean isEmpty(LoadData loadData){
return loadData==null || loadData.isEmpty();
}
public boolean isHasMore(){
return hasMore;
}
public void setHasMore(boolean hasMore){
this.hasMore=hasMore;
}
}
使用 read 方法分批读取数据时,需要注意设置 hasMore
的值,当值为 true 时,处理完本次读取的数据后会再次进行读取数据。
如果需要打开关闭文件,可以使用 IStreamReader
接口:
public interface IStream {
/**
* 打开流
*
* @param context
* @throws Exception
*/
void open(ClusterJobExecuteContext context) throws Exception;
/**
* 关闭流
*
* @param context
* @throws Exception
*/
void close(ClusterJobExecuteContext context) throws Exception;
}
public interface IStreamReader<T> extends IStream, IReader<T> {
}
该接口继承自 IStream
接口,提供了 open 和 close 方法,这两个方法分别会在执行分片前和执行分片后调用。
Process 阶段
处理数据服务需要实现接口 IProcessor
:
public interface IProcessor<I, O> {
/**
* 数据处理,将读取出的对象加工转换为要处理的对象
*
* @param r
* @return
*/
DataProcessResult<O> process(ClusterJobExecuteContext context, I i) throws Exception;
/**
* 数据处理线程池,没有配置时使用handler的线程池
*
* @return
*/
ThreadPoolExecutor getProcessThreadPool();
}
当不需要对读取的数据进行转换时,可以不设置该服务,这样会直接把读取的数据交给写服务。该服务可以单独指定线程池,当没有设置时默认使用任务 handler 的线程池。
Write 阶段
写数据服务需要实现接口 IWriter
:
public interface IWriter<T> {
/**
* 每次执行的数据块包含的数据量最大值
*
* @return
*/
int getCountPerWrite();
/**
* 写数据
*
* @param context
* @param dataItem
* @return
* @throws Exception
*/
ClientCommonResult write(ClusterJobExecuteContext context,
IDataItem<T> dataItem) throws Exception;
/**
* 写数据线程池,没有配置时使用handler的线程池
*
* @return
*/
ThreadPoolExecutor getWriteThreadPool();
}
getCountPerWrite
方法用来设置每次批量 write 的数据量,值小于等于 0 时认为一次只写入一条数据。该服务可以单独设置线程池,当没有设置时默认使用任务 handler 的线程池。
IDataItem
就是待处理的数据对象,有两种类型:
SingleDataItem
:只包含一条数据,当getCountPerWrite
方法的返回值小于或等于 1 时是该类型。MultiDataItem
:包含多条数据,当getCountPerWrite
方法的返回值大于 1 时是该类型。
可根据实际情况对 dataItem
进行类型转换,例如:
switch(dataItem.getType()){
case SINGLE:
((SingleDataItem<Integer>)dataItem).getItem();
break;
case MULTIPLE:
(MultiDataItem<Integer>)dataItem).getItemList();
break;
default:
break;
}
如果需要打开关闭文件,可以使用接口 IStreamWriter
:
public interface IStreamWriter<T> extends IStream,IWriter<T>{
}
该接口继承了 IStream
接口,提供了 open 和 close 方法,这两个方法分别会在执行分片前和执行分片后调用。
代码示例
结合上述应用场景案例,在执行阶段,需要根据拆分的分片从数据库或其他数据源中拉取数据进行处理,然后落库。
Read 阶段:根据分片信息(shardingRule、start、end)到指定表里拉取指定范围的用户数据。
Process 阶段:对拉取到的用户数据进行处理。
Write 阶段:将处理后的数据落库。
具体代码如下:
public class UserProcessHandler implements IClusterJobExecuteHandler {
private final Logger LOGGER = LoggerFactory.getLogger(ClusterJobExecutor.class);
private ThreadPoolExecutor threadPool;
@Override
public void preExecute(ClusterJobExecuteContext context) {
// 前置处理
LOGGER.info(String.format("preExecute chunkId:%s, param:%s",
context.getChunkId(), context.getCustomParams()));
}
@Override
public IReader getReader() {
return new IReader() {
@Override
public LoadData<String> read(ClusterJobExecuteContext context) throws Exception {
IChunkData chunkData = context.getChunk();
RangeChunkData rangeChunkData = (RangeChunkData) chunkData;
// 根据索引去查询数据
String shardingRule = rangeChunkData.getShardingRule();
int start = Integer.parseInt(rangeChunkData.getStart());
int end = Integer.parseInt(rangeChunkData.getEnd());
// 模拟查询数据
List<String> stringList = queryUserInfo(shardingRule, start, end);
// 一次查询,后面没有数据
boolean hasMore = false;
return new LoadData<String>(stringList, hasMore);
}
private List<String> queryUserInfo(String shardingRule, int start, int end) {
List<String> stringList = Lists.newArrayList();
for (int userId = start; userId <= end; userId++) {
stringList.add(shardingRule + "" + userId);
}
return stringList;
}
};
}
@Override
public IProcessor getProcessor() {
return new IProcessor() {
@Override
public DataProcessResult process(ClusterJobExecuteContext context,
Object data) throws Exception {
// 处理数据
System.out.println("process user" + data);
return new DataProcessResult(true, "", data);
}
@Override
public ThreadPoolExecutor getProcessThreadPool() {
return null;
}
};
}
@Override
public IWriter getWriter() {
return new IWriter<String>() {
@Override
public int getCountPerWrite() {
return 1;
}
@Override
public ClientCommonResult write(ClusterJobExecuteContext context,
IDataItem<String> dataItem)throwsException {
// 数据存储
switch (dataItem.getType()) {
case SINGLE:
// 单个数据块,只包含一条数据
SingleDataItem<String> singleDataItem = (SingleDataItem<String>) dataItem;
LOGGER.info(String.format("getWriter write single data:%s", singleDataItem.getItem()));
System.out.println(String.format("write single data:%s", singleDataItem.getItem()));
break;
case MULTIPLE:
// 复合数据块,包含多条数据。比如任务停止时会将多条数据传入
MultiDataItem<String> multiDataItem = (MultiDataItem<String>) dataItem;
LOGGER.info(String.format("getWriter write multi data:%s", multiDataItem.getItemList()));
break;
default:
break;
}
return ClientCommonResult.buildSuccessResult();
}
@Override
public ThreadPoolExecutor getWriteThreadPool() {
return BlockingThreadPool.getThreadPool();
}
};
}
@Override
public ILimiter getLimiter() {
return new DefaultLimiter(10);
}
@Override
public void postExecute(ClusterJobExecuteContext context) {
// 后置处理
LOGGER.info(String.format("JobExecuteHandler postExecute chunkId:%s, param:%s",
context.getChunkId(), context.getCustomParams()));
}
@Override
public Progress calProgress(ClusterJobExecuteContext context) {
return new Progress();
}
@Override
public boolean isProcessAsync() {
return true;
}
@Override
public String getName() {
return "USER_PROCESS_HANDLER";
}
@Override
public ThreadPoolExecutor getThreadPool() {
return this.threadPool;
}
public void setThreadPool(ThreadPoolExecutor threadPool) {
this.threadPool = CommonThreadPool.getThreadPool("REMOTE_EXECUTE");
}
}
执行模式
集群任务的执行阶段支持两种模式:
本机执行
本机执行时需要实现接口 IClusterJobExecuteHandler,如下:
public interface IJobHandler {
/**
* handler的名字
*
* @return
*/
String getName();
/**
* 可以留空, 使用默认执行线程池
*
* @return
*/
ThreadPoolExecutor getThreadPool();
}
public interface IClusterJobExecuteHandler<I, O> extends IJobHandler {
/**
* 预处理
*
* @param context
*/
void preExecute(ClusterJobExecuteContext context);
/**
* 获取数据读取服务
*
* @return
*/
IReader<I> getReader();
/**
* 获取数据清洗服务,返回NULL时会跳过数据清洗步骤,直接处理读取出的数据
*
* @return
*/
IProcessor<I, O> getProcessor();
/**
* @return
*/
IWriter<O> getWriter();
/**
* 获取限流器,返回NULL时使用默认限流器
*
* @return
*/
ILimiter getLimiter();
/**
* 本次执行完的后置处理
*
* @param context
* @return
*/
void postExecute(ClusterJobExecuteContext context);
/**
* 计算处理进度
*
* @param context
* @return
*/
Progress calProgress(ClusterJobExecuteContext context);
/**
* 是否异步处理,开启异步执行后,reader-process-write阶段全部是异步处理
*
* @return
*/
boolean isProcessAsync();
}
实现 IClusterJobExecuteHandler
的类负责处理任务。实现时需要设置:
IReader
IWriter
IProcessor:非必须设置,当没有设置 IProcessor 时会把读取出的数据直接交给 IWriter 服务。
ILimiter:没有设置时使用默认的限流服务。
当 isProcessAsync 返回 true 时,reader-process-write 阶段全部是异步处理,即读取的数据放到队列给 process 或 write 消费后会立即开始下一次读取。当返回 false 时,读取的数据需要等待被 write 消费完后开始下一次读取。
IReader 服务如果配置成了 bean,需要确保服务是无状态的,否则多线程场景下会相互干扰。如果需要存储一些处理过程中的数据,那么在需要在 getReader 方法里 new 一个 IReader 服务实例。IWriter 服务也是如此。
远程执行
远程执行时需要实现接口 IRemoteClusterJobExecuteHandler,如下:
public interface IRemoteClusterJobExecuteHandler<T> extends IClusterJobExecuteHandler<T>{
/**
* 设置路由策略,没有设置时默认为轮询
*
* @return
*/
IClusterRouter getClusterRouter();
/**
* 分发完一次load出来的数据后的休眠时间,防止分发出去的数据还没有被处理完导致的重复捞取,单位是ms,返回值<=0时认为不休眠
*
* @return
*/
int getSleepAfterPerLoad();
}
该接口继承自 IClusterJobExecuteHandler,需要额外两个方法:getClusterRouter 和 getSleepAfterPerLoad。
当远程处理数据时,客户端会把捞取的数据分发给业务集群内的其它机器进行处理,getClusterRouter 方法用来设置分发时的路由规则,目前支持随机(RandomClusterRouter)和轮询(RoundRobinClusterRouter)两种方式,当方法返回 null 时默认为轮询。
getSleepAfterPerLoad 方法用来设置处理完读取的数据后的休眠时间,防止分发出去的数据还没有被处理完导致的重复捞取,单位时ms,仅当 isProcessAsync 返回 false 时生效。
集群内的远程 RPC 调用使用 oneway 模式,所以无法获取数据的处理结果,需要业务方自己记录处理结果。如需帮助,请 提交工单 咨询。
可以通过实现接口 IRemoteProcessorExecutor 来设置接收远程分发请求的线程池,将实现类发布成 bean 即可,接口如下:
public interface IRemoteProcessorExecutor{
/**
* 获取线程池
*
* @return
*/
Executor getExecutor();
}
另外,远程调用需要在客户端启动一个 RpcServer:
对于 SOFABoot 项目,在工程的
application.properties
文件中使用配置项:com.alipay.sofa.antscheduler.remote.execute.enable=true com.alipay.sofa.antscheduler.remote.execute.port=xxx
对于非 SOFABoot 项目,需要在初始化客户端的时候设置下面两个参数:
public class Config{ ...... /** * 是否支持远程执行 */ private boolean isEnableRemoteExecute =false; /** * 远程服务端口,没有设置时使用默认端口9989 */ private int remoteServerPort; ...... }
线程池默认配置
IProcessor 和 IWriter 的线程池没有设置时,默认使用 IJobHandler 里设置的线程池。当 IJobHandler 也没有设置时,使用的是客户端默认指定的,配置参数为:
minPoolSize:20
maxPoolSize:300
queueSize:100
keepAliveTime:1小时
当没有设置 IRemoteProcessExecutor 时,默认使用 bolt 协议提供的默认线程池,配置参数为:
minPoolSize:20
maxPoolSize:400
queueSize:600
keepAliveTime:60秒
您也可以对 IProcessor,IWriter,IJobHandler 以及 IRemoteProcessExecutor 自定义设置线程池。
在文档使用中是否遇到以下问题
更多建议
匿名提交