DataWorks的开放平台为您提供OpenEvent、OpenAPI等开放能力,您可通过开放平台将第三方调度系统集成到DataWorks的调度系列中,将三方调度系统的任务嵌入DataWorks的业务流程中。本文以一个示例为您介绍集成第三方调度系统时需要进行的配置要点。

背景信息

当您的数据处理主流程在DataWorks中,且需要集成一个其他调度系统的调度任务时,您可以使用DataWorks的开放平台和HTTP触发节点。如下图所示。集成三方系统集成三方调度系统后,整体的任务运行流程如下。
  • 三方调度系统可通过DataWorks的OpenEvent功能,订阅依赖的DataWorks节点的状态,当依赖的节点运行完成后,即可开始运行三方调度系统中的任务。
  • 当三方系统中的任务运行完成后,即可通过DataWorks的RunTriggerNode这个API触发运行DataWorks的HTTP触发节点,通过HTTP触发节点触发下游的DataWorks节点开始运行。
其中使用的DataWorks关键功能与概念包括:
下文以一个示例为您介绍要实现上述业务需求需进行的主要操作流程。

DataWorks配置:开启并配置消息订阅(OpenEvent)

开启并配置消息订阅的详细步骤请参见开启消息订阅,以下为本实践中的核心配置流程与注意事项。

  1. EvenBridge控制台,跳过事件源等配置,快速创建一个自定义总线。创建自定义总线
  2. EvenBridge控制台对应的事件总线中,创建事件规则。
    本实践定义该EventBridge自定义总线可接收DataWorks实例状态变更消息,配置demo与核心参数配置如下。
    1. 配置事件模式事件规则3
      {
          "source": [
              "acs.dataworks"
          ],
          "type": [
              "dataworks:InstanceStatusChanges:InstanceStatusChanges"
          ]
      }
      • source:定义事件的产品名称标识,配置为acs.dataworks
      • type:定义产品下事件的类型标识,配置为dataworks:InstanceStatusChanges:InstanceStatusChanges。您可以在下方的事件模式调试中,将source、type取值进行补充修改,然后进行事件测试,测试成功后单击下一步测试3
    2. 配置事件目标中,服务类型选择为HTTPS,并填写合适的URL,其他参数可保持默认。事件目标
  3. DataWorks控制台的开放平台页面,启用上述消息分发通道。启用

三方系统配置:开发触发三方任务运行的逻辑

完成订阅需要依赖的DataWorks节点后,您需要配置三方调度系统,根据DataWorks实例状态触发运行任务,当订阅到依赖的DataWorks节点已运行成功,即开始运行三方系统的任务。配置的示例代码如下。
package com.aliyun.dataworks.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


/**
 * @author dataworks demo
 */
@RestController
@RequestMapping("/event")
public class ExtensionsController {

    /**
     * 接收eventBridge推送过来的消息
     * @param jsonParam
     */
    @PostMapping("/consumer")
    public void consumerEventBridge(@RequestBody String jsonParam){
        JSONObject jsonObj = JSON.parseObject(jsonParam);
        String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
        if(Constants.INSTANCE_STATUS_EVENT_CODE.equals(eventCode)){
            JSONObject dataParam = JSON.parseObject(jsonObj.getString("data"));
            //调度任务实例开始等时间的具体时间
            System.out.println("beginWaitTimeTime: "+ dataParam.getString("beginWaitTimeTime"));
            //DagId
            System.out.println("dagId: "+ dataParam.getString("dagId"));
            //Dag的类型,取值如下:
            //0:周期调度任务
            //1:手动任务
            //2:冒烟测试
            //3:补数据
            //4:手动业务流程
            //5:临时业务流程
            System.out.println("dagType: "+dataParam.getString("dagType"));
            //任务实例的调度类型,取值如下:
            //NORMAL(0):正常调度任务。该任务被日常调度。
            //MANUAL(1):手动任务。该任务不会被日常调度。
            //PAUSE(2):冻结任务。该任务被日常调度,但启动调度时直接被置为失败状态。
            //SKIP(3):空跑任务。该任务被日常调度,但启动调度时直接被置为成功状态。
            //SKIP_UNCHOOSE(4):临时工作流中未选择的任务,仅存在于临时工作流中,启动调度时直接被置为成功状态。
            //SKIP_CYCLE(5):未到运行周期的周或月任务。该任务被日常调度,但启动调度时直接被置为成功状态。
            //CONDITION_UNCHOOSE(6):上游实例中有分支(IF)节点,但是该下游节点未被分支节点选中,直接置为空跑任务。
            //REALTIME_DEPRECATED(7):实时生成的已经过期的周期实例,该类型的任务直接被置为成功状态。
            System.out.println("taskType: "+dataParam.getString("taskType"));
            //任务实例的修改时间
            System.out.println("modifyTime: "+dataParam.getString("modifyTime"));
            //任务实例的创建时间
            System.out.println("createTime: "+dataParam.getString("createTime"));
            //工作空间的ID。您可以调用ListProjects查看空间ID信息。
            System.out.println("appId: "+dataParam.getString("appId"));
            //调度任务实例所在工作空间的租户ID
            System.out.println("tenantId: "+dataParam.getString("tenantId"));
            //调度任务实例的操作码:该字段可忽略
            System.out.println("opCode: "+dataParam.getString("opCode"));
            //业务流程的ID,周期调度任务实例的业务流程默认为1,手动业务流程和内部工作流调度任务实例为实际的业务流程ID
            System.out.println("flowId: "+dataParam.getString("flowId"));
            //调度任务实例对应的节点ID
            System.out.println("nodeId:"+dataParam.getString("nodeId"));
            //调度任务实例开始等资源的具体时间
            System.out.println("beginWaitResTime: "+dataParam.getString("beginWaitResTime"));
            //调度任务实例ID
            System.out.println("taskId: "+dataParam.getString("taskId"));
            //任务的状态,取值如下:
            //0(未运行)
            //2(等待定时时间dueTime或cycleTime到来)
            //3(等待资源)
            //4(运行中)
            //7(下发给数据质量进行数据校检)
            //8(正在进行分支条件校检)
            //5(执行失败)
            //6(执行成功)
            System.out.println("status: "+dataParam.getString("status"));
            // 订阅到DataWorks的节点完成事件后触发本地调度节点运行
            localScheduleEventService.triggerLocalNode(dataParam);
        }else{
            System.out.println("未能过滤其他事件,请检查配置步骤");
        }
    }
}
                

DataWorks配置:创建HTTP触发节点

三方系统任务运行成功后,需要通过DataWorks的RunTriggerNode触发DataWorks的HTTP触发节点开始运行,通过HTTP触发节点进一步触发下游节点运行,因此您需要根据实际业务需要创建HTTP触发节点。

HTTP触发节点的详细介绍与新建步骤请参见HTTP触发器节点

三方系统配置:开发触发HTTP触发节点运行的逻辑

三方系统通过RunTriggerNode触发HTTP触发节点运行的代码示例如下。
  • HttpTriggerNodeService代码实现demo。
    package com.aliyun.dataworks.services;
    
    import com.aliyuncs.dataworks_public.model.v20200518.RunTriggerNodeRequest;
    import com.aliyuncs.dataworks_public.model.v20200518.RunTriggerNodeResponse;
    import com.aliyuncs.exceptions.ClientException;
    import com.aliyuncs.exceptions.ServerException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    /**
     * @author dataworks demo
     */
    @Service
    public class HttpTriggerNodeService {
    
        @Autowired
        private DataWorksOpenApiClient dataWorksOpenApiClient;
    
    
        /**
         * @return
         */
        public boolean triggerNode(Long appId, Long nodeId, Long bizDate, Long cycleTime) {
            try {
                RunTriggerNodeRequest runTriggerNodeRequest = new RunTriggerNodeRequest();
    
                // 设置NodeId,表示触发式节点的节点ID,节点ID可通过ListNodes API查询获取
                runTriggerNodeRequest.setNodeId(nodeId);
    
                // 设置CycleTime,表示触发式节点的任务的运行时间戳,需将HTTP触发节点的调度配置中,节点指定的运行时间,换算为时间戳
                // 如果HTTP触发节点所在的地域与调度系统所在的地域不在同个时区,存在时差,这里需配置为触发节点所在时区的时间。
                // 例如,HTTP触发节点在北京地域且Cyctime为北京时间18:00,而调度系统在美西地域,此时调度系统配置时,需配置为北京时间18:00的时间戳。
                runTriggerNodeRequest.setCycleTime(cycleTime);
    
    
                // 设置BizDate,表示触发式节点实例所在的业务日期时间戳,需将业务日期换算为时间戳。
                // 业务日期为运行时间的前一天,且时间精确到日,时分秒均为00000000。以运行日期为2020年11月25日为例,业务时间为2020112400000000,需将这个时间换算为业务日期的时间戳
                // 如果HTTP触发节点所在的地域与调度系统所在的地域不在同个时区,存在时差,这里需配置为触发节点所在时区的时间。
                runTriggerNodeRequest.setBizDate(bizDate);
    
                // 设置AppId(appId=projectId),表示触发式节点所属的DataWorks工作空间ID,通过GetNode可以查询到节点对应的projectId
                runTriggerNodeRequest.setAppId(appId);
    
                RunTriggerNodeResponse runTriggerNodeResponse = dataWorksOpenApiClient.createClient().getAcsResponse(runTriggerNodeRequest);
                System.out.println(runTriggerNodeResponse.getRequestId());
                return runTriggerNodeResponse.getSuccess();
            } catch (ServerException e) {
                e.printStackTrace();
            } catch (ClientException e) {
                e.printStackTrace();
                // 请求ID
                System.out.println(e.getRequestId());
                // 错误码
                System.out.println(e.getErrCode());
                // 错误信息
                System.out.println(e.getErrMsg());
            }
            return false;
        }
    }
                            
  • 第三方调度的状态机运行demo。
        @Scheduled(cron = "0 0/30 * * * ? ")
        public void schedule() {
            //TODO 模拟本地调度通过HTTP触发器节点调起DataWorks的周期节点
            triggerDwScheduleNode();
        }
  • triggerDwScheduleNode方法的实现demo
    /**
         * 这里以触发器节点为日调度频率为例,介绍如何获取调起触发器节点的相关参数信息
         */
        public void triggerDwScheduleNode() {
            Date gmtDate = getTimeByZeroEnd(new Date());
            GregorianCalendar gc = (GregorianCalendar) GregorianCalendar.getInstance();
            gc.setTime(gmtDate);
            gc.add(Calendar.DATE, -1);
            Long bizDate = gc.getTimeInMillis();
            GetNodeResponse.Data node = nodeService.getNode(nodeId);
            if (node != null) {
                String bizDateStr = getTimeInExpress(gc.getTime(), "yyyy-MM-dd HH:mm:ss");
                ListInstancesResponse.Data instances = nodeService.getInstance(nodeId, node.getProjectId(), bizDateStr);
                if (!CollectionUtils.isEmpty(instances.getInstances())) {
                    ListInstancesResponse.Data.Instance instance = instances.getInstances().get(0);
                    httpTriggerNodeService.triggerNode(node.getProjectId(), node.getNodeId(), bizDate, instance.getCycTime());
                }
            }
    
        }

本地部署运行

下载工程:下载工程后,进入工程根目录下执行:
mvn clean package -Dmaven.test.skip=true spring-boot:repackage
获得可直接运行的jar后执行:
java -jar target/schedule-integration-demo-1.0.jar
完成后,在浏览器输入http://localhost:8080/index会得到"hello world!",表示应用成功部署。