E-MapReduce

Quick start

Last updated: 2017-06-07 13:26:11

Environment setup

Create a Maven project and add the following Maven dependencies:

  ```xml
  <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>aliyun-java-sdk-core</artifactId>
      <version>2.3.9</version>
  </dependency>
  <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>aliyun-java-sdk-emr</artifactId>
      <version>2.2.2</version>
  </dependency>
  ```

Alternatively, download the corresponding JAR files to your local machine. Taking Eclipse as an example, the steps are as follows:

  1. Download the JAR files:

    aliyun-java-sdk-core-2.3.9.jar

    aliyun-java-sdk-emr-2.2.2.jar

  2. Copy the downloaded files into your project.

  3. In Eclipse, right-click your project name, and then click Properties -> Java Build Path -> Add JARs.

  4. Select all the JAR files you copied in step 2.

After these steps, you can use the Alibaba Cloud E-MapReduce Open API Java SDK in your Eclipse project.

Initialize the client

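  ```java
  import com.aliyuncs.DefaultAcsClient;
  import com.aliyuncs.profile.DefaultProfile;
  import com.aliyuncs.profile.IClientProfile;

  // Arguments: region ID, AccessKey ID, AccessKey secret
  IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
  DefaultAcsClient client = new DefaultAcsClient(profile);
  ```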

All E-MapReduce operations in the SDK are performed through this client.
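Hard-coding the AccessKey pair, as the snippet above does, is convenient for a first test but makes the secret easy to leak through source control. The following is a minimal sketch, using only the JDK, that reads the pair from environment variables before it is passed to DefaultProfile.getProfile. The class AccessKeys and the variable names EMR_ACCESS_KEY_ID and EMR_ACCESS_KEY_SECRET are hypothetical choices for this example, not part of the SDK.

```java
import java.util.Map;

/** Hypothetical helper (not part of the SDK): loads an AccessKey pair from an environment-like map. */
final class AccessKeys {
    // Hypothetical variable names; use whatever names your deployment defines.
    static final String ID_VAR = "EMR_ACCESS_KEY_ID";
    static final String SECRET_VAR = "EMR_ACCESS_KEY_SECRET";

    final String id;
    final String secret;

    private AccessKeys(String id, String secret) {
        this.id = id;
        this.secret = secret;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /** Reads both values from the given map and fails fast if either is missing or blank. */
    static AccessKeys from(Map<String, String> env) {
        String id = env.get(ID_VAR);
        String secret = env.get(SECRET_VAR);
        if (isBlank(id) || isBlank(secret)) {
            throw new IllegalStateException(
                "Set " + ID_VAR + " and " + SECRET_VAR + " before creating the client");
        }
        return new AccessKeys(id.trim(), secret.trim());
    }

    /** Convenience overload that reads the real process environment. */
    static AccessKeys fromEnvironment() {
        return from(System.getenv());
    }
}
```

With this helper, the profile could be created as DefaultProfile.getProfile("cn-hangzhou", keys.id, keys.secret), where keys is the result of AccessKeys.fromEnvironment(); failing at startup makes a missing credential obvious before any request is sent.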

Sample code

Clusters

  • Create a cluster

    ```java
    import java.util.ArrayList;
    import java.util.List;

    import com.aliyuncs.DefaultAcsClient;
    import com.aliyuncs.profile.DefaultProfile;
    import com.aliyuncs.profile.IClientProfile;
    // Package path assumes the 2016-04-08 version of the EMR API
    import com.aliyuncs.emr.model.v20160408.CreateClusterRequest;
    import com.aliyuncs.emr.model.v20160408.CreateClusterResponse;

    public class CreateClusterDemo {
        public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            final CreateClusterRequest request = new CreateClusterRequest();
            request.setRegionId("cn-hangzhou"); // set region id
            request.setName("Your-Cluster-Name");
            // if you do not specify a security group ID, a new security group is created with the given name
            request.setSecurityGroupId("Your-Security-Group-Id"); // (1)
            request.setAutoRenew(false);
            request.setChargeType("PostPaid"); // PostPaid or PrePaid
            request.setClusterType("HADOOP"); // HADOOP or HBase (2)
            request.setEmrVer("EMR-1.3.0"); // EMR image version
            request.setIsOpenPublicIp(true);
            request.setLogEnable(true);
            request.setLogPath("oss://Your-Bucket/Your-Folder");
            request.setMasterPwdEnable(true); // enable the master password
            request.setMasterPwd("Aa123456789"); // set the master node's password
            request.setZoneId("cn-hangzhou-b"); // set zone
            // I/O optimization, the ECS generation and the network type determine which hardware
            // configurations (ECS instance types, cloud disk types) are available. Choose values from
            // the combinations and types offered on the ECS purchase page:
            // https://ecs.console.aliyun.com/#/create/postpay/
            request.setIoOptimized(true); // enable I/O optimization
            request.setInstanceGeneration("ecs-2"); // ECS generation II; ecs-1 or ecs-2
            request.setNetType("classic"); // network type: classic or vpc
            List<CreateClusterRequest.EcsOrder> ecsOrders = new ArrayList<CreateClusterRequest.EcsOrder>();
            CreateClusterRequest.EcsOrder masterOrder = new CreateClusterRequest.EcsOrder();
            masterOrder.setIndex(1);
            masterOrder.setDiskCapacity(50);
            masterOrder.setDiskCount(2);
            masterOrder.setDiskType("CLOUD_EFFICIENCY"); // specify disk type (2)
            masterOrder.setInstanceType("ecs.n1.large"); // specify ECS instance type
            masterOrder.setNodeCount(1);
            masterOrder.setNodeType("MASTER"); // MASTER or CORE (2)
            ecsOrders.add(masterOrder);
            CreateClusterRequest.EcsOrder coreOrder = new CreateClusterRequest.EcsOrder();
            coreOrder.setIndex(2);
            coreOrder.setDiskCapacity(50);
            coreOrder.setDiskCount(4);
            coreOrder.setDiskType("CLOUD_EFFICIENCY");
            coreOrder.setInstanceType("ecs.n1.large");
            coreOrder.setNodeCount(3);
            coreOrder.setNodeType("CORE");
            ecsOrders.add(coreOrder);
            request.setEcsOrders(ecsOrders);
            try {
                CreateClusterResponse response = client.getAcsResponse(request);
                String clusterId = response.getClusterId(); // cluster id
                // TODO do something with this cluster
            } catch (Exception e) {
                // TODO do something
            }
        }
    }
    ```
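    1. A cluster must belong to a security group. If you do not specify a security group ID, you must specify a security group name instead, and a new security group is created together with the cluster.

    2. For the allowed enumeration values, see: https://help.aliyun.com/document_detail/emr/OpenAPI/EnumTypes.html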

    3. The sample code above creates a cluster on the classic network. To create a cluster in a VPC, set the network type in the request to vpc and specify the VPC ID and vSwitch ID:

      ```java
      request.setNetType("vpc"); // network type: classic or vpc
      request.setVpcId("your-vpcId");
      request.setVSwitchId("your-switchId");
      ```
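    4. Enable high availability. For a description of the high availability parameter, see the hardware configuration section of the Create cluster documentation.

      ```java
      request.setHighAvailabilityEnable(true);
      ```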
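    5. Set optional software components. For a description of the optional components, see the software configuration section of the Create cluster documentation.

      ```java
      List<String> soft = new ArrayList<String>();
      soft.add("presto");
      soft.add("oozie");
      request.setOptionSoftWareLists(soft);
      ```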
    6. Set configuration items. For details, see the related documentation on configuration items.

      ```java
      request.setConfigurations("oss://your-bucket/your-conf.json");
      ```
    7. Set bootstrap actions. For details, see the related documentation on bootstrap actions.

      ```java
      List<CreateClusterRequest.BootstrapAction> bootstrapActions = new ArrayList<CreateClusterRequest.BootstrapAction>();
      CreateClusterRequest.BootstrapAction bootstrapAction = new CreateClusterRequest.BootstrapAction();
      bootstrapAction.setName("bootstrapName");
      bootstrapAction.setPath("oss://emr-agent-pack/bootstrap/run-if.py");
      bootstrapAction.setArg("instance.isMaster=true mkdir -p /tmp/abc");
      bootstrapActions.add(bootstrapAction);
      request.setBootstrapActions(bootstrapActions);
      ```
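    Notes 1 and 3 describe settings that must be supplied together. As a hedged illustration, the sketch below checks those two rules locally before a request is sent: either a security group ID or a security group name must be present, and a VPC cluster also needs a VPC ID and a vSwitch ID. The class ClusterSettingsCheck and its fields are hypothetical and independent of the SDK; the service remains the authority on what it accepts.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    /**
     * Hypothetical pre-flight check mirroring notes 1 and 3 above.
     * Field names are illustrative; they are not SDK classes or parameters.
     */
    final class ClusterSettingsCheck {
        String securityGroupId;
        String securityGroupName;
        String netType = "classic"; // "classic" or "vpc"
        String vpcId;
        String vSwitchId;

        private static boolean isBlank(String s) {
            return s == null || s.trim().isEmpty();
        }

        /** Returns the problems found; an empty list means these fields look consistent. */
        List<String> problems() {
            List<String> problems = new ArrayList<String>();
            // Note 1: a security group ID, or a name for a new security group, is required.
            if (isBlank(securityGroupId) && isBlank(securityGroupName)) {
                problems.add("either a security group ID or a security group name is required");
            }
            // Note 3: a VPC cluster also needs a VPC ID and a vSwitch ID.
            if ("vpc".equals(netType)) {
                if (isBlank(vpcId)) problems.add("vpcId is required when netType is vpc");
                if (isBlank(vSwitchId)) problems.add("vSwitchId is required when netType is vpc");
            } else if (!"classic".equals(netType)) {
                problems.add("netType must be classic or vpc");
            }
            return problems;
        }
    }
    ```

    A check like this only catches omissions early; values such as instance types or disk types still have to match what the chosen zone and ECS generation offer.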
  • Query cluster details

    ```java
    public static void main(String[] args) {
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
        DefaultAcsClient client = new DefaultAcsClient(profile);
        final DescribeClusterRequest request = new DescribeClusterRequest();
        request.setRegionId("cn-hangzhou"); // set region id
        request.setId("C-XXXXXXXXXXXXXXXX"); // cluster id
        try {
            DescribeClusterResponse response = client.getAcsResponse(request);
            DescribeClusterResponse.ClusterInfo clusterInfo = response.getClusterInfo();
            // TODO do something with this cluster
        } catch (Exception e) {
            // TODO do something
        }
    }
    ```
  • List clusters

    ```java
    public static void main(String[] args) {
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
        DefaultAcsClient client = new DefaultAcsClient(profile);
        final ListClustersRequest request = new ListClustersRequest();
        request.setRegionId("cn-hangzhou"); // set region id
        request.setPageNumber(1);
        request.setIsDesc(true);
        request.setPageSize(20);
        try {
            ListClustersResponse response = client.getAcsResponse(request);
            List<ListClustersResponse.ClusterInfo> clusterInfos = response.getClusters();
            for (ListClustersResponse.ClusterInfo clusterInfo : clusterInfos) {
                // TODO do something with this cluster
            }
        } catch (Exception e) {
            // TODO do something
        }
    }
    ```
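    ListClusters returns one page per call, selected with setPageNumber and setPageSize. A minimal sketch of collecting every page follows. The Pager class and its PageFetcher interface are hypothetical, and the loop assumes the service returns full pages until the last one, so it stops at the first page that is shorter than the requested size (including an empty page).

    ```java
    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical helper for walking a paged list API; it makes no SDK calls itself. */
    final class Pager {
        /** Fetches one page; pageNumber is 1-based, as in the sample above. */
        interface PageFetcher<T> {
            List<T> fetch(int pageNumber, int pageSize) throws Exception;
        }

        /** Requests pages 1, 2, 3, ... until a page holds fewer than pageSize items. */
        static <T> List<T> fetchAll(PageFetcher<T> fetcher, int pageSize) throws Exception {
            if (pageSize <= 0) {
                throw new IllegalArgumentException("pageSize must be positive");
            }
            List<T> all = new ArrayList<T>();
            for (int page = 1; ; page++) {
                List<T> items = fetcher.fetch(page, pageSize);
                all.addAll(items);
                if (items.size() < pageSize) {
                    return all; // a short (or empty) page is treated as the last one
                }
            }
        }
    }
    ```

    In practice the fetcher would build a new ListClustersRequest, call setPageNumber and setPageSize with the values it receives, and return response.getClusters() from client.getAcsResponse(...). When the total is an exact multiple of the page size, one extra (empty) page is requested before the loop ends.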
  • Release a cluster

    ```java
    public static void main(String[] args) {
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
        DefaultAcsClient client = new DefaultAcsClient(profile);
        ReleaseClusterRequest request = new ReleaseClusterRequest();
        request.setId("C-XXXXXXXXXXXXXXXX"); // specify the ID of the cluster you want to release
        try {
            ReleaseClusterResponse response = client.getAcsResponse(request);
        } catch (Exception e) {
            // TODO do something
        }
    }
    ```

Jobs

  • Create a job

    ```java
    public static void main(String[] args) {
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
        DefaultAcsClient client = new DefaultAcsClient(profile);
        final CreateJobRequest request = new CreateJobRequest();
        request.setRegionId("cn-hangzhou"); // set region id
        request.setName("Your-Job-Name");
        request.setRunParameter("--master yarn-client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 4 --class com.test.RemoteDebug ossref://Your-Bucket/Resource.jar 1000");
        request.setFailAct("CONTINUE"); // STOP or CONTINUE
        request.setType("SPARK"); // SPARK, HADOOP, HIVE or PIG
        try {
            CreateJobResponse response = client.getAcsResponse(request);
            String jobId = response.getId();
            // TODO do something with this job
        } catch (Exception e) {
            // TODO do something
        }
    }
    ```
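    The run parameter of a Spark job is a single string of spark-submit style options followed by the main class, the JAR and the application arguments. Assembling it from parts makes stray characters, such as an unbalanced quote, less likely. The sketch below is a hypothetical helper (SparkRunParameter is not an SDK class) that reproduces the sample run parameter; it does no quoting, so option values or arguments that contain spaces are not handled.

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    /** Hypothetical builder for a spark-submit style run parameter string. */
    final class SparkRunParameter {
        private final Map<String, String> options = new LinkedHashMap<String, String>();
        private final List<String> appArgs = new ArrayList<String>();
        private String mainClass;
        private String jar;

        /** Adds "--name value"; options are emitted in the order they are first added. */
        SparkRunParameter option(String name, String value) {
            options.put(name, value);
            return this;
        }

        /** Sets the main class, the application JAR (for example an ossref:// path) and its arguments. */
        SparkRunParameter app(String mainClass, String jar, String... args) {
            this.mainClass = mainClass;
            this.jar = jar;
            this.appArgs.clear();
            this.appArgs.addAll(Arrays.asList(args));
            return this;
        }

        /** Joins everything with single spaces; no quoting or escaping is applied. */
        String build() {
            if (mainClass == null || jar == null) {
                throw new IllegalStateException("call app(...) before build()");
            }
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, String> e : options.entrySet()) {
                sb.append("--").append(e.getKey()).append(' ').append(e.getValue()).append(' ');
            }
            sb.append("--class ").append(mainClass).append(' ').append(jar);
            for (String arg : appArgs) {
                sb.append(' ').append(arg);
            }
            return sb.toString();
        }
    }
    ```

    The result of build() is what would be passed to request.setRunParameter(...).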
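  • Delete a job

    Note: A job that is used by an execution plan cannot be deleted. First delete that execution plan, or modify it so that it no longer uses the job.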

    ```java
    public static void main(String[] args) {
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
        DefaultAcsClient client = new DefaultAcsClient(profile);
        final DeleteJobRequest request = new DeleteJobRequest();
        request.setRegionId("cn-hangzhou"); // set region id
        request.setId("J-XXXXXXXXXXXXXXXX"); // set job id
        try {
            DeleteJobResponse response = client.getAcsResponse(request);
        } catch (Exception e) {
            // TODO do something
        }
    }
    ```
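    Because a job that is referenced by an execution plan cannot be deleted, it can help to look for references before calling DeleteJob. The sketch below assumes you already have a map from execution plan ID to the job IDs it uses (for example, your own record of the lists passed to setJobIdLists); how that map is obtained is outside this document, and JobReferences is a hypothetical helper that makes no SDK calls.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    /** Hypothetical helper: finds execution plans that still reference a job. */
    final class JobReferences {
        /** Returns, sorted by plan ID, the plans whose job list contains jobId. */
        static List<String> plansUsing(Map<String, List<String>> planToJobIds, String jobId) {
            List<String> plans = new ArrayList<String>();
            // Copy into a TreeMap so the result order does not depend on the input map type.
            for (Map.Entry<String, List<String>> e : new TreeMap<String, List<String>>(planToJobIds).entrySet()) {
                if (e.getValue().contains(jobId)) {
                    plans.add(e.getKey());
                }
            }
            return plans;
        }

        /** True when no known execution plan references the job. */
        static boolean canDelete(Map<String, List<String>> planToJobIds, String jobId) {
            return plansUsing(planToJobIds, jobId).isEmpty();
        }
    }
    ```

    The returned plan IDs are the ones to delete, or to modify, before the job itself can be deleted.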

Execution plans

  • Create an execution plan

    ```java
    public static void main(String[] args) {
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
        DefaultAcsClient client = new DefaultAcsClient(profile);
        final CreateExecutionPlanRequest request = new CreateExecutionPlanRequest();
        request.setRegionId("cn-hangzhou"); // set region id
        request.setName("Your-ExecutionPlan-Name");
        request.setCreateClusterOnDemand(false);
        request.setStrategy("RUN_MANUALLY"); // RUN_MANUALLY or SCHEDULE
        request.setClusterId("C-XXXXXXXXXXXXXXXX"); // specify an existing running cluster
        List<String> jobIds = new ArrayList<String>();
        jobIds.add("J-XXXXXXXXXXXXXXXX"); // specify a job
        request.setJobIdLists(jobIds);
        try {
            CreateExecutionPlanResponse response = client.getAcsResponse(request);
            String executionPlanId = response.getId();
            // TODO do something with this execution plan
        } catch (Exception e) {
            // TODO do something
        }
    }
    ```

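    The sample code above creates an execution plan that is run manually (not periodically scheduled) and is associated with an existing cluster.

    To create a periodically scheduled execution plan, modify or add the following code: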

    ```java
    request.setStrategy("SCHEDULE"); // RUN_MANUALLY or SCHEDULE
    request.setStartTime(new Date().getTime()); // set start time
    request.setTimeUnit("DAY"); // DAY or HOUR
    request.setTimeInterval(1); // set time interval
    ```
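    To make the three schedule parameters concrete, the sketch below lists the first few run times implied by a start time, a time unit and an interval, under the assumption that runs occur at startTime + k * interval for k = 0, 1, 2, and so on. Times are epoch milliseconds, the unit returned by new Date().getTime(). ScheduleSketch is hypothetical and only illustrates the arithmetic; the actual scheduling is done by the service.

    ```java
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical illustration of how startTime, timeUnit and timeInterval combine. */
    final class ScheduleSketch {
        /** Converts a DAY/HOUR unit and a positive interval into a Duration. */
        static Duration period(String timeUnit, int timeInterval) {
            if (timeInterval <= 0) {
                throw new IllegalArgumentException("timeInterval must be positive");
            }
            switch (timeUnit) {
                case "DAY":
                    return Duration.ofDays(timeInterval);
                case "HOUR":
                    return Duration.ofHours(timeInterval);
                default:
                    throw new IllegalArgumentException("timeUnit must be DAY or HOUR: " + timeUnit);
            }
        }

        /** Returns the first count run times in epoch milliseconds (assumed evenly spaced). */
        static List<Long> firstRuns(long startTimeMillis, String timeUnit, int timeInterval, int count) {
            long step = period(timeUnit, timeInterval).toMillis();
            List<Long> runs = new ArrayList<Long>();
            for (int k = 0; k < count; k++) {
                runs.add(startTimeMillis + k * step);
            }
            return runs;
        }
    }
    ```

    For example, with timeUnit HOUR and timeInterval 2, consecutive runs are 7,200,000 ms (two hours) apart.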

    To create an execution plan that creates a cluster on demand, modify and add the following code:

    ```java
    request.setCreateClusterOnDemand(true);
    request.setClusterType("HADOOP");
    request.setClusterName("Your-Cluster-Name");
    request.setEmrVer("EMR-1.3.0");
    request.setSecurityGroupId("Your-Security-Group-Id");
    request.setIsOpenPublicIp(true);
    // I/O optimization, the ECS generation and the network type determine which hardware
    // configurations (ECS instance types, cloud disk types) are available. Choose values from
    // the combinations and types offered on the ECS purchase page:
    // https://ecs.console.aliyun.com/#/create/postpay/
    request.setIoOptimized(true); // enable I/O optimization
    request.setInstanceGeneration("ecs-2"); // ECS generation II; ecs-1 or ecs-2
    request.setNetType("classic"); // network type: classic or vpc
    request.setLogEnable(true);
    request.setLogPath("oss://xxx");
    // Build the ECS orders as in the create-cluster example. Note that the element type here is
    // CreateExecutionPlanRequest.EcsOrder, which differs from CreateClusterRequest.EcsOrder.
    List<CreateExecutionPlanRequest.EcsOrder> ecsOrders = new ArrayList<CreateExecutionPlanRequest.EcsOrder>();
    // TODO add the MASTER and CORE orders to ecsOrders
    request.setEcsOrders(ecsOrders);
    ```

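    The parameters above specify the cluster configuration; for the individual settings, follow the create-cluster example. For an execution plan that creates its cluster on demand, each time the plan starts, a temporary cluster is created with this configuration to run the plan, and the cluster is released automatically after the plan finishes. Unlike when creating a cluster directly, the cluster here must be given a security group ID; you cannot specify a security group name to create a new security group.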

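    Periodic scheduling and on-demand cluster creation are not mutually exclusive: an execution plan can be both periodically scheduled and set to create its cluster on demand.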
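    The two cluster modes need different settings: a plan bound to an existing cluster needs a cluster ID, while an on-demand plan needs a security group ID (a security group name is not accepted). A minimal local check of just these two rules, using the hypothetical helper PlanClusterCheck (not an SDK class), could look like this:

    ```java
    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical pre-flight check for the two cluster modes of an execution plan. */
    final class PlanClusterCheck {
        /** Returns the problems found; an empty list means the checked fields are present. */
        static List<String> problems(boolean createClusterOnDemand, String clusterId, String securityGroupId) {
            List<String> problems = new ArrayList<String>();
            if (createClusterOnDemand) {
                // On-demand clusters need an existing security group ID; a name cannot be used.
                if (isBlank(securityGroupId)) {
                    problems.add("securityGroupId is required for an on-demand cluster");
                }
            } else if (isBlank(clusterId)) {
                // A plan that is not on demand runs on an existing cluster.
                problems.add("clusterId is required when createClusterOnDemand is false");
            }
            return problems;
        }

        private static boolean isBlank(String s) {
            return s == null || s.trim().isEmpty();
        }
    }
    ```

    The check is independent of the schedule settings, which matches the point above that scheduling and on-demand clusters can be combined.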

  • Delete an execution plan

    ```java
    public static void main(String[] args) {
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
        DefaultAcsClient client = new DefaultAcsClient(profile);
        final DeleteExecutionPlanRequest request = new DeleteExecutionPlanRequest();
        request.setId("WF-XXXXXXXXXXXXXXXX"); // set execution plan id
        try {
            DeleteExecutionPlanResponse response = client.getAcsResponse(request);
        } catch (Exception e) {
            // TODO do something
        }
    }
    ```
  • Run an execution plan

    Note: An execution plan that is being scheduled or is currently running cannot be run.

    ```java
    public static void main(String[] args) {
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
        DefaultAcsClient client = new DefaultAcsClient(profile);
        RunExecutionPlanRequest request = new RunExecutionPlanRequest();
        request.setRegionId("cn-hangzhou");
        request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the ID of the execution plan to run
        try {
            RunExecutionPlanResponse response = client.getAcsResponse(request);
            String instanceId = response.getExecutionPlanInstanceId();
            // TODO do something with this instance
        } catch (Exception e) {
            // TODO do something
        }
    }
    ```
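    RunExecutionPlan returns an execution plan instance ID. If you need to wait for that run to finish, one option is to poll its status. The sketch below is a generic, hypothetical polling loop: the status lookup is passed in as a Supplier (for example, a wrapper around ListExecutionPlanInstances with setOnlyLastInstance(true)), and the set of terminal status values is supplied by the caller because this document does not list them.

    ```java
    import java.util.Set;
    import java.util.function.Supplier;

    /** Hypothetical polling helper; it does not call the SDK itself. */
    final class WaitForRun {
        /**
         * Calls statusSource until it returns a value in terminalStatuses or maxPolls is reached,
         * sleeping pollIntervalMillis between attempts. Returns the last status seen, which is
         * not terminal if the limit was reached first.
         */
        static String await(Supplier<String> statusSource, Set<String> terminalStatuses,
                            int maxPolls, long pollIntervalMillis) throws InterruptedException {
            if (maxPolls <= 0) {
                throw new IllegalArgumentException("maxPolls must be positive");
            }
            String status = null;
            for (int i = 0; i < maxPolls; i++) {
                status = statusSource.get();
                if (terminalStatuses.contains(status)) {
                    return status;
                }
                if (i + 1 < maxPolls) {
                    Thread.sleep(pollIntervalMillis); // no sleep after the final attempt
                }
            }
            return status;
        }
    }
    ```

    Because Supplier cannot throw checked exceptions, the checked exceptions thrown by client.getAcsResponse would need to be wrapped inside the supplier, for example by rethrowing them as unchecked exceptions.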
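  • Suspend execution plan scheduling

    For a periodic execution plan that is currently being scheduled (as shown in the figure below), you can suspend its scheduling with a SuspendExecutionPlanScheduler request.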

    ```java
    public static void main(String[] args) {
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
        DefaultAcsClient client = new DefaultAcsClient(profile);
        SuspendExecutionPlanSchedulerRequest request = new SuspendExecutionPlanSchedulerRequest();
        request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the ID of the execution plan you want to suspend
        request.setRegionId("cn-hangzhou"); // specify the region of this execution plan
        try {
            SuspendExecutionPlanSchedulerResponse response = client.getAcsResponse(request);
        } catch (Exception e) {
            // TODO do something
        }
    }
    ```
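  • Resume execution plan scheduling

    For a periodic execution plan whose scheduling is suspended (as shown in the figure below), you can resume its scheduling with a ResumeExecutionPlanScheduler request.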

    ```java
    public static void main(String[] args) {
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
        DefaultAcsClient client = new DefaultAcsClient(profile);
        ResumeExecutionPlanSchedulerRequest request = new ResumeExecutionPlanSchedulerRequest();
        request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the ID of the execution plan you want to resume
        request.setRegionId("cn-hangzhou"); // specify the region of this execution plan
        try {
            ResumeExecutionPlanSchedulerResponse response = client.getAcsResponse(request);
        } catch (Exception e) {
            // TODO do something
        }
    }
    ```
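    The suspend and resume operations above form a simple pair of transitions for a periodic execution plan: suspend moves a plan that is being scheduled to suspended, and resume moves it back. The sketch below models only that pair. ScheduleState and ScheduleTransitions are hypothetical, the real status values returned by the API may differ, and rejecting the calls in any other state is an assumption made for illustration.

    ```java
    /** Hypothetical scheduling states; real API status values may differ. */
    enum ScheduleState { SCHEDULING, SUSPENDED }

    /** Hypothetical model of the suspend/resume transitions described above. */
    final class ScheduleTransitions {
        /** Suspend applies to a periodic plan that is currently being scheduled. */
        static ScheduleState suspend(ScheduleState current) {
            if (current != ScheduleState.SCHEDULING) {
                throw new IllegalStateException("only a plan that is being scheduled can be suspended");
            }
            return ScheduleState.SUSPENDED;
        }

        /** Resume applies to a periodic plan whose scheduling is suspended. */
        static ScheduleState resume(ScheduleState current) {
            if (current != ScheduleState.SUSPENDED) {
                throw new IllegalStateException("only a suspended plan can be resumed");
            }
            return ScheduleState.SCHEDULING;
        }
    }
    ```

    Keeping such a guard on the client side can avoid sending requests that are expected to fail, while the service's response stays the final word.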
  • List execution plan instances

    The execution plan instance list is the run history of an execution plan.

    ```java
    public static void main(String[] args) {
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
        DefaultAcsClient client = new DefaultAcsClient(profile);
        ListExecutionPlanInstancesRequest request = new ListExecutionPlanInstancesRequest();
        request.setRegionId("cn-hangzhou");
        // specify execution plan ids
        List<String> executionPlanIds = new ArrayList<String>();
        executionPlanIds.add("WF-XXXXXXXXXXXXXXX1");
        executionPlanIds.add("WF-XXXXXXXXXXXXXXX2");
        executionPlanIds.add("WF-XXXXXXXXXXXXXXX3");
        request.setExecutionPlanIdLists(executionPlanIds); // (1)
        // sort in descending order (ordered by id)
        request.setIsDesc(true);
        // page number and page size; the defaults are page 1 and 10 items per page
        request.setPageSize(20);
        request.setPageNumber(1);
        // list only the latest instance of each execution plan
        request.setOnlyLastInstance(true); // (2) default is false
        try {
            ListExecutionPlanInstancesResponse response = client.getAcsResponse(request);
            for (ListExecutionPlanInstancesResponse.ExecutionPlanInstance instance : response.getExecutionPlanInstances()) {
                // TODO do something with each instance
            }
        } catch (Exception e) {
            // TODO do something
        }
    }
    ```
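    1. When querying run history, you can specify multiple execution plan IDs.

    2. If you request only the last instance, the response contains only the most recent run of each specified execution plan, not all of its runs. This is typically used to check whether the last run of one or more execution plans has finished, or to query the status of that last run.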
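    Note 2 says that with setOnlyLastInstance(true) only the most recent run of each requested execution plan is returned. The sketch below reproduces that filtering locally over a full history so the effect is easy to see. PlanRun and LastRuns are hypothetical stand-ins rather than SDK types, and treating "most recent" as "largest start time" is an assumption.

    ```java
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    /** Hypothetical stand-in for one run (instance) of an execution plan. */
    final class PlanRun {
        final String planId;
        final long startTimeMillis;
        final String status;

        PlanRun(String planId, long startTimeMillis, String status) {
            this.planId = planId;
            this.startTimeMillis = startTimeMillis;
            this.status = status;
        }
    }

    /** Hypothetical illustration of the "only last instance" filter. */
    final class LastRuns {
        /** Keeps the run with the largest start time for each plan ID. */
        static Map<String, PlanRun> lastRunPerPlan(List<PlanRun> history) {
            Map<String, PlanRun> latest = new LinkedHashMap<String, PlanRun>();
            for (PlanRun run : history) {
                PlanRun seen = latest.get(run.planId);
                if (seen == null || run.startTimeMillis > seen.startTimeMillis) {
                    latest.put(run.planId, run);
                }
            }
            return latest;
        }
    }
    ```

    The result holds one entry per plan, which is why this mode suits checking whether each plan's last run has completed.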
