This topic describes how to use OpenAPI to create a workflow, run its jobs, and view the execution results and logs.

Prerequisites

Scenario

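You have created a project in the China (Hangzhou) region and created three HiveSQL jobs in it that depend on one another. You now need to create a workflow so that the jobs run in sequence according to their dependencies.

The existing project and jobs are as follows:
  • Project name: emr_openapi_demo_project. Project ID: FP-D18E9976D5A****.
  • HiveSQL jobs:
    • customer_logs, job ID: FJ-79B3F70E23D6****.
    • log_aggregation, job ID: FJ-CECB36039155****.
    • customer_pic, job ID: FJ-99CFAE652650****.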

Examples

  • Java
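    1. In the project emr_openapi_demo_project, create a workflow named new_flow1. You can customize the graph of the workflow.
      Parameter        Description
      CreateCluster    Specifies whether to create a cluster from a cluster template:
      • true: create a cluster from a cluster template. Set ClusterId to the cluster template ID.
      • false: run on an existing cluster. Set ClusterId to the ID of the existing cluster.
      Graph            The graph information.
      The content of the Graph parameter is as follows.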
      {
          "nodes":[
              {
                  "shape":"startControlNode",
                  "type":"node",
                  "size":"80*34",
                  "x":500.0,
                  "y":250.0,
                  "id":"48d474ea",
                  "index":0.0,
                  "attribute":{"type":"START"}
              },
              {
                  "shape":"hiveSQLJobNode",
                  "type":"node",
                  "size":"170*34",
                  "x":498.0,
                  "y":324.5,
                  "id":"cd5eb72d",
                  "index":1.0,
                  "label":"customer_logs",
                  "attribute":{
                      "type":"JOB",
                      "jobId":"FJ-79B3F70E23D6****",
                      "jobType":"HIVE_SQL"
                      }
              },
              {
                  "shape":"hiveSQLJobNode",
                  "type":"node",
                  "size":"170*34",
                  "trackerPath":"ec5a56bc4a261c22",
                  "x":497.0,
                  "y":416.5,
                  "id":"b308995d",
                  "index":2.0,
                  "label":"log_aggregation;",
                  "attribute":{
                      "type":"JOB",
                      "jobId":"FJ-CECB36039155****",
                      "jobType":"HIVE_SQL"
                      }
              },
              {
                  "shape":"hiveSQLJobNode",
                  "type":"node",
                  "size":"170*34",
                  "trackerPath":"ec5a56bc4a261c22",
                  "x":501.0,
                  "y":516.5,
                  "id":"35c8d9c5",
                  "index":3.0,
                  "label":"customer_pic",
                  "attribute":{
                      "type":"JOB",
                      "jobId":"FJ-99CFAE652650****",
                      "jobType":"HIVE_SQL"
                      }
              },
              {
                  "shape":"endControlNode",
                  "type":"node",
                  "size":"80*34",
                  "x":503.0,
                  "y":612.5,
                  "id":"65f9c9a4",
                  "index":7.0,
                  "attribute":{"type":"END"}
              }
          ],
          "edges":
          [
              {
                  "source":"48d474ea",
                  "sourceAnchor":0.0,
                  "target":"cd5eb72d",
                  "targetAnchor":0.0,
                  "id":"3820959f",
                  "index":4.0
              },
              {
                  "source":"cd5eb72d",
                  "sourceAnchor":1.0,
                  "target":"b308995d",
                  "targetAnchor":0.0,
                  "id":"248f9dd5",
                  "index":5.0
              },
              {
                  "source":"b308995d",
                  "sourceAnchor":1.0,
                  "target":"35c8d9c5",
                  "targetAnchor":0.0,
                  "id":"dd21ddbf",
                  "index":6.0
              },
              {
                  "source":"35c8d9c5",
                  "sourceAnchor":1.0,
                  "target":"65f9c9a4",
                  "targetAnchor":0.0,
                  "id":"7ab0cd5e",
                  "index":8.0
              }
              ]
      }
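      Note
      • The Graph content consists of two parts: nodes and edges. You can set each id yourself, but every id must be unique. x, y, and size specify the position and size of each shape. A workflow must begin with a start node and end with an end node.
      • You can also create a workflow by dragging and dropping nodes in the E-MapReduce console. For details, see the topic on editing workflows. Then call DescribeFlow to query the workflow. The value of Graph in the response is the graph information.
      Create the workflow.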
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      
      public class CreateFlowForWeb {
      
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
      
              CreateFlowForWebRequest request = new CreateFlowForWebRequest();
              request.setRegionId("cn-hangzhou");
              request.setProjectId("FP-D18E9976D5A****");
              request.setName("new_flow1");
              request.setDescription("create flow by openAPI");
              request.setCreateCluster(false);
              request.setClusterId("C-B503DDB15B34****");
              request.setGraph("{\"nodes\":[{\"shape\":\"startControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"spmAnchorId\":\"0.0.0.i0.766645eb2cmNtQ\",\"x\":500.0,\"y\":250.0,\"id\":\"48d474ea\",\"index\":0.0,\"attribute\":{\"type\":\"START\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":498.0,\"y\":324.5,\"id\":\"cd5eb72d\",\"index\":1.0,\"label\":\"customer_logs\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-79B3F70E23D6****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":497.0,\"y\":416.5,\"id\":\"b308995d\",\"index\":2.0,\"label\":\"log_aggregation;\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-CECB36039155****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":501.0,\"y\":516.5,\"id\":\"35c8d9c5\",\"index\":3.0,\"label\":\"customer_pic\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-99CFAE652650****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"endControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"trackerPath\":\"bb4b171fe9d130b4\",\"x\":503.0,\"y\":612.5,\"id\":\"65f9c9a4\",\"index\":7.0,\"attribute\":{\"type\":\"END\"}}],\"edges\":[{\"source\":\"48d474ea\",\"sourceAnchor\":0.0,\"target\":\"cd5eb72d\",\"targetAnchor\":0.0,\"id\":\"3820959f\",\"index\":4.0},{\"source\":\"cd5eb72d\",\"sourceAnchor\":1.0,\"target\":\"b308995d\",\"targetAnchor\":0.0,\"id\":\"248f9dd5\",\"index\":5.0},{\"source\":\"b308995d\",\"sourceAnchor\":1.0,\"target\":\"35c8d9c5\",\"targetAnchor\":0.0,\"id\":\"dd21ddbf\",\"index\":6.0},{\"source\":\"35c8d9c5\",\"sourceAnchor\":1.0,\"target\":\"65f9c9a4\",\"targetAnchor\":0.0,\"id\":\"7ab0cd5e\",\"index\":8.0}]}");
      
              try {
                  CreateFlowForWebResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
      
          }
      }
      The workflow ID is returned.
      {
          "RequestId": "417BA643-0E5F-456E-8222-B3EB5DD800DF",
          "Id": "F-AC0915556C24****"
      }
    2. Submit the workflow to run it.
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      
      public class SubmitFlow {
      
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
      
              SubmitFlowRequest request = new SubmitFlowRequest();
              request.setRegionId("cn-hangzhou");
              request.setProjectId("FP-D18E9976D5A****");
              request.setFlowId("F-AC0915556C24****");
      
              try {
                  SubmitFlowResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
      
          }
      }
      The workflow instance ID is returned.
      {
          "Data": "FI-521925BE9816****",
          "InstanceId": "FI-521925BE9816****",
          "RequestId": "DBD1D138-C7F0-4D81-B3F2-0B4512E4A74C",
          "Id": "FI-521925BE9816****"
      }
    3. Query the workflow instance to view the running status of the workflow instance and its nodes.
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      
      public class DescribeFlowInstance {
      
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
      
              DescribeFlowInstanceRequest request = new DescribeFlowInstanceRequest();
              request.setRegionId("cn-hangzhou");
              request.setId("FI-521925BE9816****");
              request.setProjectId("FP-D18E9976D5A****");
      
              try {
                  DescribeFlowInstanceResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
      
          }
      }
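    4. Based on the status of the node instances, you can:
      • Call DescribeFlowNodeInstance to query the details of a node instance.
      • Call DescribeFlowNodeInstanceLauncherLog to query the launcher log of a node instance.
      • Call ListFlowNodeInstanceContainerStatus to view the container status details of a node instance.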
  • Python
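    1. In the project emr_openapi_demo_project, create a workflow named new_flow1. You can customize the graph of the workflow.
      Parameter        Description
      CreateCluster    Specifies whether to create a cluster from a cluster template:
      • true: create a cluster from a cluster template. Set ClusterId to the cluster template ID (C-xxx).
      • false: run on an existing cluster. Set ClusterId to the ID of the existing cluster (C-xxx).
      Graph            The graph information.
      The content of the Graph parameter is as follows.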
      {
          "nodes":[
              {
                  "shape":"startControlNode",
                  "type":"node",
                  "size":"80*34",
                  "x":500.0,
                  "y":250.0,
                  "id":"48d474ea",
                  "index":0.0,
                  "attribute":{"type":"START"}
              },
              {
                  "shape":"hiveSQLJobNode",
                  "type":"node",
                  "size":"170*34",
                  "x":498.0,
                  "y":324.5,
                  "id":"cd5eb72d",
                  "index":1.0,
                  "label":"customer_logs",
                  "attribute":{
                      "type":"JOB",
                      "jobId":"FJ-79B3F70E23D6****",
                      "jobType":"HIVE_SQL"
                      }
              },
              {
                  "shape":"hiveSQLJobNode",
                  "type":"node",
                  "size":"170*34",
                  "trackerPath":"ec5a56bc4a261c22",
                  "x":497.0,
                  "y":416.5,
                  "id":"b308995d",
                  "index":2.0,
                  "label":"log_aggregation;",
                  "attribute":{
                      "type":"JOB",
                      "jobId":"FJ-CECB36039155****",
                      "jobType":"HIVE_SQL"
                      }
              },
              {
                  "shape":"hiveSQLJobNode",
                  "type":"node",
                  "size":"170*34",
                  "trackerPath":"ec5a56bc4a261c22",
                  "x":501.0,
                  "y":516.5,
                  "id":"35c8d9c5",
                  "index":3.0,
                  "label":"customer_pic",
                  "attribute":{
                      "type":"JOB",
                      "jobId":"FJ-99CFAE652650****",
                      "jobType":"HIVE_SQL"
                      }
              },
              {
                  "shape":"endControlNode",
                  "type":"node",
                  "size":"80*34",
                  "x":503.0,
                  "y":612.5,
                  "id":"65f9c9a4",
                  "index":7.0,
                  "attribute":{"type":"END"}
              }
          ],
          "edges":
          [
              {
                  "source":"48d474ea",
                  "sourceAnchor":0.0,
                  "target":"cd5eb72d",
                  "targetAnchor":0.0,
                  "id":"3820959f",
                  "index":4.0
              },
              {
                  "source":"cd5eb72d",
                  "sourceAnchor":1.0,
                  "target":"b308995d",
                  "targetAnchor":0.0,
                  "id":"248f9dd5",
                  "index":5.0
              },
              {
                  "source":"b308995d",
                  "sourceAnchor":1.0,
                  "target":"35c8d9c5",
                  "targetAnchor":0.0,
                  "id":"dd21ddbf",
                  "index":6.0
              },
              {
                  "source":"35c8d9c5",
                  "sourceAnchor":1.0,
                  "target":"65f9c9a4",
                  "targetAnchor":0.0,
                  "id":"7ab0cd5e",
                  "index":8.0
              }
              ]
      }
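      Note
      • The Graph content consists of two parts: nodes and edges. You can set each id yourself, but every id must be unique. x, y, and size specify the position and size of each shape. A workflow must begin with a start node and end with an end node.
      • You can also create a workflow by dragging and dropping nodes in the E-MapReduce console. For details, see the topic on editing workflows. Then call DescribeFlow to query the workflow. The value of Graph in the response is the graph information.
      Create the workflow.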
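The rules in the note above can be checked locally before the graph is sent. The following is a minimal sketch, not part of the EMR SDK: the helper name validate_graph is hypothetical, treating node and edge ids as one shared namespace is a conservative assumption, and the check that every edge connects existing nodes is an extra sanity check that the note does not state.

```python
def validate_graph(graph):
    """Return a list of problems found in a workflow Graph dict (empty if none)."""
    problems = []
    nodes = graph.get("nodes", [])
    edges = graph.get("edges", [])

    # Ids must be unique. Node and edge ids are checked together here,
    # which is a conservative assumption.
    ids = [item["id"] for item in nodes + edges]
    duplicates = sorted({i for i in ids if ids.count(i) > 1})
    if duplicates:
        problems.append("duplicate ids: " + ", ".join(duplicates))

    # The workflow must contain a start node and an end node.
    node_types = [n.get("attribute", {}).get("type") for n in nodes]
    for required in ("START", "END"):
        if required not in node_types:
            problems.append("missing " + required + " node")

    # Extra sanity check (not stated in the note): edges connect known nodes.
    node_ids = {n["id"] for n in nodes}
    for edge in edges:
        for end in ("source", "target"):
            if edge.get(end) not in node_ids:
                problems.append("edge " + edge["id"] + " has unknown " + end)
    return problems


# A trimmed-down graph that reuses ids from the sample above.
sample = {
    "nodes": [
        {"id": "48d474ea", "attribute": {"type": "START"}},
        {"id": "cd5eb72d", "attribute": {"type": "JOB"}},
        {"id": "65f9c9a4", "attribute": {"type": "END"}},
    ],
    "edges": [
        {"id": "3820959f", "source": "48d474ea", "target": "cd5eb72d"},
        {"id": "248f9dd5", "source": "cd5eb72d", "target": "65f9c9a4"},
    ],
}
print(validate_graph(sample))
```

An empty list means that none of the checked rules is violated. It does not guarantee that the service accepts the graph.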
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.CreateFlowForWebRequest import CreateFlowForWebRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = CreateFlowForWebRequest()
      request.set_accept_format('json')
      
      request.set_ProjectId("FP-D18E9976D5A****")
      request.set_Name("new_flow1")
      request.set_Description("create flow by openAPI")
      request.set_CreateCluster(False)
      request.set_ClusterId("C-B503DDB15B34****")
      request.set_Graph(" {\"nodes\":[{\"shape\":\"startControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"spmAnchorId\":\"0.0.0.i0.766645eb2cmNtQ\",\"x\":500.0,\"y\":250.0,\"id\":\"48d474ea\",\"index\":0.0,\"attribute\":{\"type\":\"START\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":498.0,\"y\":324.5,\"id\":\"cd5eb72d\",\"index\":1.0,\"label\":\"customer_logs\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-79B3F70E23D6****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":497.0,\"y\":416.5,\"id\":\"b308995d\",\"index\":2.0,\"label\":\"log_aggregation;\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-CECB36039155****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\",\"x\":501.0,\"y\":516.5,\"id\":\"35c8d9c5\",\"index\":3.0,\"label\":\"customer_pic\",\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-99CFAE652650****\",\"jobType\":\"HIVE_SQL\"},\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},{\"shape\":\"endControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"trackerPath\":\"bb4b171fe9d130b4\",\"x\":503.0,\"y\":612.5,\"id\":\"65f9c9a4\",\"index\":7.0,\"attribute\":{\"type\":\"END\"}}],\"edges\":[{\"source\":\"48d474ea\",\"sourceAnchor\":0.0,\"target\":\"cd5eb72d\",\"targetAnchor\":0.0,\"id\":\"3820959f\",\"index\":4.0},{\"source\":\"cd5eb72d\",\"sourceAnchor\":1.0,\"target\":\"b308995d\",\"targetAnchor\":0.0,\"id\":\"248f9dd5\",\"index\":5.0},{\"source\":\"b308995d\",\"sourceAnchor\":1.0,\"target\":\"35c8d9c5\",\"targetAnchor\":0.0,\"id\":\"dd21ddbf\",\"index\":6.0},{\"source\":\"35c8d9c5\",\"sourceAnchor\":1.0,\"target\":\"65f9c9a4\",\"targetAnchor\":0.0,\"id\":\"7ab0cd5e\",\"index\":8.0}]}");
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
      The workflow ID is returned.
      {
          "RequestId": "417BA643-0E5F-456E-8222-B3EB5DD800DF",
          "Id": "F-AC0915556C24****"
      }
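The escaped Graph string in the request above is hard to maintain by hand. As a rough sketch, the same kind of string could be generated from a list of jobs with the standard json module. The helper build_linear_graph and the ids node0, edge0, and so on are hypothetical. The field names mirror the sample graph, the coordinates are arbitrary, and it is an assumption that the service accepts a graph without the optional fields (such as trackerPath and config) that the console adds.

```python
import json


def build_linear_graph(jobs):
    """Chain (label, job_id) pairs between a start node and an end node."""
    nodes = [{
        "shape": "startControlNode", "type": "node", "size": "80*34",
        "x": 500.0, "y": 250.0, "id": "node0", "index": 0.0,
        "attribute": {"type": "START"},
    }]
    for i, (label, job_id) in enumerate(jobs, start=1):
        nodes.append({
            "shape": "hiveSQLJobNode", "type": "node", "size": "170*34",
            "x": 500.0, "y": 250.0 + 90.0 * i, "id": "node%d" % i,
            "index": float(i), "label": label,
            "attribute": {"type": "JOB", "jobId": job_id, "jobType": "HIVE_SQL"},
        })
    end_pos = len(nodes)
    nodes.append({
        "shape": "endControlNode", "type": "node", "size": "80*34",
        "x": 500.0, "y": 250.0 + 90.0 * end_pos, "id": "node%d" % end_pos,
        "index": float(end_pos),
        "attribute": {"type": "END"},
    })

    edges = []
    for k in range(len(nodes) - 1):
        edges.append({
            "source": nodes[k]["id"],
            # The sample uses anchor 0.0 on the start node and 1.0 elsewhere.
            "sourceAnchor": 0.0 if k == 0 else 1.0,
            "target": nodes[k + 1]["id"],
            "targetAnchor": 0.0,
            "id": "edge%d" % k,
            # Keep index values unique across nodes and edges (an assumption).
            "index": float(len(nodes) + k),
        })
    return {"nodes": nodes, "edges": edges}


graph = build_linear_graph([
    ("customer_logs", "FJ-79B3F70E23D6****"),
    ("log_aggregation", "FJ-CECB36039155****"),
    ("customer_pic", "FJ-99CFAE652650****"),
])
# Compact JSON text, suitable for passing as the Graph parameter.
graph_param = json.dumps(graph, separators=(",", ":"))
print(graph_param[:60])
```

The resulting graph_param could then be passed with request.set_Graph(graph_param) in place of the hand-escaped literal.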
    2. Submit the workflow to run it.
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.SubmitFlowRequest import SubmitFlowRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = SubmitFlowRequest()
      request.set_accept_format('json')
      
      request.set_ProjectId("FP-D18E9976D5A****")
      request.set_FlowId("F-AC0915556C24****")
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
      The workflow instance ID is returned.
      {
          "Data": "FI-521925BE9816****",
          "InstanceId": "FI-521925BE9816****",
          "RequestId": "DBD1D138-C7F0-4D81-B3F2-0B4512E4A74C",
          "Id": "FI-521925BE9816****"
      }
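The next step needs the instance ID from this response. A small sketch of pulling it out of the raw response body follows. The helper extract_instance_id is hypothetical, and the key order it tries is an assumption based only on the sample response above, in which Data, InstanceId, and Id carry the same value.

```python
import json


def extract_instance_id(raw_response):
    """Return the workflow instance ID from a SubmitFlow response body."""
    # In Python 3 the SDK call above returns bytes, so decode first.
    if isinstance(raw_response, bytes):
        raw_response = raw_response.decode("utf-8")
    body = json.loads(raw_response)
    # Prefer InstanceId and fall back to the other keys (assumed order).
    for key in ("InstanceId", "Id", "Data"):
        if body.get(key):
            return body[key]
    raise KeyError("no instance ID found in response")


# The sample response shown above, as the bytes the SDK would return.
sample_response = (
    b'{"Data": "FI-521925BE9816****", "InstanceId": "FI-521925BE9816****", '
    b'"RequestId": "DBD1D138-C7F0-4D81-B3F2-0B4512E4A74C", '
    b'"Id": "FI-521925BE9816****"}'
)
instance_id = extract_instance_id(sample_response)
print(instance_id)
```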
    3. Query the workflow instance to view the running status of the workflow instance and its node instances.
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.DescribeFlowInstanceRequest import DescribeFlowInstanceRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = DescribeFlowInstanceRequest()
      request.set_accept_format('json')
      
      request.set_Id("FI-521925BE9816****")
      request.set_ProjectId("FP-D18E9976D5A****")
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
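A workflow instance usually takes some time to finish, so the query above is typically repeated until the instance reaches a final state. The sketch below shows one way to do that. The helper wait_for_flow_instance is hypothetical, and the status values used in the demo ("RUNNING", "OK", "FAILED") are placeholders: this topic does not list the response fields or status values of DescribeFlowInstance, so check the API reference before relying on them.

```python
import time


def wait_for_flow_instance(fetch_status, terminal_statuses,
                           interval_seconds=10.0, max_attempts=60,
                           sleep=time.sleep):
    """Call fetch_status() until it returns a terminal status or attempts run out."""
    for attempt in range(max_attempts):
        status = fetch_status()
        if status in terminal_statuses:
            return status
        # Do not sleep after the final attempt.
        if attempt < max_attempts - 1:
            sleep(interval_seconds)
    raise TimeoutError("workflow instance did not reach a terminal status")


# Demo with a fake status source instead of a real DescribeFlowInstance call.
fake_statuses = iter(["RUNNING", "RUNNING", "OK"])
final_status = wait_for_flow_instance(
    fetch_status=lambda: next(fake_statuses),
    terminal_statuses={"OK", "FAILED"},  # placeholder values, see lead-in
    sleep=lambda seconds: None,          # skip real waiting in the demo
)
print(final_status)
```

In real use, fetch_status would wrap the DescribeFlowInstance request shown above and return the status field parsed from its JSON response.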
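    4. Based on the status of the node instances, you can:
      • Call DescribeFlowNodeInstance to query the details of a node instance.
      • Call DescribeFlowNodeInstanceLauncherLog to query the launcher log of a node instance.
      • Call ListFlowNodeInstanceContainerStatus to view the container status details of a node instance.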

Related OpenAPI operations