本文介绍如何通过OpenAPI的方式创建作业、运行作业以及查看结果。
前提条件
- 已创建集群,详情请参见创建集群。
- 已获取集群ID,详情请参见查看集群列表与详情。
- 已创建AccessKey,详情请参见获取AccessKey。
- 已获取相应的SDK,Java SDK请参见SDK下载,Python SDK请参见安装SDK。
使用场景
您已经在华东1(杭州)创建了项目,现需要在项目中编辑作业并运行。
现有集群的基本配置为:
- 项目名称为emr_openapi_demo_project,项目ID为FP-D18E9976D5A****。如未创建项目,请创建新项目,详情请参见管理项目。
- 项目管理的集群名为emr_openapi_demo, 集群ID为C-69CB0546800F****。
说明
- 通过CreateFlowJob创建作业时,可以在返回结果中找到项目ID。
- 通过ListFlowProject可以查询该Region下所有数据开发项目的列表,也可以在返回结果中找到项目ID。
示例
- Java
- 创建Hive_SQL类型的作业。调用CreateFlowProject创建作业时,下面几个参数是必须的。
参数 描述 RegionId 地域ID。例如cn-hangzhou。 ProjectId 项目ID。 Name 自定义作业名字。例如emr_openapi_hivejob。 Type 作业类型,目前支持:MR、Spark、Hive_SQL、Hive、Pig、Sqoop、Spark_SQL、Spark_Streaming及Shell。 Description 作业的描述。 Adhoc 是否临时查询。取值为true和false。 说明 ClusterId对于创建作业来说是可选项。import com.aliyuncs.DefaultAcsClient; import com.aliyuncs.IAcsClient; import com.aliyuncs.exceptions.ClientException; import com.aliyuncs.exceptions.ServerException; import com.aliyuncs.profile.DefaultProfile; import com.google.gson.Gson; import java.util.*; import com.aliyuncs.emr.model.v20160408.*; public class CreateFlowJob { public static void main(String[] args) { DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>"); IAcsClient client = new DefaultAcsClient(profile); CreateFlowJobRequest request = new CreateFlowJobRequest(); request.setRegionId("cn-hangzhou"); request.setProjectId("FP-D18E9976D5A****"); request.setName("emr_openapi_hivejob"); request.setDescription("Hive_SQL job created by OpenAPI"); request.setType("HIVE_SQL"); request.setAdhoc(false); try { CreateFlowJobResponse response = client.getAcsResponse(request); System.out.println(new Gson().toJson(response)); } catch (ServerException e) { e.printStackTrace(); } catch (ClientException e) { System.out.println("ErrCode:" + e.getErrCode()); System.out.println("ErrMsg:" + e.getErrMsg()); System.out.println("RequestId:" + e.getRequestId()); } } }
执行后返回该作业的ID,即JobId。{ "RequestId": "01B0A835-C6AB-4166-8E43-511514D8FAE0", "Id": "FJ-E4157D27791D****" }
- 执行作业。调用SubmitFlowJob提交运行作业,每次只允许存在一个正在运行的实例,以下的几个参数是必须要填写的。
参数 描述 ClusterId 执行作业的集群ID。 JobId 作业ID,即步骤1中返回的结果FJ-E4157D27791D****。 ProjectId 工作流项目ID。 RegionId 地域ID。例如cn-hangzhou。 import com.aliyuncs.DefaultAcsClient; import com.aliyuncs.IAcsClient; import com.aliyuncs.exceptions.ClientException; import com.aliyuncs.exceptions.ServerException; import com.aliyuncs.profile.DefaultProfile; import com.google.gson.Gson; import java.util.*; import com.aliyuncs.emr.model.v20160408.*; public class SubmitFlowJob { public static void main(String[] args) { DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>"); IAcsClient client = new DefaultAcsClient(profile); SubmitFlowJobRequest request = new SubmitFlowJobRequest(); request.setRegionId("cn-hangzhou"); request.setProjectId("FP-D18E9976D5A****"); request.setJobId("FJ-E4157D27791D****"); request.setClusterId("C-69CB0546800F****"); try { SubmitFlowJobResponse response = client.getAcsResponse(request); System.out.println(new Gson().toJson(response)); } catch (ServerException e) { e.printStackTrace(); } catch (ClientException e) { System.out.println("ErrCode:" + e.getErrCode()); System.out.println("ErrMsg:" + e.getErrMsg()); System.out.println("RequestId:" + e.getRequestId()); } } }
返回FlowJobInstanceID。{ "RequestId": "15BBB0DC-EEC5-4CE4-B4FA-A1D9827F8808", "Id": "FJI-54FEBB063136****" }
- 当作业类型为Hive_SQL或Spark_SQL时,可以调用ListFlowNodeSqlResult得到查询的结果。
ListFlowNodeSqlResult返回的结果最多只有200行。
需提供以下参数。参数 描述 RegionId 地域ID,例如cn-hangzhou。 ProjectId 工作流项目ID。 NodeInstanceId 节点实例ID。例如,在步骤2中返回的实例ID:FJI-54FEBB063136****。 import com.aliyuncs.DefaultAcsClient; import com.aliyuncs.IAcsClient; import com.aliyuncs.exceptions.ClientException; import com.aliyuncs.exceptions.ServerException; import com.aliyuncs.profile.DefaultProfile; import com.google.gson.Gson; import java.util.*; import com.aliyuncs.emr.model.v20160408.*; public class ListFlowNodeSqlResult { public static void main(String[] args) { DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>"); IAcsClient client = new DefaultAcsClient(profile); ListFlowNodeSqlResultRequest request = new ListFlowNodeSqlResultRequest(); request.setRegionId("cn-hangzhou"); request.setProjectId("FP-D18E9976D5A****"); request.setNodeInstanceId("FJI-54FEBB063136****"); try { ListFlowNodeSqlResultResponse response = client.getAcsResponse(request); System.out.println(new Gson().toJson(response)); } catch (ServerException e) { e.printStackTrace(); } catch (ClientException e) { System.out.println("ErrCode:" + e.getErrCode()); System.out.println("ErrMsg:" + e.getErrMsg()); System.out.println("RequestId:" + e.getRequestId()); } } }
- 查看作业执行的详细信息,例如执行状态、时间和时长等。需提供以下参数。
参数 描述 RegionId 地域ID。例如cn-hangzhou。 ProjectId 工作流项目ID。 NodeInstanceId 节点实例ID。例如,在步骤2中返回的实例ID:FJI-54FEBB063136****。 import com.aliyuncs.DefaultAcsClient; import com.aliyuncs.IAcsClient; import com.aliyuncs.exceptions.ClientException; import com.aliyuncs.exceptions.ServerException; import com.aliyuncs.profile.DefaultProfile; import com.google.gson.Gson; import java.util.*; import com.aliyuncs.emr.model.v20160408.*; public class DescribeFlowNodeInstance { public static void main(String[] args) { DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>"); IAcsClient client = new DefaultAcsClient(profile); DescribeFlowNodeInstanceRequest request = new DescribeFlowNodeInstanceRequest(); request.setRegionId("cn-hangzhou"); request.setNodeInstanceId("FJI-54FEBB063136****"); request.setProjectId("FP-D18E9976D5A****"); try { DescribeFlowNodeInstanceResponse response = client.getAcsResponse(request); System.out.println(new Gson().toJson(response)); } catch (ServerException e) { e.printStackTrace(); } catch (ClientException e) { System.out.println("ErrCode:" + e.getErrCode()); System.out.println("ErrMsg:" + e.getErrMsg()); System.out.println("RequestId:" + e.getRequestId()); } } }
- 创建Hive_SQL类型的作业。调用CreateFlowProject创建作业时,下面几个参数是必须的。
- Python
- 创建Hive_SQL类型的作业,在创建作业时,下面几个参数是必须的。
参数 描述 RegionId 地域ID。例如cn-hangzhou。 ProjectId 项目ID。 Name 自定义作业名字。例如emr_openapi_hivejob。 Type 作业类型,目前支持:MR、Spark、Hive_SQL、Hive、Pig、Sqoop、Spark_SQL、Spark_Streaming及Shell。 Description 作业的描述。 Adhoc 是否临时查询。取值为true和false。 说明 ClusterId对于创建作业来说是可选项。#!/usr/bin/env python #coding=utf-8 from aliyunsdkcore.client import AcsClient from aliyunsdkcore.acs_exception.exceptions import ClientException from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkemr.request.v20160408.CreateFlowJobRequest import CreateFlowJobRequest client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou') request = CreateFlowJobRequest() request.set_accept_format('json') request.set_ProjectId("FP-D18E9976D5A****") request.set_Name("emr_openapi_hivejob") request.set_Description("Hive_SQL job created by OpenAPI") request.set_Type("HIVE_SQL") request.set_Adhoc(False) response = client.do_action_with_exception(request) # python2: print(response) print(str(response, encoding='utf-8'))
执行后返回该作业的ID,即JobId。{ "RequestId": "01B0A835-C6AB-4166-8E43-511514D8FAE0", "Id": "FJ-E4157D27791D****" }
- 执行作业。调用SubmitFlowJob提交运行作业,每次只允许存在一个正在运行的实例,以下的几个参数是必须要填写的。
参数 描述 ClusterId 执行作业的集群。 JobId 作业ID,即步骤1中返回的结果FJ-E4157D27791D****。 ProjectId 工作流项目ID。 RegionId 地域ID。例如cn-hangzhou。 #!/usr/bin/env python #coding=utf-8 from aliyunsdkcore.client import AcsClient from aliyunsdkcore.acs_exception.exceptions import ClientException from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkemr.request.v20160408.SubmitFlowJobRequest import SubmitFlowJobRequest client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou') request = SubmitFlowJobRequest() request.set_accept_format('json') request.set_ProjectId("FP-D18E9976D5A****") request.set_JobId("FJ-E4157D27791D****") request.set_ClusterId("C-69CB0546800F****") response = client.do_action_with_exception(request) # python2: print(response) print(str(response, encoding='utf-8'))
返回FlowJobInstanceID。{ "RequestId": "15BBB0DC-EEC5-4CE4-B4FA-A1D9827F8808", "Id": "FJI-54FEBB063136****" }
- 当FlowJob的类型为Hive_SQL或者Spark_SQL时,可以调用ListFlowNodeSqlResult得到查询的结果值。
ListFlowNodeSqlResult返回的结果最多只有200行。
需提供以下参数。参数 描述 RegionId 地域ID。例如cn-hangzhou。 ProjectId 工作流项目ID。 NodeInstanceId 节点实例ID。例如,在步骤2中返回的实例ID:FJI-54FEBB063136****。 #!/usr/bin/env python #coding=utf-8 from aliyunsdkcore.client import AcsClient from aliyunsdkcore.acs_exception.exceptions import ClientException from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkemr.request.v20160408.ListFlowNodeSqlResultRequest import ListFlowNodeSqlResultRequest client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou') request = ListFlowNodeSqlResultRequest() request.set_accept_format('json') request.set_ProjectId("FP-D18E9976D5A****") request.set_NodeInstanceId("FJI-54FEBB063136****") response = client.do_action_with_exception(request) # python2: print(response) print(str(response, encoding='utf-8'))
- 查看作业执行的详细信息,例如执行状态、时间和时长等,需提供以下参数。
参数 描述 RegionId 地域ID。例如cn-hangzhou。 ProjectId 工作流项目ID。 NodeInstanceId 节点实例ID。例如,在步骤2中返回的实例ID:FJI-54FEBB063136****。 #!/usr/bin/env python #coding=utf-8 from aliyunsdkcore.client import AcsClient from aliyunsdkcore.acs_exception.exceptions import ClientException from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkemr.request.v20160408.DescribeFlowNodeInstanceRequest import DescribeFlowNodeInstanceRequest client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou') request = DescribeFlowNodeInstanceRequest() request.set_accept_format('json') request.set_Id("FJI-54FEBB063136****") request.set_ProjectId("FP-D18E9976D5A****") response = client.do_action_with_exception(request) # python2: print(response) print(str(response, encoding='utf-8'))
- 创建Hive_SQL类型的作业,在创建作业时,下面几个参数是必须的。