本章节为您介绍使用Java SDK快速创建集群、作业、执行计划。

说明

OpenApi Explorer提供在线调用云产品API、动态生成SDK示例代码和快速检索接口等功能,能显著降低使用API的难度,推荐您使用。

环境准备

创建一个 Maven工程,添加 Maven 依赖,如下所示:
<dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>2.3.9</version>
       </dependency>
       <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-emr</artifactId>
            <version>2.2.2</version>
        </dependency>

或者直接下载对应的 JAR 文件到本地。以 Eclipse 为例,其操作步骤如下:

  1. 下载 JAR 文件。

    aliyun-java-sdk-core-2.3.9.jar

    aliyun-java-sdk-emr-2.2.2.jar

  2. 将下载后的文件拷贝到您的项目中。
  3. 在 Eclipse 中选择右击您的工程名称,然后单击 Properties > Java Build Path > Add JARs
  4. 选中您在步骤 2 中拷贝的所有 JAR 文件。

经过以上几步,您就可以在 Eclipse 项目中使用阿里云 E-MapReduce Open API Java SDK。

初始化 Client

IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    DefaultAcsClient client = new DefaultAcsClient(profile);

SDK 中所有对 E-MapReduce 的操作都可以使用这个 client 来进行。

示例代码

  • 集群
    • 创建集群
      public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            final CreateClusterRequest request = new CreateClusterRequest();
            request.setName("Your-Cluster-Name");
            // if you did not specify security group id, it will create a new security group with given name
            request.setSecurityGroupId("Your-Security-Group-Id"); // (1)
            request.setAutoRenew(false);
            request.setChargeType("PostPaid"); // PostPaid or PrePaid
            request.setClusterType("HADOOP"); // HADOOP or HBase (2)
            request.setEmrVer("EMR-1.3.0"); // emr image version
            request.setIsOpenPublicIp(true);
            request.setLogEnable(true);
            request.setLogPath("oss://Your-Bucket/Your-Folder");
            request.setMasterPwdEnable(true); // enable master password
            request.setMasterPwd("Aa123456789"); // set master node's password
            request.setZoneId("cn-hangzhou-b"); // set zone
             // io优化参数,ecs系列以及网络类型会决定可选的硬件配置(ecs实例类型,云盘类型),
            // 详情情况可以参照ecs购买页面可以选择的组合和支持的类型来设置
            // https://ecs.console.aliyun.com/#/create/postpay/
            request.setIoOptimized(true); // 设置IO优化参数
            request.setInstanceGeneration("ecs-2"); // 设置为ecs II系列, ecs-1/ecs-2
            request.setNetType("classic"); // 设置网络类型 classic/vpc
            List<CreateClusterRequest.EcsOrder> ecsOrders = new ArrayList<CreateClusterRequest.EcsOrder>();
            CreateClusterRequest.EcsOrder masterOrder = new CreateClusterRequest.EcsOrder();
            masterOrder.setIndex(1);
            masterOrder.setDiskCapacity(50);
            masterOrder.setDiskCount(2);
            masterOrder.setDiskType("CLOUD_EFFICIENCY"); // specify disk type (2)
            masterOrder.setInstanceType("ecs.n1.large"); // specify ecs instance type
            masterOrder.setNodeCount(1);
            masterOrder.setNodeType("MASTER"); // MASTER or CORE (2)
            ecsOrders.add(masterOrder);
            CreateClusterRequest.EcsOrder coreOrder = new CreateClusterRequest.EcsOrder();
            coreOrder.setIndex(2);
            coreOrder.setDiskCapacity(50);
            coreOrder.setDiskCount(4);
            coreOrder.setDiskType("CLOUD_EFFICIENCY");
            coreOrder.setInstanceType("ecs.n1.large");
            coreOrder.setNodeCount(3);
            coreOrder.setNodeType("CORE");
            ecsOrders.add(coreOrder);
            request.setEcsOrders(ecsOrders);
            try {
                CreateClusterResponse response = client.getAcsResponse(request);
                String clusterId = response.getClusterId(); // cluster id
                // TODO do something with this cluster
            } catch (Exception e) {
                // TODO do something
            }
        }
      • 创建集群需要指定集群属于哪个安全组。如果不指定安全组 ID,则需要指定一个安全组名称,在创建集群的同时新建一个安全组。
      • 具体枚举取值,请参考:枚举类型
      • 上述示例代码中,是创建了一个经典网络的集群,如果需要创建 VPC 网络的集群,则需要将 request 中的网络类型设置为 vpc,并且指定 vpcid 和 vswitchid,如下所示:
        request.setNetType("vpc"); // 设置网络类型 classic/vpc
        request.setVpcId("your-vpcId");
        request.setVSwitchId("your-switchId");
      • 设置高可用参数,关于高可用参数的说明请参考创建集群的硬件配置部分。
        request.setHighAvailabilityEnable(true);
      • 设置可选软件组件,关于可选软件组件的说明请参考创建集群的软件配置章节。
        List<String> soft = new ArrayList<String>();
        soft.add("presto");
        soft.add("oozie");
        request.setOptionSoftWareLists(soft);
      • 设置配置项,请参考软件配置
        request.setConfigurations("oss://your-bucket/your-conf.json");
      • 设置引导操作,请参考引导操作
        List<CreateClusterRequest.BootstrapAction> bootstrapActionLists = new ArrayList<CreateClusterRequest.BootstrapAction>();
        CreateClusterRequest.BootstrapAction bootstrapActionList = new CreateClusterRequest.BootstrapAction();
        bootstrapActionList.setName("bootstrapName");
        bootstrapActionList.setPath("oss://emr-agent-pack/bootstrap/run-if.py");
        bootstrapActionList.setArg("instance.isMaster=true mkdir -p /tmp/abc");
        bootstrapActionLists.add(bootstrapActionList);
        request.setBootstrapActions(bootstrapActionLists);
    • 查询集群详情
      public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            final DescribeClusterRequest request = new DescribeClusterRequest();
            request.setId("C-XXXXXXXXXXXXXXXX"); // cluster id
            try {
                DescribeClusterResponse response = client.getAcsResponse(request);
                DescribeClusterResponse.ClusterInfo clusterInfo = response.getClusterInfo();
                // TODO do something with this cluster
            } catch (Exception e) {
                // TODO do something
            }
        }
    • 查询集群列表
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                final ListClustersRequest request = new ListClustersRequest();
                request.setPageNumber(1);
                request.setIsDesc(true);
                request.setPageSize(20);
                try {
                    ListClustersResponse response = client.getAcsResponse(request);
                    List<ListClustersResponse.ClusterInfo> clusterInfos = response.getClusters();
                    for (ListClustersResponse.ClusterInfo clusterInfo : clusterInfos) {
                        // TODO do something with this cluster
                    }
                } catch (Exception e) {
                    // TODO do something
                }
            }
    • 释放集群
      public static void main(String[] args) {
                  IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
                  DefaultAcsClient client = new DefaultAcsClient(profile);
                  ReleaseClusterRequest request = new ReleaseClusterRequest();
                  request.setId("C-XXXXXXXXXXXXXXXX"); // specify the cluster id you want to release.
                  try {
                      ReleaseClusterResponse response = client.getAcsResponse(request);
                  } catch (Exception e) {
                      // TODO do something
                  }
              }
  • 作业
    • 创建作业
      public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            final CreateJobRequest request = new CreateJobRequest();
            request.setName("Your-Job-Name");
            request.setRunParameter("--master yarn-client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 4 --class com.test.RemoteDebug ossref://Your-Bucket/Resource.jar 1000\"");
            request.setFailAct("CONTINUE"); // STOP or CONTINUE
            request.setType("SPARK"); // SPARK or HADOOP or HIVE or PIG
      try {
                  CreateJobResponse response = client.getAcsResponse(request);
                  String jobId = response.getId();
                  // TODO do something with this job
              } catch (Exception e) {
                  // TODO do something
              }
          }
      ```
    • 删除作业
      注意 如果一个作业被其他执行计划使用,则不能删除,需要先删除对应的执行计划或者修改对应的执行计划。
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                final DeleteJobRequest request = new DeleteJobRequest();
                request.setId("J-XXXXXXXXXXXXXXXX"); // set  job id
                try {
                    DeleteJobResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
                    // TODO do something
                }
            }
  • 执行计划
    • 创建执行计划
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                final CreateExecutionPlanRequest request = new CreateExecutionPlanRequest();
                request.setName("Your-ExecutionPlan-Name");
                request.setCreateClusterOnDemand(false);
                request.setStrategy("RUN_MANUALLY"); // RUN_MANUALLY or SCHEDULE
                request.setClusterId("C-XXXXXXXXXXXXXXXX"); // specify an existing running cluster
                List<String> jobIds = new ArrayList<String>();
                jobIds.add("J-XXXXXXXXXXXXXXXX"); // specify a job
                request.setJobIdLists(jobIds);
                try {
                    CreateExecutionPlanResponse response = client.getAcsResponse(request);
                    String executionPlanId = response.getId();
                    // TODO do something with this execution plan
                } catch (Exception e) {
                    // TODO do something
                }
            }

      上述实例代码创建了一个手动执行(非周期调度的)执行计划,并且该执行计划关联了一个已经创建好的集群。

      如果需要创建一个周期调度的执行计划,则需要修改或者增加如下代码:
      request.setStrategy("SCHEDULE"); // RUN_MANUALLY or SCHEDULE
                request.setStartTime(new Date().getTime()); // set start time
                request.setTimeUnit("DAY"); // DAY or HOUR
                request.setTimeInterval(1); // set time interval
      如果需要创建一个按需创建集群的执行计划,则需要修改和增加如下代码:
      request.setCreateClusterOnDemand(true);
                request.setClusterType("HADOOP");
                request.setClusterName("Your-Cluster-Name");
                request.setEmrVer("EMR-1.3.0");
                request.setSecurityGroupId("Your-Security-Group-Id");
                request.setIsOpenPublicIp(true);
                 // io优化参数,ecs系列以及网络类型会决定可选的硬件配置(ecs实例类型,云盘类型),
                // 详情情况可以参照ecs购买页面可以选择的组合和支持的类型来设置
                // https://ecs.console.aliyun.com/#/create/postpay/
                request.setIoOptimized(true); // 设置IO优化参数
                request.setInstanceGeneration("ecs-2"); // 设置为ecs II系列, ecs-1/ecs-2
                request.setNetType("classic"); // 设置网络类型 classic/vpc
                request.setLogEnable(true);
                request.setLogPath("oss://xxx");
                request.setEcsOrders(); // TODO 参考创建集群的参数设置方式。注意,这里的 ecsOder 的类型为 CreateExecutionPlanRequest.EcsOrder,与创建集群的 CreateClusterRequest.EcsOrder 不同。

      通过上面的参数来指定一个集群的配置,具体的参数设置可以参考创建集群的逻辑。按需创建集群的执行计划,会在每次执行计划启动的时候按照设定的集群配置去新建一个临时集群来运行执行计划,执行计划完成之后该集群会自动释放。与创建集群逻辑稍有不同的是,这里创建的集群必须指定一个安全组 ID(即不能指定安全组名称来新建一个安全组)。

      当然,周期调度和按需创建集群并不矛盾,即可以创建一个执行计划,它是周期调度的并且是按需创建集群的。

    • 删除执行计划
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                final DeleteExecutionPlanRequest request = new DeleteExecutionPlanRequest();
                request.setId("WF-XXXXXXXXXXXXXXXX"); // set execution plan id
                try {
                    DeleteExecutionPlanResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
                    // TODO do something
                }
            }
    • 运行执行计划
      注意 处于调度中或者正在运行的执行计划不能运行。
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                RunExecutionPlanRequest request = new RunExecutionPlanRequest();
                request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the execution plan id which to run
                try {
                    RunExecutionPlanResponse response = client.getAcsResponse(request);
                    String instanceId = response.getExecutionPlanInstanceId();
                    // TODO do something with this instance
                } catch (Exception e) {
                    // TODO do something
                }
            }
    • 暂停执行计划调度
      对于周期性的执行计划,如果正处于周期调度中(如下图所示),可以通过暂停执行计划的 SDK 将调度中的周期执行计划暂停。

      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                SuspendExecutionPlanSchedulerRequest request = new SuspendExecutionPlanSchedulerRequest();
                request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the execution plan id you want to suspend
                try {
                    SuspendExecutionPlanSchedulerResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
                    // TODO do something
                }
            }
    • 启动执行计划调度
      对于周期性的执行计划,如果正处于暂停调度状态中(如下图所示),可以通过启动执行计划调度的 SDK 将暂停中的周期执行计划启动调度。

      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                ResumeExecutionPlanSchedulerRequest request = new ResumeExecutionPlanSchedulerRequest();
                request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the execution plan id you want to suspend
                try {
                    ResumeExecutionPlanSchedulerResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
                    // TODO do something
                }
            }
    • 查询执行计划实例列表
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                ListExecutionPlanInstancesRequest request = new ListExecutionPlanInstancesRequest();
                // specify execution plan ids
                List<String> executionPlanIds = new ArrayList<String>();
                executionPlanIds.add("WF-XXXXXXXXXXXXXXX1");
                executionPlanIds.add("WF-XXXXXXXXXXXXXXX2");
                executionPlanIds.add("WF-XXXXXXXXXXXXXXX3");
                request.setExecutionPlanIdLists(executionPlanIds); // (1)
                // specify order key (ordered by id)
                request.setIsDesc(true);
                // specify page number and page size, default page number is 1 and default page size is 10.
                request.setPageSize(20);
                request.setPageNumber(1);
                // specify if you want to list latest instance for each execution plan id.
                request.setOnlyLastInstance(true); // (2) default is false
                try {
                    ListExecutionPlanInstancesResponse response = client.getAcsResponse(request);
                    for (ListExecutionPlanInstancesResponse.ExecutionPlanInstance instance : response.getExecutionPlanInstances()) {
                        // TODO do something with each instance
                    }
                } catch (Exception e) {
                    // TODO do something
                }
            }
      • 查询执行计划的执行历史纪录,可以指定多个执行计划 ID。
      • 如果指定了只查上一次执行纪录,则会返回指定执行计划的上一次执行纪录,不会返回所有执行纪录数据。通常用于判断某个或者某些执行计划上次执行是否执行完成或者查询上次执行的执行状态。