通过OpenAPI创建、修改、删除离线同步任务

本文为您介绍如何使用OpenAPI创建、修改、删除数据集成同步任务,同步来源端数据至去向端。

前提条件

使用限制

  • DataWorks当前仅支持使用OpenAPI创建数据集成离线同步任务。
  • 调用CreateDISyncTask创建数据集成同步任务,仅支持使用脚本模式配置同步任务内容,详情请参见通过脚本模式配置离线同步任务
  • DataWorks暂不支持使用OpenAPI创建业务流程,您需要使用现有的业务流程创建数据同步任务。

配置环境依赖及账号认证

  • 配置Maven依赖。
    1. 打开Maven项目下的pom.xml文件,添加aliyun-java-sdk-core
      <dependency>
       <groupId>com.aliyun</groupId>
       <artifactId>aliyun-java-sdk-core</artifactId>
       <version>4.5.20</version>
      </dependency>
    2. 打开Maven项目下的pom.xml文件,添加aliyun-java-sdk-dataworks-public
      <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
        <version>3.3.18</version>
      </dependency>
    3. 打开Maven项目,添加credentials-java(推荐使用Credentials最新版本)。
      <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>credentials-java</artifactId>
          <version>0.2.11</version>
      </dependency>
      说明 通过阿里云Credentials工具托管AK,关于Credentials工具使用说明,详情请参见:身份验证配置
  • 客户端认证。
    使用OpenAPI创建数据同步任务前,需要调用如下语句对登录阿里云的账号相关信息进行认证。如果阿里云的账号信息认证通过,则继续执行后续任务,如果认证不通过,则该调用会报错,您需要根据实际报错处理相关问题。
    // Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.https://help.aliyun.com/document_detail/378657.html
            IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));

配置流程

完成上述配置环境依赖及账号认证后,您可以通过OpenAPI调用相关接口,创建数据同步任务,同步来源端数据至去向端。配置流程如下:
  1. 创建数据集成任务。
  2. 配置任务的调度依赖。
  3. 提交数据集成任务。
  4. 发布同步任务至生产环境。

配置步骤

  1. 创建数据集成任务。
    调用CreateDISyncTask接口,创建数据集成任务。如下代码仅示例部分参数的配置,更多参数详情请参见CreateDISyncTask
    public void createFile() throws ClientException{
            CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
            request.setProjectId(181565L);
            request.setTaskType("DI_OFFLINE");
            request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"same表comment\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_first\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"same表comment\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}");
            request.setTaskParam("{\"FileFolderPath\":\"业务流程/new_biz/数据集成\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}");
            request.setTaskName("new_di_task_0607_1416");
    
    
            String regionId = "cn-hangzhou";
            // Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.https://help.aliyun.com/document_detail/378657.html
            IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com");
            IAcsClient client;
    
            client = new DefaultAcsClient(profile);
    
            CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
            Gson gson1 = new Gson();
    
            System.out.println(gson1.toJson(response1));
    }
  2. 调用UpdateFile接口,更新调度参数。
    请求参数配置详情如下表所示:
    名称类型是否必选示例值描述
    ActionStringUpdateFile

    要执行的操作。

    FileFolderPathString业务流程/第一个业务流程/数据集成/文件夹1/文件夹2

    文件所在的路径。

    ProjectIdLong10000

    DataWorks工作空间的ID。您可以登录DataWorks控制台,进入工作空间管理页面获取ID。

    FileNameStringods_user_info_d

    文件的名称。您可以通过重新设置FileName的值来修改文件名称。

    例如,使用ListFiles接口查询目标目录下的文件ID,通过UpdateFile接口,输入查询的文件ID至FileId参数,并配置FileName的参数值,即可修改相应文件的名称。

    FileDescriptionString这里是文件描述

    文件的描述。

    ContentStringSELECT "1";

    文件代码内容,不同代码类型(fileType)的文件,代码格式不同。您可以在运维中心,右键单击对应类型的任务,选择查看代码,查看具体的代码格式。

    AutoRerunTimesInteger3

    文件出错后,自动重跑的次数。

    AutoRerunIntervalMillisInteger120000

    出错自动重跑时间间隔,单位为毫秒。最大为1800000毫秒(30分钟)。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>时间属性>出错自动重跑”的”重跑间隔“配置对应。

    控制台中“重跑间隔”的时间单位为分钟,请在调用时注意转换时间。

    RerunModeStringALL_ALLOWED

    重跑属性。取值如下:

    • ALL_ALLOWED:运行成功或失败后皆可重跑。
    • FAILURE_ALLOWED:运行成功后不可重跑,运行失败后可以重跑。
    • ALL_DENIED:运行成功或失败皆不可重跑。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>时间属性>重跑属性”配置内容对应。

    StopBooleanfalse

    是否暂停调度,取值如下:

    • true:暂停调度。
    • false:不暂停调度。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>时间属性>调度类型”配置为”暂停调度“时对应。

    ParaValueStringx=a y=b z=c

    调度参数。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>参数”对应。您可以参考调度参数配置。

    StartEffectDateLong936923400000

    开始自动调度的毫秒时间戳。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>时间属性>生效日期”配置的开始时间的毫秒时间戳对应。

    EndEffectDateLong4155787800000

    停止自动调度的时间戳,单位为毫秒。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>时间属性>生效日期”配置的结束时间的毫秒时间戳对应。

    CronExpressString00 00-59/5 1-23 * * ?

    周期调度的cron表达式,该参数与DataWorks控制台中,数据开发任务的“调度配置>时间属性>cron表达式”对应。配置完“调度周期”及“定时调度时间”后,DataWorks会自动生成相应cron表达式。

    示例如下:

    • 每天凌晨5点30分定时调度:00 30 05 * * ?
    • 每个小时的第15分钟定时调度:00 15 * * * ?
    • 每隔十分钟调度一次:00 00/10 * * * ?
    • 每天8点到17点,每隔十分钟调度一次:00 00-59/10 8-23 * * * ?
    • 每月的1日0点20分自动调度:00 20 00 1 * ?
    • 从1月1日0点10分开始,每过3个月调度一次:00 10 00 1 1-12/3 ?
    • 每周二、周五的0点5分自动调度:00 05 00 * * 2,5

    由于DataWorks调度系统的规则,cron表达式有以下限制:

    • 最短调度间隔时间为5分钟。
    • 每天最早调度时间为0点5分。
    CycleTypeStringNOT_DAY

    调度周期的类型,包括NOT_DAY(分钟、小时)和DAY(日、周、月)。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>时间属性>调度周期”对应。

    DependentTypeStringUSER_DEFINE

    依赖上一周期的方式。取值如下:

    • SELF:依赖项选择本节点。
    • CHILD:依赖项选择一级子节点。
    • USER_DEFINE:依赖项选择其他节点。
    • NONE:未选择依赖项,即不会依赖上一周期。
    DependentNodeIdListString5,10,15,20

    当DependentType参数配置为USER_DEFINE时,用于设置当前文件具体依赖的节点ID。依赖多个节点时,使用英文逗号(,)分隔。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>调度依赖”配置为“上一周期”后,依赖项选择”其他节点“时配置的内容对应。

    InputListStringproject_root,project.file1,project.001_out

    文件依赖的上游文件的输出名称。依赖多个输出时,使用英文逗号(,)分隔。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>调度依赖“选择”同周期“时的”父节点输出名称”对应。

    ProjectIdentifierStringdw_project

    DataWorks工作空间的名称。您可以登录DataWorks控制台,进入工作空间配置页面获取工作空间名称。

    该参数与ProjectId参数,二者必须设置其一,用来确定本次API调用操作的DataWorks工作空间。

    FileIdLong100000001

    文件的ID。您可以调用ListFiles接口获取文件ID。

    OutputListStringdw_project.ods_user_info_d

    文件的输出。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>调度依赖“选择”同周期“时的”本节点的输出名称”对应。

    ResourceGroupIdentifierStringdefault_group

    文件发布成任务后,任务执行时对应的资源组。您可以调用ListResourceGroups获取工作空间可用的资源组列表。

    ConnectionNameStringodps_first

    文件对应任务执行时,任务使用的数据源标识符。您可以调用ListDataSources获取可用的数据源列表。

    OwnerString18023848927592

    文件所有者的用户ID。

    AutoParsingBooleantrue

    文件是否开启自动解析功能。取值如下:

    • true:文件会自动解析代码。
    • false:文件不会自动解析代码。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>调度依赖”选择“同周期”时的“代码解析”对应。

    SchedulerTypeStringNORMAL

    调度的类型,取值如下:

    • NORMAL:正常调度任务。
    • MANUAL:手动任务,不会被日常调度,对应手动业务流程下的节点。
    • PAUSE:暂停任务。
    • SKIP:空跑任务,被日常调度,但启动调度时直接被置为成功。
    AdvancedSettingsString{"queue":"default","SPARK_CONF":"--conf spark.driver.memory=2g"}

    任务的高级配置。

    该参数与DataWorks控制台中,EMR Spark Streaming和EMR Streaming SQL数据开发任务,编辑页面右侧导航栏的“高级设置“对应。

    当前仅EMR Spark Streaming和EMR Streaming SQL任务支持配置该参数,并且参数为JSON格式。

    StartImmediatelyBooleantrue

    发布后是否立即启动。取值如下:

    • true:发布后立即启动。
    • false:发布后暂不启动。

    该参数与DataWorks控制台中,EMR Spark Streaming和EMR Streaming SQL数据开发任务,编辑页面右侧导航栏的“配置>时间属性>启动方式“对应。

    InputParametersString[{"ValueSource": "project_001.first_node:bizdate_param","ParameterName": "bizdate_input"}]

    节点的上下文输入参数。参数为JSON格式,包含的字段可参考GetFile接口返回值中的InputContextParameterList参数结构。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>节点上下文>本节点输入参数“对应。

    OutputParametersString[{"Type": 1,"Value": "${bizdate}","ParameterName": "bizdate_param"}]

    节点的上下文输出参数。参数为JSON格式,包含的字段可参考GetFile接口返回值中的OutputContextParameterList参数结构。

    该参数与DataWorks控制台中,数据开发任务的“调度配置>节点上下文>本节点输出参数“对应。

  3. 提交数据集成任务。
    调用SubmitFile接口,提交数据集成任务至调度系统的开发环境。任务提交后,Response会返回deploymentId,您可以调用GetDeployment接口,通过deploymentId获取本次发布包的详细信息。
     public void submitFile() throws ClientException{
            SubmitFileRequest request = new SubmitFileRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
          // 此节点ID为创建节点时返回的ID,对应数据库File表的file_id。
            request.setFileId(501576542L);
            request.setComment("备注");
            SubmitFileResponse acsResponse = client.getAcsResponse(request);
          //DeploymentId为提交或发布的返回值。
          Long deploymentId = acsResponse.getData();
            log.info(acsResponse.toString());
    }
    上述代码仅示例部分参数的配置,更多参数详情请参见SubmitFileGetDeployment
  4. 发布同步任务到生产环境。
    调用DeployFile接口,发布数据集成同步任务至生产环境。
    说明 仅标准模式的工作空间涉及执行该发布操作。
     public void deploy() throws ClientException{
            DeployFileRequest request = new DeployFileRequest();
            request.setProjectIdentifier("zxy_8221431");
            request.setFileId(501576542L);
            request.setComment("备注");
            //NodeId和file_id二选一。NodeId的值为调度配置中基础属性的节点ID。
            request.setNodeId(700004537241L);
            DeployFileResponse acsResponse = client.getAcsResponse(request);
             //DeploymentId为提交或发布的返回值。
            Long deploymentId = acsResponse.getData();
            log.info(acsResponse.getData().toString());
        }
    上述代码仅示例部分参数的配置,更多参数详情请参见DeployFile
  5. 获取发布包详情。
    任务发布后,Response会返回deploymentId,您可以调用GetDeployment接口,通过deploymentId获取本次发布包的详细信息。当GetDeployment接口的返回参数Status取值为1时,则表示此次发布成功。
    public void getDeployment() throws ClientException{
            GetDeploymentRequest request = new GetDeploymentRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
          //DeploymentId为提交或发布的返回值。调用GetDeployment接口,获取本次发布的具体情况。
            request.setDeploymentId(2776067L);
            GetDeploymentResponse acsResponse = client.getAcsResponse(request);
            log.info(acsResponse.getData().toString());
        }
    上述代码仅示例部分参数的配置,更多参数详情请参见GetDeployment

修改同步任务的相关配置

成功创建数据集成同步任务后,您可以调用UpdateDISyncTask接口更新任务的Content,或通过TaskParam来更新使用的独享资源组。更新后,您需要重新提交、发布同步任务,详情请参见配置流程

删除同步任务

创建数据集成同步任务后,您可以调用DeleteFile接口删除同步任务。

Sample代码

POM 依赖

<dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.20</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>3.4.1</version>
 </dependency>

Java Sdk调用代码

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;

public class createofflineTask {
    static Long createTask(String fileName) throws  Exception {
        Long projectId = 2043L;
        String taskType = "DI_OFFLINE";
        String taskContent = "{\n" +
                "    \"type\": \"job\",\n" +
                "    \"version\": \"2.0\",\n" +
                "    \"steps\": [\n" +
                "        {\n" +
                "            \"stepType\": \"mysql\",\n" +
                "            \"parameter\": {\n" +
                "                \"envType\": 0,\n" +
                "                \"datasource\": \"mysql_autotest_dev\",\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"connection\": [\n" +
                "                    {\n" +
                "                        \"datasource\": \"mysql_autotest_dev\",\n" +
                "                        \"table\": [\n" +
                "                            \"user\"\n" +
                "                        ]\n" +
                "                    }\n" +
                "                ],\n" +
                "                \"where\": \"\",\n" +
                "                \"splitPk\": \"\",\n" +
                "                \"encoding\": \"UTF-8\"\n" +
                "            },\n" +
                "            \"name\": \"Reader\",\n" +
                "            \"category\": \"reader\"\n" +
                "        },\n" +
                "        {\n" +
                "            \"stepType\": \"odps\",\n" +
                "            \"parameter\": {\n" +
                "                \"partition\": \"pt=${bizdate}\",\n" +
                "                \"truncate\": true,\n" +
                "                \"datasource\": \"odps_first\",\n" +
                "                \"envType\": 0,\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"emptyAsNull\": false,\n" +
                "                \"tableComment\": \"null\",\n" +
                "                \"table\": \"user\"\n" +
                "            },\n" +
                "            \"name\": \"Writer\",\n" +
                "            \"category\": \"writer\"\n" +
                "        }\n" +
                "    ],\n" +
                "    \"setting\": {\n" +
                "        \"executeMode\": null,\n" +
                "        \"errorLimit\": {\n" +
                "            \"record\": \"\"\n" +
                "        },\n" +
                "        \"speed\": {\n" +
                "            \"concurrent\": 2,\n" +
                "            \"throttle\": false\n" +
                "        }\n" +
                "    },\n" +
                "    \"order\": {\n" +
                "        \"hops\": [\n" +
                "            {\n" +
                "                \"from\": \"Reader\",\n" +
                "                \"to\": \"Writer\"\n" +
                "            }\n" +
                "        ]\n" +
                "    }\n" +
                "}";
        CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
        request.setProjectId(projectId);
        request.setTaskType(taskType);
        request.setTaskContent(taskContent);
        request.setTaskName(fileName);
        request.setTaskParam("{\"FileFolderPath\":\"业务流程/自动化测试空间_勿动/数据集成\",\"ResourceGroup\":\"S_res_group_XXX\"}");
        // 使用数据集成独享资源组,
        CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
        return response1.getData().getFileId();
    }

    public static void updateFile(Long fileId) throws Exception {
        UpdateFileRequest request = new UpdateFileRequest();
        request.setProjectId(2043L);
        request.setFileId(fileId);
        request.setAutoRerunTimes(3);
        request.setRerunMode("FAILURE_ALLOWED");
        request.setCronExpress("00 30 05 * * ?");
        request.setCycleType("DAY");
        request.setResourceGroupIdentifier("S_res_group_XXX");
        // 使用调度独享资源组
        request.setInputList("dataworks_di_autotest_root");

        request.setAutoParsing(true);
        request.setDependentNodeIdList("5,10,15,20");
        request.setDependentType("SELF");
        request.setStartEffectDate(0L);
        request.setEndEffectDate(4155787800000L);
        request.setFileDescription("description");
        request.setStop(false);
        request.setParaValue("x=a y=b z=c");
        request.setSchedulerType("NORMAL");
        request.setAutoRerunIntervalMillis(120000);
        UpdateFileResponse response1 = client.getAcsResponse(request);
    }
    static void deleteTask(Long fileId) throws Exception {
        DeleteFileRequest request = new DeleteFileRequest();
        Long projectId = 63845L;
        request.setProjectId(projectId);
        request.setFileId(fileId);


        String akId = "XXX";
        String akSecret = "XXXX";
        String regionId = "cn-hangzhou";
        IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
        DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com");
        IAcsClient client;

        client = new DefaultAcsClient(profile);

        DeleteFileResponse response1 = client.getAcsResponse(request);
        System.out.println(JSONObject.toJSONString(response1));
    }

    static IAcsClient client;

    public static void main(String[] args) throws Exception {
        String akId = "XX";
        // Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.https://help.aliyun.com/document_detail/378657.html
        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");

        client = new DefaultAcsClient(profile);
        String taskName = "offline_job_0930_1648";
        Long fileId = createTask(taskName); // 创建数据集成任务
        updateFile(fileId);   // 修改数据集成任务的调度属性
    }
}