This topic describes how to call OpenAPI operations to create a workflow, run its jobs, and view the execution results and logs.
Prerequisites
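- You have created a cluster. For more information, see Create a cluster.
- You have obtained the cluster ID. For more information, see View the cluster list and cluster details.
- You have created an AccessKey pair. For more information, see Obtain an AccessKey pair.
- You have obtained the SDK. For the Java SDK, see SDK download. For the Python SDK, see Install the SDK.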
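Scenario
You have created a project in the China (Hangzhou) region and created three HiveSQL jobs in that project that depend on one another. You now need to create a workflow that runs the jobs one after another in dependency order.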
The existing project and jobs are as follows:
- Project name: emr_openapi_demo_project. Project ID: FP-D18E9976D5A****.
- HiveSQL jobs:
  - customer_logs, job ID: FJ-79B3F70E23D6****.
  - log_aggregation, job ID: FJ-CECB36039155****.
  - customer_pic, job ID: FJ-99CFAE652650****.
Examples
- Java
- Create a workflow named new_flow in the project emr_openapi_demo_project. You can customize the graph of the workflow.
| Parameter | Description |
| --- | --- |
| CreateCluster | Specifies whether to create a cluster from a cluster template. true: create a cluster from a cluster template; set ClusterId to the cluster template ID. false: run the workflow on an existing cluster; set ClusterId to the ID of that cluster. |
| Graph | The graph of the workflow. The Graph value used in this example is shown below. |

```json
{
  "nodes": [
    {"shape": "startControlNode", "type": "node", "size": "80*34", "x": 500.0, "y": 250.0, "id": "48d474ea", "index": 0.0, "attribute": {"type": "START"}},
    {"shape": "hiveSQLJobNode", "type": "node", "size": "170*34", "x": 498.0, "y": 324.5, "id": "cd5eb72d", "index": 1.0, "label": "customer_logs", "attribute": {"type": "JOB", "jobId": "FJ-79B3F70E23D6****", "jobType": "HIVE_SQL"}},
    {"shape": "hiveSQLJobNode", "type": "node", "size": "170*34", "trackerPath": "ec5a56bc4a261c22", "x": 497.0, "y": 416.5, "id": "b308995d", "index": 2.0, "label": "log_aggregation", "attribute": {"type": "JOB", "jobId": "FJ-CECB36039155****", "jobType": "HIVE_SQL"}},
    {"shape": "hiveSQLJobNode", "type": "node", "size": "170*34", "trackerPath": "ec5a56bc4a261c22", "x": 501.0, "y": 516.5, "id": "35c8d9c5", "index": 3.0, "label": "customer_pic", "attribute": {"type": "JOB", "jobId": "FJ-99CFAE652650****", "jobType": "HIVE_SQL"}},
    {"shape": "endControlNode", "type": "node", "size": "80*34", "x": 503.0, "y": 612.5, "id": "65f9c9a4", "index": 7.0, "attribute": {"type": "END"}}
  ],
  "edges": [
    {"source": "48d474ea", "sourceAnchor": 0.0, "target": "cd5eb72d", "targetAnchor": 0.0, "id": "3820959f", "index": 4.0},
    {"source": "cd5eb72d", "sourceAnchor": 1.0, "target": "b308995d", "targetAnchor": 0.0, "id": "248f9dd5", "index": 5.0},
    {"source": "b308995d", "sourceAnchor": 1.0, "target": "35c8d9c5", "targetAnchor": 0.0, "id": "dd21ddbf", "index": 6.0},
    {"source": "35c8d9c5", "sourceAnchor": 1.0, "target": "65f9c9a4", "targetAnchor": 0.0, "id": "7ab0cd5e", "index": 8.0}
  ]
}
```
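Note
- Graph consists of two parts: nodes and edges. You can set each id yourself, but every id must be unique. x, y, and size specify the position and size of each element in the diagram. The workflow must begin with a START node and end with an END node.
- You can also create the workflow by dragging and dropping in the E-MapReduce console (for more information, see Edit a workflow) and then call DescribeFlow to query the workflow. The value of Graph in the response is the graph information.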
Create the workflow.

```java
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.emr.model.v20160408.*;

public class CreateFlowForWeb {
    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
        IAcsClient client = new DefaultAcsClient(profile);

        CreateFlowForWebRequest request = new CreateFlowForWebRequest();
        request.setRegionId("cn-hangzhou");
        request.setProjectId("FP-D18E9976D5A****");
        request.setName("new_flow");
        request.setDescription("create flow by openAPI");
        // false: run the workflow on the existing cluster specified by ClusterId
        request.setCreateCluster(false);
        request.setClusterId("C-B503DDB15B34****");

        // Graph: START -> customer_logs -> log_aggregation -> customer_pic -> END
        String graph = "{\"nodes\":["
                + "{\"shape\":\"startControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"x\":500.0,\"y\":250.0,"
                + "\"id\":\"48d474ea\",\"index\":0.0,\"attribute\":{\"type\":\"START\"}},"
                + "{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\","
                + "\"x\":498.0,\"y\":324.5,\"id\":\"cd5eb72d\",\"index\":1.0,\"label\":\"customer_logs\","
                + "\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-79B3F70E23D6****\",\"jobType\":\"HIVE_SQL\"},"
                + "\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},"
                + "{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\","
                + "\"x\":497.0,\"y\":416.5,\"id\":\"b308995d\",\"index\":2.0,\"label\":\"log_aggregation\","
                + "\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-CECB36039155****\",\"jobType\":\"HIVE_SQL\"},"
                + "\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},"
                + "{\"shape\":\"hiveSQLJobNode\",\"type\":\"node\",\"size\":\"170*34\",\"trackerPath\":\"ec5a56bc4a261c22\","
                + "\"x\":501.0,\"y\":516.5,\"id\":\"35c8d9c5\",\"index\":3.0,\"label\":\"customer_pic\","
                + "\"attribute\":{\"type\":\"JOB\",\"jobId\":\"FJ-99CFAE652650****\",\"jobType\":\"HIVE_SQL\"},"
                + "\"config\":{\"clusterId\":\"\",\"hostName\":\"\"}},"
                + "{\"shape\":\"endControlNode\",\"type\":\"node\",\"size\":\"80*34\",\"trackerPath\":\"bb4b171fe9d130b4\","
                + "\"x\":503.0,\"y\":612.5,\"id\":\"65f9c9a4\",\"index\":7.0,\"attribute\":{\"type\":\"END\"}}"
                + "],\"edges\":["
                + "{\"source\":\"48d474ea\",\"sourceAnchor\":0.0,\"target\":\"cd5eb72d\",\"targetAnchor\":0.0,\"id\":\"3820959f\",\"index\":4.0},"
                + "{\"source\":\"cd5eb72d\",\"sourceAnchor\":1.0,\"target\":\"b308995d\",\"targetAnchor\":0.0,\"id\":\"248f9dd5\",\"index\":5.0},"
                + "{\"source\":\"b308995d\",\"sourceAnchor\":1.0,\"target\":\"35c8d9c5\",\"targetAnchor\":0.0,\"id\":\"dd21ddbf\",\"index\":6.0},"
                + "{\"source\":\"35c8d9c5\",\"sourceAnchor\":1.0,\"target\":\"65f9c9a4\",\"targetAnchor\":0.0,\"id\":\"7ab0cd5e\",\"index\":8.0}"
                + "]}";
        request.setGraph(graph);

        try {
            CreateFlowForWebResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }
    }
}
```
The workflow ID is returned:

```json
{
  "RequestId": "417BA643-0E5F-456E-8222-B3EB5DD800DF",
  "Id": "F-AC0915556C24****"
}
```
- Submit the workflow to run it.
```java
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.emr.model.v20160408.*;

public class SubmitFlow {
    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
        IAcsClient client = new DefaultAcsClient(profile);

        SubmitFlowRequest request = new SubmitFlowRequest();
        request.setRegionId("cn-hangzhou");
        request.setProjectId("FP-D18E9976D5A****");
        // ID of the workflow returned by CreateFlowForWeb
        request.setFlowId("F-AC0915556C24****");

        try {
            SubmitFlowResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }
    }
}
```
The workflow instance ID is returned:

```json
{
  "Data": "FI-521925BE9816****",
  "InstanceId": "FI-521925BE9816****",
  "RequestId": "DBD1D138-C7F0-4D81-B3F2-0B4512E4A74C",
  "Id": "FI-521925BE9816****"
}
```
- Query the workflow instance to view the running status of the workflow instance and of its node instances.
```java
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.emr.model.v20160408.*;

public class DescribeFlowInstance {
    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
        IAcsClient client = new DefaultAcsClient(profile);

        DescribeFlowInstanceRequest request = new DescribeFlowInstanceRequest();
        request.setRegionId("cn-hangzhou");
        // ID of the workflow instance returned by SubmitFlow
        request.setId("FI-521925BE9816****");
        request.setProjectId("FP-D18E9976D5A****");

        try {
            DescribeFlowInstanceResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }
    }
}
```
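- Based on the returned node instance status information, you can:
  - Call DescribeFlowNodeInstance to query the details of a node instance.
  - Call DescribeFlowNodeInstanceLauncherLog to query the launcher logs of a node instance.
  - Call ListFlowNodeInstanceContainerStatus to view the container status of a node instance.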
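A submitted workflow instance runs asynchronously, so in practice DescribeFlowInstance is usually called repeatedly until the instance finishes. The following minimal sketch shows that polling loop in Python (the same logic carries over to the Java client). wait_for_flow_instance is a hypothetical helper, not an SDK function. It assumes that the parsed DescribeFlowInstance response carries the instance status in a top-level Status field, and the terminal status names you pass in (for example SUCCEEDED, FAILED, or KILLED) are assumptions to check against the API reference.

```python
import time


def wait_for_flow_instance(describe, terminal_statuses, interval_s=10.0,
                           timeout_s=3600.0, sleep=time.sleep, clock=time.monotonic):
    """Poll `describe()` until it reports a terminal status or the timeout expires.

    `describe` is any zero-argument callable that returns the parsed
    DescribeFlowInstance response as a dict. `sleep` and `clock` are injectable
    so the loop can be tested without real waiting.
    """
    deadline = clock() + timeout_s
    while True:
        info = describe()
        # Assumption: the instance status is in a top-level "Status" field.
        status = info.get("Status")
        if status in terminal_statuses:
            return info
        if clock() >= deadline:
            raise TimeoutError(f"flow instance still {status!r} after {timeout_s}s")
        sleep(interval_s)
```

With the Python SDK, describe could be a lambda such as lambda: json.loads(client.do_action_with_exception(request)), where request is a DescribeFlowInstanceRequest. Injecting the callable keeps the polling logic independent of the SDK and easy to test.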
- Python
- Create a workflow named new_flow in the project emr_openapi_demo_project. You can customize the graph of the workflow.
| Parameter | Description |
| --- | --- |
| CreateCluster | Specifies whether to create a cluster from a cluster template. true: create a cluster from a cluster template; set ClusterId to the cluster template ID. false: run the workflow on an existing cluster; set ClusterId to the ID of that cluster (C-xxx). |
| Graph | The graph of the workflow. The Graph value used in this example is shown below. |

```json
{
  "nodes": [
    {"shape": "startControlNode", "type": "node", "size": "80*34", "x": 500.0, "y": 250.0, "id": "48d474ea", "index": 0.0, "attribute": {"type": "START"}},
    {"shape": "hiveSQLJobNode", "type": "node", "size": "170*34", "x": 498.0, "y": 324.5, "id": "cd5eb72d", "index": 1.0, "label": "customer_logs", "attribute": {"type": "JOB", "jobId": "FJ-79B3F70E23D6****", "jobType": "HIVE_SQL"}},
    {"shape": "hiveSQLJobNode", "type": "node", "size": "170*34", "trackerPath": "ec5a56bc4a261c22", "x": 497.0, "y": 416.5, "id": "b308995d", "index": 2.0, "label": "log_aggregation", "attribute": {"type": "JOB", "jobId": "FJ-CECB36039155****", "jobType": "HIVE_SQL"}},
    {"shape": "hiveSQLJobNode", "type": "node", "size": "170*34", "trackerPath": "ec5a56bc4a261c22", "x": 501.0, "y": 516.5, "id": "35c8d9c5", "index": 3.0, "label": "customer_pic", "attribute": {"type": "JOB", "jobId": "FJ-99CFAE652650****", "jobType": "HIVE_SQL"}},
    {"shape": "endControlNode", "type": "node", "size": "80*34", "x": 503.0, "y": 612.5, "id": "65f9c9a4", "index": 7.0, "attribute": {"type": "END"}}
  ],
  "edges": [
    {"source": "48d474ea", "sourceAnchor": 0.0, "target": "cd5eb72d", "targetAnchor": 0.0, "id": "3820959f", "index": 4.0},
    {"source": "cd5eb72d", "sourceAnchor": 1.0, "target": "b308995d", "targetAnchor": 0.0, "id": "248f9dd5", "index": 5.0},
    {"source": "b308995d", "sourceAnchor": 1.0, "target": "35c8d9c5", "targetAnchor": 0.0, "id": "dd21ddbf", "index": 6.0},
    {"source": "35c8d9c5", "sourceAnchor": 1.0, "target": "65f9c9a4", "targetAnchor": 0.0, "id": "7ab0cd5e", "index": 8.0}
  ]
}
```
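Note
- Graph consists of two parts: nodes and edges. You can set each id yourself, but every id must be unique. x, y, and size specify the position and size of each element in the diagram. The workflow must begin with a START node and end with an END node.
- You can also create the workflow by dragging and dropping in the E-MapReduce console (for more information, see Edit a workflow) and then call DescribeFlow to query the workflow. The value of Graph in the response is the graph information.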
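Before calling CreateFlowForWeb, it can help to check a hand-written or generated Graph against the rules in the note above. The following minimal Python sketch does that locally; validate_graph is a hypothetical helper, and it does not replicate any server-side validation. Its checks (unique ids across nodes and edges, exactly one START and one END node, edges that reference existing nodes, and no cycles) are an interpretation of the note plus the assumption that a workflow must be acyclic. The cycle check uses Kahn's topological-sort algorithm, which also yields the order in which the job nodes would run.

```python
import json
from collections import deque


def validate_graph(graph_json):
    """Check a Graph JSON string and return the JOB node ids in run order."""
    graph = json.loads(graph_json)
    nodes, edges = graph["nodes"], graph["edges"]

    ids = [n["id"] for n in nodes] + [e["id"] for e in edges]
    if len(ids) != len(set(ids)):
        raise ValueError("node and edge ids must be unique")

    types = {n["id"]: n["attribute"]["type"] for n in nodes}
    for kind in ("START", "END"):
        # Assumption: exactly one START node and exactly one END node.
        if list(types.values()).count(kind) != 1:
            raise ValueError(f"graph needs exactly one {kind} node")

    successors = {nid: [] for nid in types}
    indegree = {nid: 0 for nid in types}
    for e in edges:
        if e["source"] not in types or e["target"] not in types:
            raise ValueError(f"edge {e['id']} references an unknown node")
        successors[e["source"]].append(e["target"])
        indegree[e["target"]] += 1

    # Kahn's algorithm: every node is emitted only if the graph has no cycle.
    queue = deque(nid for nid, d in indegree.items() if d == 0)
    order = []
    while queue:
        nid = queue.popleft()
        order.append(nid)
        for nxt in successors[nid]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    if len(order) != len(types):
        raise ValueError("graph contains a cycle")
    return [nid for nid in order if types[nid] == "JOB"]
```

Applied to the example Graph in this topic, the function should return the node ids of customer_logs, log_aggregation, and customer_pic in that order, matching the intended dependency chain.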
Create the workflow.

```python
#!/usr/bin/env python
# coding=utf-8
import json

from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdkemr.request.v20160408.CreateFlowForWebRequest import CreateFlowForWebRequest

client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')

# Graph: START -> customer_logs -> log_aggregation -> customer_pic -> END
graph = {
    "nodes": [
        {"shape": "startControlNode", "type": "node", "size": "80*34",
         "x": 500.0, "y": 250.0, "id": "48d474ea", "index": 0.0,
         "attribute": {"type": "START"}},
        {"shape": "hiveSQLJobNode", "type": "node", "size": "170*34", "trackerPath": "ec5a56bc4a261c22",
         "x": 498.0, "y": 324.5, "id": "cd5eb72d", "index": 1.0, "label": "customer_logs",
         "attribute": {"type": "JOB", "jobId": "FJ-79B3F70E23D6****", "jobType": "HIVE_SQL"},
         "config": {"clusterId": "", "hostName": ""}},
        {"shape": "hiveSQLJobNode", "type": "node", "size": "170*34", "trackerPath": "ec5a56bc4a261c22",
         "x": 497.0, "y": 416.5, "id": "b308995d", "index": 2.0, "label": "log_aggregation",
         "attribute": {"type": "JOB", "jobId": "FJ-CECB36039155****", "jobType": "HIVE_SQL"},
         "config": {"clusterId": "", "hostName": ""}},
        {"shape": "hiveSQLJobNode", "type": "node", "size": "170*34", "trackerPath": "ec5a56bc4a261c22",
         "x": 501.0, "y": 516.5, "id": "35c8d9c5", "index": 3.0, "label": "customer_pic",
         "attribute": {"type": "JOB", "jobId": "FJ-99CFAE652650****", "jobType": "HIVE_SQL"},
         "config": {"clusterId": "", "hostName": ""}},
        {"shape": "endControlNode", "type": "node", "size": "80*34", "trackerPath": "bb4b171fe9d130b4",
         "x": 503.0, "y": 612.5, "id": "65f9c9a4", "index": 7.0,
         "attribute": {"type": "END"}},
    ],
    "edges": [
        {"source": "48d474ea", "sourceAnchor": 0.0, "target": "cd5eb72d",
         "targetAnchor": 0.0, "id": "3820959f", "index": 4.0},
        {"source": "cd5eb72d", "sourceAnchor": 1.0, "target": "b308995d",
         "targetAnchor": 0.0, "id": "248f9dd5", "index": 5.0},
        {"source": "b308995d", "sourceAnchor": 1.0, "target": "35c8d9c5",
         "targetAnchor": 0.0, "id": "dd21ddbf", "index": 6.0},
        {"source": "35c8d9c5", "sourceAnchor": 1.0, "target": "65f9c9a4",
         "targetAnchor": 0.0, "id": "7ab0cd5e", "index": 8.0},
    ],
}

request = CreateFlowForWebRequest()
request.set_accept_format('json')
request.set_ProjectId("FP-D18E9976D5A****")
request.set_Name("new_flow")
request.set_Description("create flow by openAPI")
# False: run the workflow on the existing cluster specified by ClusterId
request.set_CreateCluster(False)
request.set_ClusterId("C-B503DDB15B34****")
request.set_Graph(json.dumps(graph))

try:
    response = client.do_action_with_exception(request)
    # Python 2: print(response)
    print(str(response, encoding='utf-8'))
except (ClientException, ServerException) as e:
    # Print the error details returned by the SDK
    print(e)
```
The workflow ID is returned:

```json
{
  "RequestId": "417BA643-0E5F-456E-8222-B3EB5DD800DF",
  "Id": "F-AC0915556C24****"
}
```
- Submit the workflow to run it.
```python
#!/usr/bin/env python
# coding=utf-8
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdkemr.request.v20160408.SubmitFlowRequest import SubmitFlowRequest

client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')

request = SubmitFlowRequest()
request.set_accept_format('json')
request.set_ProjectId("FP-D18E9976D5A****")
# ID of the workflow returned by CreateFlowForWeb
request.set_FlowId("F-AC0915556C24****")

try:
    response = client.do_action_with_exception(request)
    # Python 2: print(response)
    print(str(response, encoding='utf-8'))
except (ClientException, ServerException) as e:
    # Print the error details returned by the SDK
    print(e)
```
The workflow instance ID is returned:

```json
{
  "Data": "FI-521925BE9816****",
  "InstanceId": "FI-521925BE9816****",
  "RequestId": "DBD1D138-C7F0-4D81-B3F2-0B4512E4A74C",
  "Id": "FI-521925BE9816****"
}
```
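In the sample SubmitFlow response above, Data, InstanceId, and Id all carry the same workflow instance ID, which is the value DescribeFlowInstance takes as its Id parameter. When chaining the two calls in one script, a small helper can pull the ID out of the raw response. The following is a minimal sketch; extract_instance_id is a hypothetical helper, and the preference order among the three fields is an arbitrary choice.

```python
import json


def extract_instance_id(raw):
    """Return the workflow instance ID from a SubmitFlow response (bytes or str)."""
    body = json.loads(raw)
    # In the sample response, InstanceId, Id, and Data hold the same value;
    # try InstanceId first and fall back to the others.
    for key in ("InstanceId", "Id", "Data"):
        if body.get(key):
            return body[key]
    raise KeyError("no workflow instance ID in SubmitFlow response")
```

For example, instance_id = extract_instance_id(client.do_action_with_exception(request)) could be followed by set_Id(instance_id) on a DescribeFlowInstanceRequest. json.loads accepts the bytes returned by the Python 3 SDK directly.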
- Query the workflow instance to view the running status of the workflow instance and of its node instances.
```python
#!/usr/bin/env python
# coding=utf-8
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdkemr.request.v20160408.DescribeFlowInstanceRequest import DescribeFlowInstanceRequest

client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')

request = DescribeFlowInstanceRequest()
request.set_accept_format('json')
# ID of the workflow instance returned by SubmitFlow
request.set_Id("FI-521925BE9816****")
request.set_ProjectId("FP-D18E9976D5A****")

try:
    response = client.do_action_with_exception(request)
    # Python 2: print(response)
    print(str(response, encoding='utf-8'))
except (ClientException, ServerException) as e:
    # Print the error details returned by the SDK
    print(e)
```
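- Based on the returned node instance status information, you can:
  - Call DescribeFlowNodeInstance to query the details of a node instance.
  - Call DescribeFlowNodeInstanceLauncherLog to query the launcher logs of a node instance.
  - Call ListFlowNodeInstanceContainerStatus to view the container status of a node instance.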
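The follow-up operations listed above work on individual node instances, so a typical next step is to pick the relevant node instance IDs out of the DescribeFlowInstance response. The following minimal sketch assumes a response layout that this topic does not show: node instances listed under NodeInstance.NodeInstance (or directly as a list under NodeInstance), each with NodeName, Id, and Status fields. Verify these field names against an actual response before relying on them. node_instance_statuses is a hypothetical helper.

```python
import json


def node_instance_statuses(raw):
    """Map node name -> (node instance ID, status) from a DescribeFlowInstance response.

    Assumption: node instances are under "NodeInstance" -> "NodeInstance"
    (or a plain list under "NodeInstance"), each with NodeName, Id, and Status.
    """
    body = json.loads(raw)
    container = body.get("NodeInstance", {})
    if isinstance(container, dict):
        items = container.get("NodeInstance", [])
    else:
        items = container
    return {item.get("NodeName"): (item.get("Id"), item.get("Status")) for item in items}
```

If a node reports an unexpected status, its node instance ID from this mapping is presumably the identifier to pass to DescribeFlowNodeInstance, DescribeFlowNodeInstanceLauncherLog, or ListFlowNodeInstanceContainerStatus; check the parameter names of each operation in its API reference.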