Visual MapReduce model

The visual MapReduce model enhances the standard MapReduce model by adding visualization and operational capabilities. Without changing your backend code, you can switch the execution mode to the visual MapReduce model in the SchedulerX console. This adds a subtask list page where you can view the details, results, and logs for each subtask. You can also rerun individual subtasks.

Notes

  • Older SDK versions have security vulnerabilities. Upgrade your SDK to v1.12.2 or later.

  • This feature is available only in the Professional Edition.

  • The number of subtasks cannot exceed 1,000.

  • A task cannot exceed 64 KB in size.

  • To display custom labels for subtasks, the subtask objects must implement the BizSubTask interface.

  • The result field of ProcessResult cannot exceed 1,000 bytes.

  • If you use the reduce method, the results of all subtasks are cached on the master node, which puts heavy memory pressure on it. We recommend that you keep both the number of subtasks and the size of the result field small. If you do not need the reduce method, use MapJobProcessor instead.

  • If a failover is triggered, SchedulerX may run a subtask more than once. Make sure that your subtask logic is idempotent.
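The 1,000-byte limit on the result field can be enforced before a result is returned. The following is a minimal sketch, not part of the SchedulerX SDK: the class name ResultLimiter and the method truncateUtf8 are hypothetical, and it assumes that the limit is measured in UTF-8 bytes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the SchedulerX SDK.
class ResultLimiter {

    // Assumption: the 1,000-byte limit applies to the UTF-8 encoding of the result.
    static final int MAX_RESULT_BYTES = 1000;

    /** Returns s cut to at most maxBytes UTF-8 bytes without splitting a character. */
    static String truncateUtf8(String s, int maxBytes) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        if (bytes.length <= maxBytes) {
            return s;
        }
        int end = maxBytes;
        // bytes[end] is the first byte that would be dropped. If it is a UTF-8
        // continuation byte (10xxxxxx), the cut falls inside a character, so
        // step back to the start of that character.
        while (end > 0 && (bytes[end] & 0xC0) == 0x80) {
            end--;
        }
        return new String(bytes, 0, end, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 2000; i++) {
            sb.append('x');
        }
        String safe = truncateUtf8(sb.toString(), MAX_RESULT_BYTES);
        System.out.println(safe.getBytes(StandardCharsets.UTF_8).length);
    }
}
```

A processor could pass the truncated string as its result instead of the raw value. Whether truncation or a shorter summary is the better choice depends on how the result is consumed downstream.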

Interfaces

  • The visual MapReduce model inherits all interfaces from the standard MapReduce model, and the development process for task-handling code is identical. For more information, see MapReduce model.

  • (Optional) In addition to the standard MapReduce model interfaces, you can set a label for each subtask. To do this, the subtask object must implement the com.alibaba.schedulerx.worker.processor.BizSubTask interface.

    | Interface | Description | Required |
    | --- | --- | --- |
    | public Map<String, String> labelMap() | Implement this method to return subtask label information. These labels are used to display business-specific attributes for each subtask object, such as an account name, product code, or city and region. | No |
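As an illustration of labelMap, the following sketch returns several business attributes for one subtask. It is self-contained: the BizSubTask interface declared here is a local stand-in for com.alibaba.schedulerx.worker.processor.BizSubTask so that the sketch compiles without the SDK, and the OrderSubTask class, its fields, and the label keys are hypothetical. Whether the console shows every entry of the map is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Local stand-in for com.alibaba.schedulerx.worker.processor.BizSubTask,
// declared only so that this sketch compiles without the SDK.
interface BizSubTask {
    Map<String, String> labelMap();
}

// Hypothetical subtask object that carries three business attributes.
class OrderSubTask implements BizSubTask {

    private final String accountName;
    private final String productCode;
    private final String city;

    OrderSubTask(String accountName, String productCode, String city) {
        this.accountName = accountName;
        this.productCode = productCode;
        this.city = city;
    }

    @Override
    public Map<String, String> labelMap() {
        // LinkedHashMap keeps the labels in insertion order.
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("Account Name", accountName);
        labels.put("Product Code", productCode);
        labels.put("City", city);
        return labels;
    }
}
```

In a real task, drop the stand-in interface and import the SDK interface instead.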

Comparison with the MapReduce model

| Item | MapReduce | Visual MapReduce |
| --- | --- | --- |
| Number of subtasks | Supports millions | Up to 1,000 |
| Task development model | Identical | Identical |
| Subtask list | Not supported | Supported. |
| Subtask execution details | Not supported | Supported. Provides execution records, status, logs, distributed traces, and thread dumps for each subtask. |
| Subtask labels | Not supported | Supported. You can view business labels by implementing the BizSubTask interface in subtasks. |
| Subtask operations | Not supported | Supported. You can stop or rerun individual subtasks. |
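Because the visual model allows at most 1,000 subtasks, a workload with more items than that has to be grouped before it is mapped. The following is a minimal sketch of one way to do this, not an SDK feature: the SubTaskBatcher class and its toBatches method are hypothetical, and each resulting batch would still have to be wrapped in your own subtask object and stay within the subtask size limit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the SchedulerX SDK.
class SubTaskBatcher {

    // Subtask limit of the visual MapReduce model, as stated in this topic.
    static final int MAX_SUBTASKS = 1000;

    /** Splits items into at most maxBatches batches of nearly equal size. */
    static <T> List<List<T>> toBatches(List<T> items, int maxBatches) {
        List<List<T>> batches = new ArrayList<>();
        if (items.isEmpty()) {
            return batches;
        }
        // Ceiling division: the smallest batch size that keeps the batch count within maxBatches.
        int batchSize = (items.size() + maxBatches - 1) / maxBatches;
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> accountIds = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            accountIds.add(i);
        }
        List<List<Integer>> batches = toBatches(accountIds, MAX_SUBTASKS);
        System.out.println(batches.size() + " batches");
    }
}
```

Batching trades visibility for scale: the subtask list then shows one row per batch rather than one row per item, so a label that describes the batch range can help when you search for a specific item.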

Example: Task development

Batch processing of accounts

This example shows how to batch-process a list of bank accounts. Each account is handled as an independent subtask that runs in parallel across the cluster. Each subtask in the execution list displays its corresponding account information. This lets you easily track the processing status and execution details of each account. The following sections provide sample code.

  1. Define a custom subtask object for account information. To display a label for each subtask object, implement the com.alibaba.schedulerx.worker.processor.BizSubTask interface and its labelMap method.

    Code

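    import java.util.HashMap;
    import java.util.Map;

    import com.alibaba.schedulerx.worker.processor.BizSubTask;

    public class ParallelAccountInfo implements BizSubTask {

        /**
         * Primary key
         */
        private long id;

        private String name;

        private String accountId;

        public ParallelAccountInfo(long id, String name, String accountId) {
            this.id = id;
            this.name = name;
            this.accountId = accountId;
        }

        // Getters let JSON serializers such as fastjson include these fields when the object is logged.
        public long getId() { return id; }

        public String getName() { return name; }

        public String getAccountId() { return accountId; }

        /**
         * Implement the labelMap method to set the label for the subtask.
         * @return the labels to display for this subtask, keyed by label name
         */
        @Override
        public Map<String, String> labelMap() {
            Map<String, String> labelMap = new HashMap<>();
            labelMap.put("Account Name", name);
            return labelMap;
        }
    }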

    After the subtask object implements the interface, the subtask list can display a unique label for each object, such as the account name in this example. This lets you distinguish the processing status of each account. You can also search for subtasks by label.

  2. Create a processor for the account business logic. The processor extends com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor: the root task builds the list of subtasks, and each subtask handles a single account.

    Code

    import java.util.LinkedList;
    import java.util.List;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Assumed third-party helpers: fastjson for JSON and Apache Commons Lang for StringUtils.
    import com.alibaba.fastjson.JSON;
    import org.apache.commons.lang.StringUtils;

    import com.alibaba.schedulerx.worker.domain.JobContext;
    import com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor;
    import com.alibaba.schedulerx.worker.processor.ProcessResult;

    public class ParallelJob extends MapReduceJobProcessor {

        private static final Logger logger = LoggerFactory.getLogger("schedulerx");

        @Override
        public ProcessResult reduce(JobContext context) throws Exception {
            // No aggregation is needed in this example, so reduce only reports success.
            return new ProcessResult(true);
        }

        @Override
        public ProcessResult process(JobContext context) throws Exception {
            if (isRootTask(context)) {
                logger.info("Building a list of subtasks for parallel processing...");
                List<ParallelAccountInfo> list = new LinkedList<>();
                /*
                 * If it is the root task, build a list of subtask objects for parallel processing.
                 * In a real-world scenario, you can load subtask objects based on your business needs. These objects must implement the BizSubTask interface.
                 * Example use cases:
                 *  1. Load unprocessed customer accounts from a database.
                 *  2. Create a list of provinces, cities, and regions to distribute tasks by area.
                 *  3. Classify subtasks based on business labels, such as 'electronics', 'daily goods', or 'food'.
                 *  4. Classify subtasks by time, such as by month (January, February, etc.).
                 */
                for (int i = 0; i < 20; i++) {
                    // Generates sample names such as CUS0001 and account IDs such as AC000000000001.
                    list.add(new ParallelAccountInfo(i, "CUS" + StringUtils.leftPad(i + "", 4, "0"),
                            "AC" + StringUtils.leftPad(i + "", 12, "0")));
                }
                // Distribute the subtasks under the task name "transfer".
                return map(list, "transfer");
            } else {
                // For non-root tasks, retrieve the corresponding subtask object and perform business processing.
                ParallelAccountInfo obj = (ParallelAccountInfo) context.getTask();
                // Process the business logic for the retrieved subtask object.
                // do something
                logger.info("Processing subtask: {}", JSON.toJSONString(obj));
                return new ProcessResult(true);
            }
        }
    }

    After you develop and deploy the task, configure and run it as a scheduled task in the console. For more information, see Procedure.

Procedure

Configure the task

  1. Log on to the SchedulerX console. In the left-side navigation pane, click Task Management.

  2. On the Task Management page, click Create Task.

  3. In the Create Task panel, select Visual MapReduce from the Execution Mode drop-down list.

  4. In the advanced configuration section, set the following parameters. For information about other parameters, see Advanced parameters for task management.

    | Parameter | Description | Required client version |
    | --- | --- | --- |
    | Distribution policy | Polling Scheme (Default): Distributes an equal number of subtasks to each worker. This strategy is suitable for scenarios where subtasks have similar processing times. Worker load optimal strategy: The master node automatically detects the load on each worker and distributes subtasks accordingly. This strategy is suitable for scenarios where processing times differ significantly across subtasks or workers. | 1.10.14 or later |
    | Subtask concurrency per worker | The number of execution threads per worker. The default is 5. You can increase this value to speed up execution. If downstream systems or databases cannot handle the load, you can decrease this value. | - |
    | Subtask retry attempts | The number of times a subtask is automatically retried after a failure. The default is 0. | - |
    | Subtask retry interval | The interval between retry attempts for a failed subtask, in seconds. The default is 0. | - |
    | Subtask failover strategy | Specifies whether to redistribute subtasks to other machines if an execution node crashes or goes offline. If you enable this option, subtasks may be executed more than once during a failover. You must ensure that your business logic is idempotent. | 1.8.13 or later |
    | Master participation | Specifies whether the master node participates in subtask execution. This requires at least two online workers. Disable this option when the number of subtasks is very large. | 1.8.13 or later |
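Because retries and failover can run the same subtask more than once, the business logic needs an idempotency guard. The following is a minimal sketch of the idea, not SDK code: the IdempotentRunner class and the business key are hypothetical, and the in-memory set stands in for a durable shared store (for example, a database table with a unique key), which a real cluster would need because subtasks can be redistributed to other machines.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the SchedulerX SDK.
class IdempotentRunner {

    // In-memory stand-in for a durable store that all workers share.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /**
     * Runs the action only if businessKey has not been claimed before.
     * Returns true if the action ran, false if it was skipped as a duplicate.
     */
    boolean runOnce(String businessKey, Runnable action) {
        // add() returns false if the key is already present, so only the first caller proceeds.
        if (!processedKeys.add(businessKey)) {
            return false;
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            // Release the key so that a later retry can process the item again.
            processedKeys.remove(businessKey);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentRunner runner = new IdempotentRunner();
        AtomicInteger transfers = new AtomicInteger();
        // The same account is submitted twice, as could happen during a failover.
        runner.runOnce("AC000000000001", transfers::incrementAndGet);
        runner.runOnce("AC000000000001", transfers::incrementAndGet);
        System.out.println("transfers executed: " + transfers.get());
    }
}
```

The key should identify the business operation itself, such as the account ID plus the batch date, rather than the subtask instance, so that a rerun of the same work maps to the same key.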

Monitor and debug subtasks

After a task is executed, go to the Execution List page and click Details to view detailed information about its subtasks.

  • On the subtask list tab, you can view the processing status of each subtask.

  • On the subtask list tab, find the target subtask and click Log in the Operation column to view its business logs and analyze the execution results.

  • While a task is running, go to the Current execution details tab and click Thread Dump to inspect the running threads on the corresponding machine. This helps you analyze exceptions in the running task.

  • On the subtask list tab, if you have integrated Tracing Analysis, you can click a TraceId to query the distributed trace for that subtask. For more information, see Integrate Tracing Analysis.