MapReduce model

The MapReduce model is a lightweight, distributed batch processing model developed by SchedulerX. You implement it by extending the MapJobProcessor or MapReduceJobProcessor class, which organizes the connected workers into a distributed computing engine for big data batch processing. Unlike traditional big data batch processing frameworks such as Hadoop and Spark, the MapReduce model does not require you to import data into a big data platform, so it incurs no extra storage or computing costs. It is a low-cost, fast, and easy-to-program model that can process massive amounts of data in seconds.

Notes

  • The size of a single subtask cannot exceed 64 KB.

  • The `result` value carried by a `ProcessResult` object cannot exceed 1000 bytes.

  • If you use the `reduce` method, the results of all subtasks are cached on the master node. This can put significant pressure on the master node's memory. Keep the number of subtasks and the size of the return value small. If you do not require the `reduce` method, use the `MapJobProcessor` interface.

  • SchedulerX does not guarantee that a subtask executes only once. Under certain conditions, a failover may occur, which can cause a subtask to run multiple times. You must implement idempotence in your business logic.
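The idempotence requirement in the last note can be sketched as a guard that remembers which business keys have already been handled. This is a minimal sketch, not part of the SchedulerX API: the class `IdempotentHandler`, its methods, and the business key are hypothetical, and the in-memory set only stands in for a durable store shared by all workers (for example, a unique key in a database).

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical guard that makes a subtask handler safe to run more than once. */
class IdempotentHandler {
    // Keys whose side effect has already been applied.
    // Assumption: a real deployment would keep this in a durable, shared store,
    // because a failover can rerun the subtask on a different machine.
    private final Set<String> processedKeys = new HashSet<>();
    private int sideEffects = 0;

    /** Returns true if the side effect ran, false if the key was seen before. */
    synchronized boolean handle(String businessKey) {
        if (!processedKeys.add(businessKey)) {
            return false; // duplicate delivery: skip the side effect
        }
        sideEffects++; // stand-in for the real business operation
        return true;
    }

    /** Number of times the business operation actually ran. */
    synchronized int sideEffectCount() {
        return sideEffects;
    }
}
```

Calling `handle("order-1")` twice applies the side effect only once, which is the behavior a subtask needs when it is redelivered.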

Interfaces

  • Extend the MapJobProcessor class.

    | Interface | Description | Required |
    | --- | --- | --- |
    | `public ProcessResult process(JobContext context) throws Exception;` | The entry point for the business logic of each subtask. Get the taskName from the context to identify the subtask. After the logic completes, return a ProcessResult. | Yes |
    | `public ProcessResult map(List<? extends Object> taskList, String taskName);` | Distributes a batch of subtasks across multiple machines. You can call map multiple times. If taskList is empty, the call fails. Returns a ProcessResult. | Yes |
    | `public void kill(JobContext context);` | Called when the task is killed from the frontend. You must implement the logic that interrupts the business process. | No |

  • Extend the MapReduceJobProcessor class.

    | Interface | Description | Required |
    | --- | --- | --- |
    | `public ProcessResult process(JobContext context) throws Exception;` | The entry point for the business logic of each subtask. Get the taskName from the context to identify the subtask. After the logic completes, return a ProcessResult. | Yes |
    | `public ProcessResult map(List<? extends Object> taskList, String taskName);` | Distributes a batch of subtasks across multiple machines. You can call map multiple times. If taskList is empty, the call fails. Returns a ProcessResult. | Yes |
    | `public ProcessResult reduce(JobContext context);` | Called back on the master node after all subtasks on the worker nodes are complete. Typically used for data aggregation, notifying downstream systems, or passing data between upstream and downstream tasks in a workflow. The reduce method can process the results of all subtasks: (1) a subtask returns a result, such as an order number, using `return new ProcessResult(true, result)`; (2) the reduce method gets the status of all subtasks using `context.getTaskStatuses()` and their results using `context.getTaskResults()`, and then performs the necessary processing. | Yes |
    | `public void kill(JobContext context);` | Called when the task is killed from the frontend. You must implement the logic that interrupts the business process. | No |
    | `public boolean runReduceIfFail(JobContext context)` | Specifies whether to execute the reduce method if a subtask fails. By default, the reduce method is executed even if a subtask fails. | No |
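Because the results that subtasks return for reduce are limited to 1000 bytes (see Notes), it can help to check the size of a result string before wrapping it in a ProcessResult. The following is a minimal sketch with hypothetical names (`ResultSizeGuard`, `fits`); it assumes the limit is measured on the UTF-8 encoding of the string, which the documentation does not state.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that checks a subtask result against the 1000-byte limit. */
class ResultSizeGuard {
    static final int MAX_RESULT_BYTES = 1000; // limit stated in Notes

    /** Length of the string in bytes, assuming UTF-8 encoding. */
    static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    /** True if the result is small enough to return from a subtask. */
    static boolean fits(String result) {
        return utf8Length(result) <= MAX_RESULT_BYTES;
    }
}
```

A subtask could call `ResultSizeGuard.fits(result)` and return a shorter identifier, such as an order number, when the check fails. Note that non-ASCII characters occupy more than one byte each in UTF-8, so counting characters is not sufficient.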

Procedure

  1. Log on to the Distributed Task Scheduling Platform console. In the navigation pane on the left, click Task Management.

  2. On the Task Management page, click Create Task.

  3. In the Create Task panel, select MapReduce from the Execution Mode drop-down list. Then, configure the parameters in the Advanced Configuration section.

    | Configuration item | Description |
    | --- | --- |
    | Dispatch Policy | Requires client version 1.10.3 or later. Polling Policy (default): distributes the same number of subtasks to each worker. Suitable when each subtask takes roughly the same amount of time to process. Optimal Worker Load Policy: the master node automatically detects the load on each worker node. Suitable when processing time differs significantly between subtasks or worker machines. |
    | Subtask Concurrency per Machine | The number of execution threads on a single machine. The default value is 5. To speed up execution, increase this value. If downstream systems or the database cannot handle the load, decrease this value. |
    | Subtask Failure Retry Attempts | The number of times to automatically retry a failed subtask. The default value is 0. |
    | Subtask Failure Retry Interval | The interval between retry attempts for a failed subtask, in seconds. The default value is 0. |
    | Subtask Failover Policy | Requires client version 1.8.12 or later. Specifies whether to redistribute subtasks to other machines if an execution node goes down or offline. If you enable this feature, subtasks may be executed multiple times during a failover. You must ensure that your logic is idempotent. |
    | Master Node Participation | Requires client version 1.8.12 or later. Specifies whether the master node participates in subtask execution. The number of online, runnable workers must be at least two. If the number of subtasks is very large, disable this parameter. |
    | Subtask Dispatch Method | Push model: evenly distributes subtasks to each machine. Pull model: each machine actively pulls subtasks. This avoids bottlenecks and supports dynamic scale-out, because machines that are added later can also pull subtasks. During the pull process, all subtasks are cached on the master node, which consumes memory. The number of subtasks should not exceed 10,000. |

    For more information about other configuration items, see Advanced configuration parameters for Task Management.
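To make the difference between the two dispatch policies concrete, the following sketch contrasts a round-robin assignment with a load-aware choice of worker. It is a conceptual illustration only and does not reproduce how SchedulerX implements these policies; the class `DispatchSketch`, its methods, and the load values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Conceptual illustration of two dispatch strategies (not SchedulerX internals). */
class DispatchSketch {
    /** Round-robin: subtask i goes to worker i % workerCount. */
    static List<List<Integer>> roundRobin(int subtaskCount, int workerCount) {
        List<List<Integer>> plan = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            plan.add(new ArrayList<>());
        }
        for (int i = 0; i < subtaskCount; i++) {
            plan.get(i % workerCount).add(i);
        }
        return plan;
    }

    /** Load-aware: returns the index of the worker with the smallest current load. */
    static int leastLoaded(int[] load) {
        int best = 0;
        for (int w = 1; w < load.length; w++) {
            if (load[w] < load[best]) {
                best = w;
            }
        }
        return best;
    }
}
```

With 10 subtasks and 3 workers, `roundRobin` gives the workers 4, 3, and 3 subtasks regardless of how long each one takes, whereas `leastLoaded` always picks the worker that currently has the least work, which suits subtasks of uneven duration.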

Principle and best practices

For more information, see SchedulerX 2.0: Distributed Computing Principles and Best Practices.

Demo

Process data from a single table (with consecutive IDs)

  1. The root task reads the minimum and maximum IDs.

    select min(id), max(id) from Tab1;

  2. Paginate the data based on the ID range. Each subtask contains two fields: `startId` and `endId`.

  3. Each subtask retrieves data using its ID range.

    select * from Tab1 where id >= startId and id < endId;

The following code provides an example:

class PageTask {
    private long startId;
    private long endId;

    public PageTask(long startId, long endId) {
        this.startId = startId;
        this.endId = endId;
    }

    public long getStartId() {
        return startId;
    }

    public long getEndId() {
        return endId;
    }
}

@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private static final int PAGE_SIZE = 500; // Number of IDs covered by one subtask.

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); // The backend code can be the same for multiple jobs. Use job parameters in the console to specify the table name.
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            // Split [minId, maxId] into pages that each cover PAGE_SIZE IDs.
            // endId is exclusive (the query uses id < endId), so the last page
            // ends at maxId + 1 to include the row with the maximum ID.
            for (long start = minId; start <= maxId; start += PAGE_SIZE) {
                tasks.add(new PageTask(start, Math.min(start + PAGE_SIZE, maxId + 1)));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}
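The page-splitting step (step 2) can be exercised in isolation, without a database or a SchedulerX worker. The following is a minimal sketch under the same convention as the SQL in step 3, where `startId` is inclusive and `endId` is exclusive; the class `PageSplitter` and its method are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that splits an ID range into fixed-size pages. */
class PageSplitter {
    /**
     * Splits the inclusive range [minId, maxId] into half-open pages
     * [start, end), each covering at most pageSize IDs.
     */
    static List<long[]> split(long minId, long maxId, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<long[]> pages = new ArrayList<>();
        for (long start = minId; start <= maxId; start += pageSize) {
            // The exclusive upper bound is capped at maxId + 1 so that the
            // row with the maximum ID is still selected by "id < endId".
            long end = Math.min(start + pageSize, maxId + 1);
            pages.add(new long[] {start, end});
        }
        return pages;
    }

    public static void main(String[] args) {
        // Prints: [1, 501) [501, 1001) [1001, 1201)
        for (long[] page : split(1, 1200, 500)) {
            System.out.print("[" + page[0] + ", " + page[1] + ") ");
        }
        System.out.println();
    }
}
```

Every ID from `minId` to `maxId` falls into exactly one page, and the number of subtasks is the range size divided by the page size, rounded up.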

Process data from a single table (with non-consecutive IDs)

  1. Implement a bucketing policy for the database. Add a `bucket` field and create an index for it.

  2. For example, if you use 1024 buckets, you can hash the order number or ID using an expression such as `order_number % 1024` when a new record is added. Then, store the result in the `bucket` field.

  3. The data is distributed almost evenly across the buckets. You can retrieve all records for a specific bucket using the following SQL statement:

    select * from Tab1 where bucket=xxx;

The following code provides an example:

@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); // The backend code can be the same for multiple jobs. Use job parameters in the console to specify the table name.
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            List<Integer> tasks = Lists.newArrayList();
            for (int i = 0; i < 1024; i++) {
                tasks.add(i);
            }
            return map(tasks, "BucketTask");
        } else if (taskName.equals("BucketTask")) {
            int bucketId = (int)task;
            List<Record> records = queryRecord(tableName, bucketId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<Record> queryRecord(String tableName, int bucketId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from #{tableName} where bucket= #{bucketId}
        return records;
    }

}
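The write-side half of the bucketing policy, computing the `bucket` value when a record is inserted, could look like the following sketch. The class and method names are hypothetical. It uses `Math.floorMod` instead of the `%` operator because `%` in Java returns a negative remainder for negative operands, which would fall outside the range 0 to 1023.

```java
/** Hypothetical helper that maps an order number to one of 1024 buckets. */
class BucketAssigner {
    static final int BUCKET_COUNT = 1024;

    /** Returns a bucket index in the range [0, BUCKET_COUNT). */
    static int bucketOf(long orderNumber) {
        // floorMod always yields a non-negative result for a positive divisor.
        return (int) Math.floorMod(orderNumber, (long) BUCKET_COUNT);
    }
}
```

For example, `bucketOf(1025)` is 1 and `bucketOf(-1)` is 1023. The value would be stored in the indexed `bucket` column so that each subtask can select its bucket with a single indexed query.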

Process sharded data

class PageTask {
    private String tableName;
    private long startId;
    private long endId;

    public PageTask(String tableName, long startId, long endId) {
        this.tableName = tableName;
        this.startId = startId;
        this.endId = endId;
    }

    public String getTableName() {
        return tableName;
    }

    public long getStartId() {
        return startId;
    }

    public long getEndId() {
        return endId;
    }
}

@Component
public class ScanShardingTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private static final int PAGE_SIZE = 500; // Number of IDs covered by one subtask.

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            // First, dispatch one subtask per sharded database.
            List<String> dbList = getDbList();
            return map(dbList, "DbTask");
        } else if (taskName.equals("DbTask")) {
            // Then, dispatch one subtask per sharded table in that database.
            String dbName = (String)task;
            List<String> tableList = getTableList(dbName);
            return map(tableList, "TableTask");
        } else if (taskName.equals("TableTask")) {
            // If a sharded table is also large, paginate it.
            String tableName = (String)task;
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            // Split [minId, maxId] into pages that each cover PAGE_SIZE IDs.
            // endId is exclusive (the query uses id < endId), so the last page
            // ends at maxId + 1 to include the row with the maximum ID.
            for (long start = minId; start <= maxId; start += PAGE_SIZE) {
                tasks.add(new PageTask(tableName, start, Math.min(start + PAGE_SIZE, maxId + 1)));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            String tableName = pageTask.getTableName();
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<String> getDbList() {
        List<String> dbList = Lists.newArrayList();
        //TODO: Return the list of sharded databases.
        return dbList;
    }

    private List<String> getTableList(String dbName) {
        List<String> tableList = Lists.newArrayList();
        //TODO: Return the list of sharded tables.
        return tableList;
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}
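The multi-level fan-out in this example, where a subtask calls map again to create the next level, can be simulated locally with a queue. This is a simplified sketch and not SchedulerX code: the class `FanOutSimulation`, the generated table names, and the single-threaded queue are assumptions made for illustration, and the paging level is omitted for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Local, single-threaded simulation of a root -> DbTask -> TableTask fan-out. */
class FanOutSimulation {
    /** A subtask: the name selects the branch, the payload carries the data. */
    static final class Task {
        final String name;
        final String payload;

        Task(String name, String payload) {
            this.name = name;
            this.payload = payload;
        }
    }

    /** Runs the fan-out and returns the table names in processing order. */
    static List<String> run(List<String> dbs, int tablesPerDb) {
        Deque<Task> queue = new ArrayDeque<>();
        List<String> processedTables = new ArrayList<>();
        queue.add(new Task("Root", ""));
        while (!queue.isEmpty()) {
            Task task = queue.poll();
            switch (task.name) {
                case "Root": // corresponds to map(dbList, "DbTask")
                    for (String db : dbs) {
                        queue.add(new Task("DbTask", db));
                    }
                    break;
                case "DbTask": // corresponds to map(tableList, "TableTask")
                    for (int i = 0; i < tablesPerDb; i++) {
                        queue.add(new Task("TableTask", task.payload + ".t" + i));
                    }
                    break;
                case "TableTask": // leaf level: the real work would happen here
                    processedTables.add(task.payload);
                    break;
                default:
                    throw new IllegalStateException("unknown task: " + task.name);
            }
        }
        return processedTables;
    }
}
```

Calling `run` with the databases `db0` and `db1` and two tables each yields `db0.t0`, `db0.t1`, `db1.t0`, `db1.t1`. In a real job the subtasks of each level run in parallel on different workers, so the processing order is not guaranteed.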

Process 50 messages and return a result from reduce

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 50;
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i < dispatchNum; i++) { // msg_0 to msg_49: exactly 50 messages
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            System.out.println(task);
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }

}

Process 50 messages and have reduce aggregate the subtask results

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 50; // Default; can be overridden through the job parameters.
        String jobParameters = context.getJobParameters();
        if (jobParameters != null && !jobParameters.trim().isEmpty()) {
            dispatchNum = Integer.parseInt(jobParameters.trim());
        }
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i < dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            Thread.sleep(2000);
            return new ProcessResult(true, task);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
            System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
        }
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }
}
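Because the value returned from reduce is itself a result string, it is usually better to return a compact summary than to concatenate every subtask result. The following sketch shows one way to summarize a map shaped like the one that `context.getTaskResults()` returns in the example above (task ID to result string). The class `ReduceSketch` and its method are hypothetical, and the summary format is an arbitrary choice.

```java
import java.util.Map;

/** Hypothetical aggregation over a taskId -> result map. */
class ReduceSketch {
    /** Counts how many subtasks returned a non-empty result. */
    static String summarize(Map<Long, String> taskResults) {
        int total = 0;
        int withResult = 0;
        for (Map.Entry<Long, String> entry : taskResults.entrySet()) {
            total++;
            String value = entry.getValue();
            if (value != null && !value.isEmpty()) {
                withResult++;
            }
        }
        return withResult + "/" + total + " subtasks returned a result";
    }
}
```

Inside a reduce implementation, the summary string could be passed as the second argument of `new ProcessResult(true, ...)`, which keeps the value well under the size limit no matter how many subtasks ran.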