Golang任务

更新时间:

Golang应用依赖Go版本SDK,即可接入SchedulerX,定时调度您的方法。由于Go使用协程运行任务,无法直接停止正在执行的协程;如需停止任务,需自行实现Kill方法(要求SDK版本为v1.0.2及以上),详见下文“如何停止任务”。

任务类型

单机任务

编写业务代码,实现Processor接口。

type Processor interface {
    Process(ctx *processor.JobContext) (*ProcessResult, error)
}

示例:

package main

import (
	"fmt"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"time"
)

// Compile-time check that *HelloWorld satisfies processor.Processor.
var _ processor.Processor = (*HelloWorld)(nil)

// HelloWorld is a minimal single-node job; it holds no state.
type HelloWorld struct{}

// Process prints a start message, sleeps for three seconds in place of real
// work, marks the result as succeeded, prints an end message and returns the
// result with a nil error.
func (hw *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	const workDuration = 3 * time.Second

	fmt.Println("[Process] Start process my task: Hello world!")
	time.Sleep(workDuration) // stand-in for the real business logic

	result := &processor.ProcessResult{}
	result.SetStatus(processor.InstanceStatusSucceed)
	fmt.Println("[Process] End process my task: Hello world!")
	return result, nil
}

广播任务

支持Java版本广播分片任务,支持接口如下:

  • PreProcess:所有worker节点执行Process之前,由master节点执行一次PreProcess。

  • Process:所有worker节点执行Process,可以返回结果。

  • PostProcess:所有worker节点执行Process结束后,由master节点执行一次PostProcess,可以获取所有节点Process的结果。

说明

需要Golang SDK版本为v0.0.2及以上。

示例:

package main

import (
	"fmt"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
	"math/rand"
	"strconv"
)

// TestBroadcast is an example broadcast job implementing PreProcess, Process
// and PostProcess.
type TestBroadcast struct{}

// Process is executed by every worker. It picks a random value in [0, 10),
// prints it together with the sharding information taken from ctx, and
// returns the value (as a decimal string) in a succeeded result.
func (t TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	value := rand.Intn(10)
	fmt.Printf("Total sharding num=%d, sharding=%d, value=%d\n", ctx.ShardingNum(), ctx.ShardingId(), value)
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(value))
	return ret, nil
}

// PreProcess is executed once, by a single machine, before the workers run
// Process. This example only prints a message.
func (t TestBroadcast) PreProcess(ctx *jobcontext.JobContext) error {
	fmt.Println("TestBroadcastJob PreProcess")
	return nil
}

// PostProcess is executed once, by a single machine, after the workers have
// finished Process. It prints every task result, sums the results of the
// tasks whose status is taskstatus.TaskStatusSucceed, and returns that sum
// (as a decimal string) in a succeeded result.
func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("TestBroadcastJob PostProcess")
	allTaskResults := ctx.TaskResults()
	allTaskStatuses := ctx.TaskStatuses()
	num := 0

	for key, val := range allTaskResults {
		fmt.Printf("%v:%v\n", key, val)
		if allTaskStatuses[key] != taskstatus.TaskStatusSucceed {
			continue
		}
		valInt, err := strconv.Atoi(val)
		if err != nil {
			// Do not swallow the parse error: report the bad result and
			// leave it out of the sum instead of silently adding zero.
			fmt.Printf("TestBroadcastJob PostProcess(), skip non-numeric result %q of task %v: %v\n", val, key, err)
			continue
		}
		num += valInt
	}

	fmt.Printf("TestBroadcastJob PostProcess(), num=%d\n", num)
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(num))
	return ret, nil
}

MapReduce任务

支持Java版本的Map模型和MapReduce模型。

说明

需要Golang SDK版本为v0.0.4及以上。

Map任务Demo

  1. 编写业务代码,实现MapJobProcessor

    package main
    
    import (
    	"encoding/json"
    	"errors"
    	"fmt"
    	"github.com/alibaba/schedulerx-worker-go/processor"
    	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    	"strconv"
    	"time"
    )
    
    // TestMapJob is an example Map-model job. It embeds
    // *mapjob.MapJobProcessor, whose IsRootTask and Map methods Process uses.
    type TestMapJob struct {
    	*mapjob.MapJobProcessor
    }
    
    // Kill is an unimplemented placeholder: it panics when called.
    func (mr *TestMapJob) Kill(jobCtx *jobcontext.JobContext) error {
    	//TODO implement me
    	panic("implement me")
    }
    
    // Process handles both levels of the Map job.
    //
    // For the root task it builds the messages "id_1" .. "id_<num>" and
    // dispatches them with Map under the task name "Level1Dispatch". num is 10
    // unless the job parameters hold an integer, in which case that value is
    // used; a non-integer parameter is returned as an error.
    //
    // For a "Level1Dispatch" task it decodes the task payload as a JSON string,
    // sleeps briefly to simulate work and returns the string as the result. The
    // message "id_5" is deliberately reported as failed to demonstrate a failing
    // sub-task. A payload that cannot be decoded is returned as an error.
    //
    // Any other task name yields a failed result.
    func (mr *TestMapJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	var (
    		num = 10 // default number of sub-tasks when no job parameter is set
    		err error
    	)
    	taskName := jobCtx.TaskName()
    
    	if jobCtx.JobParameters() != "" {
    		num, err = strconv.Atoi(jobCtx.JobParameters())
    		if err != nil {
    			return nil, err
    		}
    	}
    
    	switch {
    	case mr.IsRootTask(jobCtx):
    		fmt.Println("start root task")
    		var messageList []interface{}
    		for i := 1; i <= num; i++ {
    			messageList = append(messageList, fmt.Sprintf("id_%d", i))
    		}
    		fmt.Println(messageList)
    		return mr.Map(jobCtx, messageList, "Level1Dispatch")
    	case taskName == "Level1Dispatch":
    		var str string
    		// Check the decode error instead of carrying on with an empty string.
    		if err = json.Unmarshal(jobCtx.Task(), &str); err != nil {
    			return nil, fmt.Errorf("decoding Level1Dispatch task: %w", err)
    		}
    		fmt.Printf("str=%s\n", str)
    		time.Sleep(100 * time.Millisecond)
    		fmt.Println("Finish Process...")
    		if str == "id_5" {
    			// Intentional failure used to demonstrate a failed sub-task.
    			return processor.NewProcessResult(
    				processor.WithFailed(),
    				processor.WithResult(str),
    			), errors.New("test")
    		}
    		return processor.NewProcessResult(
    			processor.WithSucceed(),
    			processor.WithResult(str),
    		), nil
    	}
    	return processor.NewProcessResult(processor.WithFailed()), nil
    }
    
  2. 在Client中注册Map任务。

    package main
    
    import (
    	"github.com/alibaba/schedulerx-worker-go"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    )
    
    // main obtains a SchedulerX client, registers TestMapJob under the name
    // "TestMapJob" and then blocks forever.
    func main() {
    	// This is just an example, the real configuration needs to be obtained from the platform
    	conf := schedulerx.Config{
    		Endpoint:  "acm.aliyun.com",
    		Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af",
    		GroupId:   "xueren_sub",
    		AppKey:    "xxxxxx",
    	}
    
    	cli, err := schedulerx.GetClient(&conf)
    	if err != nil {
    		panic(err)
    	}
    
    	mapTask := &TestMapJob{MapJobProcessor: mapjob.NewMapJobProcessor()}
    	cli.RegisterTask("TestMapJob", mapTask)
    
    	// An empty select never returns, which keeps the process running.
    	select {}
    }
    

MapReduce任务Demo

  1. 编写业务代码,实现MapReduceJobProcessor

    package main
    
    import (
    	"encoding/json"
    	"fmt"
    	"github.com/alibaba/schedulerx-worker-go/processor"
    	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
    	"strconv"
    	"time"
    )
    
    // OrderInfo is the payload of one sub-task in the MapReduce example. Its
    // fields are serialized to JSON under the keys "id" and "value".
    type OrderInfo struct {
    	Id    string `json:"id"`
    	Value int    `json:"value"`
    }
    
    // NewOrderInfo returns a pointer to a newly allocated OrderInfo holding the
    // given id and value.
    func NewOrderInfo(id string, value int) *OrderInfo {
    	info := new(OrderInfo)
    	info.Id, info.Value = id, value
    	return info
    }
    
    // TestMapReduceJob is an example MapReduce-model job. It embeds
    // *mapjob.MapReduceJobProcessor, whose IsRootTask and Map methods Process
    // uses.
    type TestMapReduceJob struct {
    	*mapjob.MapReduceJobProcessor
    }
    
    // Kill is an unimplemented placeholder: it panics when called.
    func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error {
    	//TODO implement me
    	panic("implement me")
    }
    
    // Process handles both levels of the MapReduce job.
    //
    // For the root task it builds num OrderInfo values ("id_1" .. "id_<num>",
    // with Value equal to the index) and dispatches them with Map under the task
    // name "OrderInfo". num is 1000 unless the job parameters hold an integer,
    // in which case that value is used; a non-integer parameter is returned as
    // an error.
    //
    // For an "OrderInfo" task it decodes the payload and returns the order's
    // Value (as a decimal string) in a succeeded result. A decode failure is
    // only logged; processing then continues with the partially filled
    // OrderInfo.
    //
    // Any other task name yields a failed result.
    func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	var (
    		num = 1000 // default number of orders when no job parameter is set
    		err error
    	)
    	taskName := jobCtx.TaskName()
    
    	if jobCtx.JobParameters() != "" {
    		num, err = strconv.Atoi(jobCtx.JobParameters())
    		if err != nil {
    			return nil, err
    		}
    	}
    
    	if mr.IsRootTask(jobCtx) {
    		// Printf rather than Println: the message contains a format verb.
    		fmt.Printf("start root task, taskId=%d\n", jobCtx.TaskId())
    		var orderInfos []interface{}
    		for i := 1; i <= num; i++ {
    			orderInfos = append(orderInfos, NewOrderInfo(fmt.Sprintf("id_%d", i), i))
    		}
    		return mr.Map(jobCtx, orderInfos, "OrderInfo")
    	} else if taskName == "OrderInfo" {
    		orderInfo := new(OrderInfo)
    		if err := json.Unmarshal(jobCtx.Task(), orderInfo); err != nil {
    			fmt.Printf("task is not OrderInfo, task=%+v\n", jobCtx.Task())
    		}
    		fmt.Printf("taskId=%d, orderInfo=%+v\n", jobCtx.TaskId(), orderInfo)
    		time.Sleep(1 * time.Millisecond)
    		return processor.NewProcessResult(
    			processor.WithSucceed(),
    			processor.WithResult(strconv.Itoa(orderInfo.Value)),
    		), nil
    	}
    	return processor.NewProcessResult(processor.WithFailed()), nil
    }
    
    // Reduce sums the integer results of all tasks whose status is
    // taskstatus.TaskStatusSucceed and returns the sum (as a decimal string) in
    // a succeeded result. A result that is not an integer is returned as an
    // error.
    func (mr *TestMapReduceJob) Reduce(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	allTaskResults := jobCtx.TaskResults()
    	allTaskStatuses := jobCtx.TaskStatuses()
    	count := 0
    	fmt.Printf("reduce: all task count=%d\n", len(allTaskResults))
    	for key, val := range allTaskResults {
    		// Key 0 is excluded from the sum; presumably it is the root task,
    		// whose result is not an order value (TODO confirm).
    		if key == 0 {
    			continue
    		}
    		if allTaskStatuses[key] == taskstatus.TaskStatusSucceed {
    			num, err := strconv.Atoi(val)
    			if err != nil {
    				return nil, err
    			}
    			count += num
    		}
    	}
    	fmt.Printf("reduce: succeed task count=%d\n", count)
    	return processor.NewProcessResult(
    		processor.WithSucceed(),
    		processor.WithResult(strconv.Itoa(count)),
    	), nil
    }
  2. 在Client中注册MapReduce任务。

    package main
    
    import (
    	"github.com/alibaba/schedulerx-worker-go"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    )
    
    // main obtains a SchedulerX client, registers TestMapReduceJob under the
    // name "TestMapReduceJob" and then blocks forever.
    func main() {
    	// This is just an example, the real configuration needs to be obtained from the platform
    	conf := schedulerx.Config{
    		Endpoint:  "acm.aliyun.com",
    		Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af",
    		GroupId:   "xueren_sub",
    		AppKey:    "xxxxxx",
    	}
    
    	cli, err := schedulerx.GetClient(&conf)
    	if err != nil {
    		panic(err)
    	}
    
    	mrTask := &TestMapReduceJob{MapReduceJobProcessor: mapjob.NewMapReduceJobProcessor()}
    	cli.RegisterTask("TestMapReduceJob", mrTask)
    
    	// An empty select never returns, which keeps the process running.
    	select {}
    }
    

如何停止任务

Go processor通过协程运行任务,无法直接停止协程的执行。为此,我们提供KillProcessor接口,用户可自行实现Kill方法停止Processor。

说明

需要Golang SDK版本为v1.0.2及以上。

示例:

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
)

var _ processor.Processor = &HelloWorld{}

// HelloWorld is an example job that can be stopped: Process polls the Stop
// flag and Kill sets it.
type HelloWorld struct{}

// Stop is true once Kill has requested that the running Process loop exit.
// It is a single package-level flag, so all running instances of this job
// share it. Access it only through setStop and stopRequested.
var Stop = false

// stopMu guards Stop. Process runs in its own goroutine while Kill is called
// to interrupt it, so the flag is read and written concurrently.
var stopMu sync.Mutex

// setStop stores v in Stop under stopMu.
func setStop(v bool) {
	stopMu.Lock()
	Stop = v
	stopMu.Unlock()
}

// stopRequested returns the current value of Stop under stopMu.
func stopRequested() bool {
	stopMu.Lock()
	defer stopMu.Unlock()
	return Stop
}

// Process prints up to ten greetings, two seconds apart, and exits the loop
// early once a stop has been requested through Kill. It always returns a
// succeeded result and a nil error.
func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("[Process] Start process my task: Hello world!")
	// Clear a stop request left over from a previously killed run; otherwise
	// every later run would exit after its first iteration.
	setStop(false)
	// mock execute task
	for i := 0; i < 10; i++ {
		fmt.Printf("Hello%d\n", i)
		time.Sleep(2 * time.Second)
		if stopRequested() {
			break
		}
	}
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	fmt.Println("[Process] End process my task: Hello world!")
	return ret, nil
}

// Kill requests that the running Process loop stop by setting the Stop flag.
// It does not wait for Process to return and always returns nil.
func (h *HelloWorld) Kill(ctx *jobcontext.JobContext) error {
	fmt.Println("[Kill] Start kill my task: Hello world!")
	setStop(true)
	return nil
}

相关文档