本文介绍如何通过OpenAPI的方式创建作业、运行作业以及查看结果。

前提条件

使用场景

您已经在华东1(杭州)创建了项目,现需要在项目中编辑作业并运行。

现有集群的基本配置为:
  • 项目名称为emr_openapi_demo_project,项目ID为FP-D18E9976D5A****。如未创建项目,请创建新项目,详情请参见管理项目
  • 项目管理的集群名为emr_openapi_demo, 集群ID为C-69CB0546800F****。
说明
  • 通过CreateFlowJob创建作业时,可以在返回结果中找到项目ID。
  • 通过ListFlowProject可以查询该Region下所有数据开发项目的列表,也可以在返回结果中找到项目ID。

示例

  • Java
    1. 创建Hive_SQL类型的作业。调用CreateFlowProject创建作业时,下面几个参数是必须的。
      参数 描述
      RegionId 地域ID。例如cn-hangzhou。
      ProjectId 项目ID。
      Name 自定义作业名字。例如emr_openapi_hivejob。
      Type 作业类型,目前支持:MR、Spark、Hive_SQL、Hive、Pig、Sqoop、Spark_SQL、Spark_Streaming及Shell。
      Description 作业的描述。
      Adhoc 是否临时查询。取值为true和false。
      说明 ClusterId对于创建作业来说是可选项。
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      public class CreateFlowJob {
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
              CreateFlowJobRequest request = new CreateFlowJobRequest();
              request.setRegionId("cn-hangzhou");
              request.setProjectId("FP-D18E9976D5A****");
              request.setName("emr_openapi_hivejob");
              request.setDescription("Hive_SQL job created by OpenAPI");
              request.setType("HIVE_SQL");
              request.setAdhoc(false);
              try {
                  CreateFlowJobResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
          }
      }
      执行后返回该作业的ID,即JobId。
      {
          "RequestId": "01B0A835-C6AB-4166-8E43-511514D8FAE0",
          "Id": "FJ-E4157D27791D****"
      }
    2. 执行作业。调用SubmitFlowJob提交运行作业,每次只允许存在一个正在运行的实例,以下的几个参数是必须要填写的。
      参数 描述
      ClusterId 执行作业的集群ID。
      JobId 作业ID,即步骤1中返回的结果FJ-E4157D27791D****。
      ProjectId 工作流项目ID。
      RegionId 地域ID。例如cn-hangzhou。
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      public class SubmitFlowJob {
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
              SubmitFlowJobRequest request = new SubmitFlowJobRequest();
              request.setRegionId("cn-hangzhou");
              request.setProjectId("FP-D18E9976D5A****");
              request.setJobId("FJ-E4157D27791D****");
              request.setClusterId("C-69CB0546800F****");
              try {
                  SubmitFlowJobResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
          }
      }
      返回FlowJobInstanceID。
      {
          "RequestId": "15BBB0DC-EEC5-4CE4-B4FA-A1D9827F8808",
          "Id": "FJI-54FEBB063136****"
      }
    3. 当作业类型为Hive_SQL或Spark_SQL时,可以调用ListFlowNodeSqlResult得到查询的结果。

      ListFlowNodeSqlResult返回的结果最多只有200行。

      需提供以下参数。
      参数 描述
      RegionId 地域ID,例如cn-hangzhou。
      ProjectId 工作流项目ID。
      NodeInstanceId 节点实例ID。例如,在步骤2中返回的实例ID:FJI-54FEBB063136****。
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      
      public class ListFlowNodeSqlResult {
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
              ListFlowNodeSqlResultRequest request = new ListFlowNodeSqlResultRequest();
              request.setRegionId("cn-hangzhou");
              request.setProjectId("FP-D18E9976D5A****");
              request.setNodeInstanceId("FJI-54FEBB063136****");
              try {
                  ListFlowNodeSqlResultResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
          }
      }
    4. 查看作业执行的详细信息,例如执行状态、时间和时长等。需提供以下参数。
      参数 描述
      RegionId 地域ID。例如cn-hangzhou。
      ProjectId 工作流项目ID。
      NodeInstanceId 节点实例ID。例如,在步骤2中返回的实例ID:FJI-54FEBB063136****。
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      public class DescribeFlowNodeInstance {
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
              DescribeFlowNodeInstanceRequest request = new DescribeFlowNodeInstanceRequest();
              request.setRegionId("cn-hangzhou");
              request.setNodeInstanceId("FJI-54FEBB063136****");
              request.setProjectId("FP-D18E9976D5A****");
              try {
                  DescribeFlowNodeInstanceResponse response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
          }
      }
  • Python
    1. 创建Hive_SQL类型的作业,在创建作业时,下面几个参数是必须的。
      参数 描述
      RegionId 地域ID。例如cn-hangzhou。
      ProjectId 项目ID。
      Name 自定义作业名字。例如emr_openapi_hivejob。
      Type 作业类型,目前支持:MR、Spark、Hive_SQL、Hive、Pig、Sqoop、Spark_SQL、Spark_Streaming及Shell。
      Description 作业的描述。
      Adhoc 是否临时查询。取值为true和false。
      说明 ClusterId对于创建作业来说是可选项。
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.CreateFlowJobRequest import CreateFlowJobRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = CreateFlowJobRequest()
      request.set_accept_format('json')
      
      request.set_ProjectId("FP-D18E9976D5A****")
      request.set_Name("emr_openapi_hivejob")
      request.set_Description("Hive_SQL job created by OpenAPI")
      request.set_Type("HIVE_SQL")
      request.set_Adhoc(False)
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
      执行后返回该作业的ID,即JobId。
      {
          "RequestId": "01B0A835-C6AB-4166-8E43-511514D8FAE0",
          "Id": "FJ-E4157D27791D****"
      }
    2. 执行作业。调用SubmitFlowJob提交运行作业,每次只允许存在一个正在运行的实例,以下的几个参数是必须要填写的。
      参数 描述
      ClusterId 执行作业的集群。
      JobId 作业ID,即步骤1中返回的结果FJ-E4157D27791D****。
      ProjectId 工作流项目ID。
      RegionId 地域ID。例如cn-hangzhou。
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.SubmitFlowJobRequest import SubmitFlowJobRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = SubmitFlowJobRequest()
      request.set_accept_format('json')
      
      request.set_ProjectId("FP-D18E9976D5A****")
      request.set_JobId("FJ-E4157D27791D****")
      request.set_ClusterId("C-69CB0546800F****")
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
      返回FlowJobInstanceID。
      {
          "RequestId": "15BBB0DC-EEC5-4CE4-B4FA-A1D9827F8808",
          "Id": "FJI-54FEBB063136****"
      }
    3. 当FlowJob的类型为Hive_SQL或者Spark_SQL时,可以调用ListFlowNodeSqlResult得到查询的结果值。

      ListFlowNodeSqlResult返回的结果最多只有200行。

      需提供以下参数。
      参数 描述
      RegionId 地域ID。例如cn-hangzhou。
      ProjectId 工作流项目ID。
      NodeInstanceId 节点实例ID。例如,在步骤2中返回的实例ID:FJI-54FEBB063136****。
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.ListFlowNodeSqlResultRequest import ListFlowNodeSqlResultRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = ListFlowNodeSqlResultRequest()
      request.set_accept_format('json')
      
      request.set_ProjectId("FP-D18E9976D5A****")
      request.set_NodeInstanceId("FJI-54FEBB063136****")
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
    4. 查看作业执行的详细信息,例如执行状态、时间和时长等,需提供以下参数。
      参数 描述
      RegionId 地域ID。例如cn-hangzhou。
      ProjectId 工作流项目ID。
      NodeInstanceId 节点实例ID。例如,在步骤2中返回的实例ID:FJI-54FEBB063136****。
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.DescribeFlowNodeInstanceRequest import DescribeFlowNodeInstanceRequest
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = DescribeFlowNodeInstanceRequest()
      request.set_accept_format('json')
      
      request.set_Id("FJI-54FEBB063136****")
      request.set_ProjectId("FP-D18E9976D5A****")
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))

相关OpenAPI