样例代码

本文介绍如何快速使用EMR Java SDK完成常见操作,例如创建集群、创建作业和扩缩容节点组等。

前提条件

请确保代码运行环境已设置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体配置方法,请参见配置方案。

创建集群

说明

如果您在2022年12月19日17点(UTC+8)以后首次创建EMR集群,则无法使用该接口创建集群,请改用CreateCluster - 创建集群。

/**
 * Sample: creates an EMR cluster with one master node group and one core node group.
 *
 * <p>The AccessKey pair is read from the environment variables
 * ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.
 * Any failure is reported on standard error instead of being silently dropped.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultAcsClient client = new DefaultAcsClient(profile);
    final CreateClusterRequest request = new CreateClusterRequest();
    request.setName("Your-Cluster-Name");
    // If no security group ID is specified, a new security group with the given name is created.
    request.setSecurityGroupId("Your-Security-Group-Id");
    request.setAutoRenew(false);
    request.setChargeType("PostPaid"); // Billing method: pay-as-you-go.
    request.setClusterType("HADOOP"); // Cluster type.
    request.setEmrVer("EMR-1.3.0"); // EMR version.
    request.setIsOpenPublicIp(true);
    request.setLogEnable(true);
    request.setLogPath("oss://Your-Bucket/Your-Folder");
    request.setMasterPwdEnable(true); // Enable the master node password.
    request.setMasterPwd("Aa123456789"); // Master node password (placeholder; replace with your own).
    request.setZoneId("cn-hangzhou-b"); // Zone ID (a zone inside the region, not the region itself).
    // The I/O optimization flag, the ECS instance generation and the network type determine which
    // hardware options (ECS instance types, cloud disk types) are available. Refer to the
    // combinations offered on the ECS purchase page when choosing values.
    request.setIoOptimized(true); // I/O optimization.
    request.setInstanceGeneration("ecs-2"); // ECS generation II; supported values are ecs-1 and ecs-2.
    request.setNetType("vpc"); // Network type.
    request.setVpcId("your-vpcId");
    request.setVSwitchId("your-switchId");

    List<CreateClusterRequest.EcsOrder> ecsOrders = new ArrayList<CreateClusterRequest.EcsOrder>();

    // Master node group.
    CreateClusterRequest.EcsOrder masterOrder = new CreateClusterRequest.EcsOrder();
    masterOrder.setIndex(1);
    masterOrder.setDiskCapacity(50);
    masterOrder.setDiskCount(2);
    masterOrder.setDiskType("CLOUD_EFFICIENCY"); // Disk type.
    masterOrder.setInstanceType("ecs.n1.large"); // ECS instance type.
    masterOrder.setNodeCount(1);
    masterOrder.setNodeType("MASTER"); // Master node.
    ecsOrders.add(masterOrder);

    // Core node group.
    CreateClusterRequest.EcsOrder coreOrder = new CreateClusterRequest.EcsOrder();
    coreOrder.setIndex(2);
    coreOrder.setDiskCapacity(50);
    coreOrder.setDiskCount(4);
    coreOrder.setDiskType("CLOUD_EFFICIENCY");
    coreOrder.setInstanceType("ecs.n1.large");
    coreOrder.setNodeCount(3);
    coreOrder.setNodeType("CORE");
    ecsOrders.add(coreOrder);

    request.setEcsOrders(ecsOrders);
    try {
        CreateClusterResponse response = client.getAcsResponse(request);
        String clusterId = response.getClusterId(); // ID of the newly created cluster.
        // Perform further operations on the cluster here.
    } catch (Exception e) {
        // Surface the failure; an empty catch would hide request and authentication errors.
        System.err.println("Failed to create cluster: " + e);
    }
}
  • 创建集群需要指定集群所属的安全组。如果不指定安全组ID,则需要指定一个安全组名称,在创建集群的同时新建一个安全组。

  • 设置高可用参数,详情请参见创建集群的硬件配置部分。

    // Turn on the high-availability option on the create-cluster request.
    request.setHighAvailabilityEnable(true);
  • 设置可选软件组件,详情请参见创建集群的软件配置章节。

    // Optional software components to install, passed to the create-cluster request.
    List<String> optionalComponents = new ArrayList<String>();
    for (String component : new String[] {"presto", "oozie"}) {
        optionalComponents.add(component);
    }
    request.setOptionSoftWareLists(optionalComponents);
  • 设置配置项,详情请参见软件配置。

    // Set the configuration items; the value here is an OSS path to a JSON file.
    request.setConfigurations("oss://your-bucket/your-conf.json");
  • 设置引导操作,详情请参见引导操作。

    // Describe a single bootstrap action: its name, script path and arguments.
    CreateClusterRequest.BootstrapAction action = new CreateClusterRequest.BootstrapAction();
    action.setName("bootstrapName");
    action.setPath("oss://emr-agent-pack/bootstrap/run-if.py");
    action.setArg("instance.isMaster=true mkdir -p /tmp/abc");

    // Attach the action list to the create-cluster request.
    List<CreateClusterRequest.BootstrapAction> actions = new ArrayList<>();
    actions.add(action);
    request.setBootstrapActions(actions);

查询集群详情

/**
 * Sample: queries the details of one EMR cluster by its ID.
 *
 * <p>The AccessKey pair is read from the environment variables
 * ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.
 * Any failure is reported on standard error instead of being silently dropped.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultAcsClient client = new DefaultAcsClient(profile);
    final DescribeClusterRequest request = new DescribeClusterRequest();
    request.setId("C-XXXXXXXXXXXXXXXX"); // Cluster ID.
    try {
        DescribeClusterResponse response = client.getAcsResponse(request);
        DescribeClusterResponse.ClusterInfo clusterInfo = response.getClusterInfo();
        // Perform further operations on the cluster here.
    } catch (Exception e) {
        // Surface the failure; an empty catch would hide request and authentication errors.
        System.err.println("Failed to describe cluster: " + e);
    }
}

查询集群列表

/**
 * Sample: lists EMR clusters, requesting the first page of 20 entries in descending order.
 *
 * <p>The AccessKey pair is read from the environment variables
 * ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.
 * Any failure is reported on standard error instead of being silently dropped.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultAcsClient client = new DefaultAcsClient(profile);
    final ListClustersRequest request = new ListClustersRequest();
    request.setPageNumber(1);
    request.setIsDesc(true);
    request.setPageSize(20);
    try {
        ListClustersResponse response = client.getAcsResponse(request);
        List<ListClustersResponse.ClusterInfo> clusterInfos = response.getClusters();
        for (ListClustersResponse.ClusterInfo clusterInfo : clusterInfos) {
            // Perform further operations on each cluster here.
        }
    } catch (Exception e) {
        // Surface the failure; an empty catch would hide request and authentication errors.
        System.err.println("Failed to list clusters: " + e);
    }
}

释放集群

/**
 * Sample: releases an EMR cluster identified by its ID.
 *
 * <p>The AccessKey pair is read from the environment variables
 * ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.
 * Any failure is reported on standard error instead of being silently dropped.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultAcsClient client = new DefaultAcsClient(profile);
    ReleaseClusterRequest request = new ReleaseClusterRequest();
    request.setId("C-XXXXXXXXXXXXXXXX"); // ID of the cluster to release.
    try {
        ReleaseClusterResponse response = client.getAcsResponse(request);
    } catch (Exception e) {
        // Surface the failure; otherwise a failed release would look like a success.
        System.err.println("Failed to release cluster: " + e);
    }
}

创建作业

/**
 * Sample: creates a SPARK job and reads the new job's ID from the response.
 *
 * <p>The AccessKey pair is read from the environment variables
 * ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.
 * Any failure is reported on standard error instead of being silently dropped.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultAcsClient client = new DefaultAcsClient(profile);
    final CreateJobRequest request = new CreateJobRequest();
    request.setName("Your-Job-Name");
    // The run parameter must not end with a stray quote: an unbalanced '"' would be passed
    // through as part of the job arguments.
    request.setRunParameter("--master yarn-client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 4 --class com.test.RemoteDebug ossref://Your-Bucket/Resource.jar 1000");
    request.setFailAct("CONTINUE"); // Failure action: continue the job.
    request.setType("SPARK"); // Job type.
    try {
        CreateJobResponse response = client.getAcsResponse(request);
        String jobId = response.getId(); // ID of the newly created job.
    } catch (Exception e) {
        // Surface the failure; an empty catch would hide request and authentication errors.
        System.err.println("Failed to create job: " + e);
    }
}

删除作业

重要

如果一个作业被其他工作流使用,则不能删除,需要先删除对应的工作流或者修改对应的工作流。

/**
 * Sample: deletes a job identified by its ID.
 *
 * <p>The AccessKey pair is read from the environment variables
 * ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.
 * Any failure is reported on standard error instead of being silently dropped.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultAcsClient client = new DefaultAcsClient(profile);
    final DeleteJobRequest request = new DeleteJobRequest();
    request.setId("J-XXXXXXXXXXXXXXXX"); // Job ID.
    try {
        DeleteJobResponse response = client.getAcsResponse(request);
    } catch (Exception e) {
        // Surface the failure; otherwise a failed deletion would look like a success.
        System.err.println("Failed to delete job: " + e);
    }
}

扩容节点组

通过调整数量扩容节点组。

// Build the client from the AccessKey pair stored in environment variables.
final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
DefaultAcsClient client = new DefaultAcsClient(profile);

// Describe the node group to scale out.
ResizeClusterV2Request.HostGroup targetGroup = new ResizeClusterV2Request.HostGroup();
targetGroup.setHostGroupId("G-F0D0661E0A6E****");
// Number of nodes to add.
targetGroup.setNodeCount(1);

List<ResizeClusterV2Request.HostGroup> hostGroups = new ArrayList<>();
hostGroups.add(targetGroup);

ResizeClusterV2Request request = new ResizeClusterV2Request();
request.setClusterId("C-0E4B90219*****");
request.setHostGroups(hostGroups);

// Send the request and print the response as JSON.
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));

缩容节点组

您可以选择通过调整指定节点组数量缩容节点组或通过实例ID缩容节点组。

通过调整数量缩容节点组

重要

使用该特性,您需要升级aliyun-java-sdk-emr到3.3.8版本。

<!-- Maven dependency for the EMR Java SDK, pinned to 3.3.8 as required for scaling in a node group by count. -->
<dependency>
  <groupId>com.aliyun</groupId>
  <artifactId>aliyun-java-sdk-emr</artifactId>
  <version>3.3.8</version>
</dependency>
// Scale in a node group by releasing a given number of nodes.
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);

ReleaseClusterHostGroupRequest request = new ReleaseClusterHostGroupRequest();
// Cluster ID.
request.setClusterId("C-01A1F4A********");
// Node group ID; it can be obtained through the ListClusterHostGroup API.
request.setHostGroupId("G-D11D3E*******");

// Number of nodes to release.
request.setReleaseNumber(3);
request.setEnableGracefulDecommission(true);
// Decommission timeout, in seconds.
request.setDecommissionTimeout(60);
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));

当启用Yarn Decommission选项时,在EMR控制台YARN服务的配置页面,搜索参数yarn.resourcemanager.nodes.exclude-path,将其值改为/etc/ecm/hadoop-conf/yarn-exclude.xml并保存和部署,或者可以通过以下代码完成修改。

// Build the client from the AccessKey pair stored in environment variables.
final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
DefaultAcsClient client = new DefaultAcsClient(profile);

// JSON payload that sets yarn.resourcemanager.nodes.exclude-path in yarn-site.
final String yarnSiteOverride = "{\"yarn-site\":{\"yarn.resourcemanager.nodes.exclude-path\":\"/etc/ecm/hadoop-conf/yarn-exclude.xml\"}}";

ModifyClusterServiceConfigRequest configRequest = new ModifyClusterServiceConfigRequest();
configRequest.setClusterId("C-01A1F4A********");
configRequest.setRegionId("cn-hangzhou");
configRequest.setServiceName("YARN");
configRequest.setConfigParams(yarnSiteOverride);
configRequest.setCustomConfigParams("{}");
configRequest.setComment("for decommission gracefully");
configRequest.setRefreshHostConfig(Boolean.TRUE);

// Send the request and print the response as JSON.
System.out.println(JSON.toJSONString(client.getAcsResponse(configRequest)));

通过实例ID缩容节点组

// Scale in a node group by releasing specific instances.
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);

ReleaseClusterHostGroupRequest request = new ReleaseClusterHostGroupRequest();
// Cluster ID.
request.setClusterId("C-C52CF4246D10****");
// Node group ID; it can be obtained through the ListClusterHostGroup API.
request.setHostGroupId("G-A24651D939AD****");

// Instances to release.
List<String> instanceIds = new ArrayList<>();
instanceIds.add("i-xxxxxxx");
instanceIds.add("i-xxxxxxy");
// The instance ID list is passed as a JSON-serialized string.
request.setInstanceIdList(JSON.toJSONString(instanceIds));

request.setEnableGracefulDecommission(true);
// Decommission timeout, in seconds.
request.setDecommissionTimeout(60);
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));