Broadcast execution model

更新时间:
复制 MD 格式

When multiple workers need to run the same job simultaneously -- clearing caches across a cluster, running scripts, or collecting status from every node -- the broadcast model dispatches the job to all workers in a group and runs them in parallel. A job is completed only after all workers complete the job.

How it works

A broadcast job follows a three-phase lifecycle:

1. preProcess (each worker, once)  -->  2. process (all workers, in parallel)  -->  3. postProcess (each worker, once)
  1. Pre-processing -- Each worker runs preProcess once for setup tasks such as initializing caches or database connections.

  2. Processing -- Every worker in the group runs process in parallel. Each worker receives a unique shard index through context.getShardingId().

  3. Post-processing -- After all workers complete process, each worker runs postProcess once to aggregate results. The output of postProcess can feed into downstream workflows.

preProcess and postProcess are optional and only available for Java jobs.

Failure behavior

If any worker fails during process, the entire job is marked as failed.

Use cases

Batch O&M operations

  • Run a script on all workers at a scheduled time.

  • Clear the cache on all workers at a scheduled time.

  • Dynamically enable a service on all workers, use a worker to collect execution results, and then update databases.

Data aggregation

  • Initialize Redis caches or databases during the preProcess process if the JavaProcessor is used.

  • Collect the execution result based on your business logic after a worker calls the process method.

  • Aggregate the execution results from all workers and update caches or databases after the workers call the postProcess method.

Supported job types

The broadcast model supports multiple types of jobs, such as script jobs and Java jobs.

Java jobs provide lifecycle hooks (preProcess and postProcess) in addition to the core process method. Java jobs require JavaProcessor 1.0.7 or later.

Java interface

MethodRequiredSignatureDescription
processYespublic ProcessResult process(JobContext context) throws ExceptionCore execution logic. Runs on every worker in parallel.
preProcessNopublic void preProcess(JobContext context)Setup logic. Runs once on each worker before process.
postProcessNopublic ProcessResult postProcess(JobContext context)Aggregation logic. Runs once on each worker after all workers complete process. Output can feed into downstream workflows.

Create a Java broadcast job

This example generates a random value on each worker, then aggregates the results in postProcess.

@Component
public class TestBroadcastJob extends JavaProcessor {

    /**
     * Only one worker calls the method.
     */
    @Override
    public void preProcess(JobContext context) {
        System.out.println("TestBroadcastJob.preProcess");
    }

    /**
     * All workers call the method.
     */
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        int value = new Random().nextInt(10);
        System.out.println("Total number of shards=" + context.getShardingNum()
                + ", Shard index=" + context.getShardingId()
                + ", taskId=" + context.getTaskId()
                + ", value=" + value);
        return new ProcessResult(true, String.valueOf(value));
    }

    /**
     * Only one worker calls the method.
     */
    @Override
    public ProcessResult postProcess(JobContext context) {
        System.out.println("TestBroadcastJob.postProcess");
        Map<Long, String> allTaskResults = context.getTaskResults();
        Map<Long, TaskStatus> allTaskStatuses = context.getTaskStatuses();
        int num = 0;
        for (Entry<Long, String> entry : allTaskResults.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
            if (allTaskStatuses.get(entry.getKey()).equals(TaskStatus.SUCCESS)) {
                num += Integer.valueOf(entry.getValue());
            }
        }
        System.out.println("TestBroadcastJob.postProcess(), num=" + num);
        return new ProcessResult(true, String.valueOf(num));
    }

}

How this example works:

  • process returns a ProcessResult containing a status flag and a string value. postProcess retrieves these values through context.getTaskResults().

  • postProcess checks TaskStatus.SUCCESS before aggregating, so failed workers are excluded from the total.

  • The final ProcessResult from postProcess can be passed to a downstream workflow.