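EMR OpenAPI does not currently support auto scaling based on time or load. However, you can add your own trigger logic and then call EMR OpenAPI to scale out an existing cluster.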
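
The trigger logic is left to you. As a minimal sketch of what such logic might look like, the function below decides how many Task nodes to add based on a time window and a load value. Everything in it is an assumption made for illustration: the function name `nodes_to_add`, the peak window, the load threshold, the step size, and the node cap are hypothetical, and the load value must come from your own monitoring source.

```python
from datetime import datetime, time


def nodes_to_add(now, cpu_load, current_task_nodes,
                 peak_start=time(9, 0), peak_end=time(18, 0),
                 load_threshold=0.8, step=4, max_task_nodes=20):
    """Return how many Task nodes to add; 0 means no scale-out is needed.

    All default values are hypothetical and only illustrate the idea.
    """
    # Time-based trigger: the current time falls inside the peak window.
    in_peak = peak_start <= now.time() < peak_end
    # Load-based trigger: the observed load reaches the threshold.
    overloaded = cpu_load >= load_threshold
    if not (in_peak or overloaded):
        return 0
    # Never exceed the configured upper bound on Task nodes.
    room = max_task_nodes - current_task_nodes
    return max(0, min(step, room))


# Off-peak but overloaded: add a full step of nodes.
print(nodes_to_add(datetime(2024, 1, 1, 3, 0), 0.9, 2))
# Off-peak and idle: no scale-out.
print(nodes_to_add(datetime(2024, 1, 1, 3, 0), 0.2, 2))
```

When the function returns a positive number, your program can pass it as the node count of the scale-out request. How often the check runs, and how repeated triggers are suppressed, is up to the caller.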

Scenario

You have created a Hadoop cluster in EMR that contains Master, Core, and Task nodes, and you want to add Task nodes by calling OpenAPI.

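The basic configuration of the existing cluster and of the nodes to be added is as follows:

  • The cluster name is emr_openapi_demo and the cluster ID is C-69CB0546800F****.
  • The Task host group is to be scaled out by four nodes. Each node is an ecs.c5.xlarge instance with one 120 GB ESSD as the system disk and four 80 GB ultra disks as data disks.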

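    After the scale-out program runs successfully, you can view the new nodes in the console by performing the following steps.

    1. Log on to the Alibaba Cloud E-MapReduce console.
    2. Click the Cluster Management tab at the top of the page.
    3. In the cluster list on the Cluster Management page, click Details next to the target cluster.
    4. In the left-side navigation pane, click Host List.

      On the Host List page, you can view the new nodes.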

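  • The GroupId of the cluster's Task host group is G-C73605CF4382****.
    Note: To scale out a cluster, you need the GroupId of the host group that is to be scaled out. EMR provides an OpenAPI operation from which you can obtain this value indirectly: you need to parse it from the returned cluster information.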
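
Parsing the GroupId from the cluster information might look like the following sketch. It assumes the response is raw JSON text nested according to the lookup path given in the examples (ClusterInfo -> HostGroupList -> HostGroup -> HostGroupType=TASK -> HostGroupId), with HostGroup being a list. The helper name `find_host_group_id` and the sample data, including the Core group ID, are hypothetical; check the shape against a real response before relying on it.

```python
import json


def find_host_group_id(response_text, group_type="TASK"):
    """Return the HostGroupId of the first host group of the given type, or None.

    Assumes the JSON is nested as ClusterInfo -> HostGroupList -> HostGroup (a list).
    """
    data = json.loads(response_text)
    groups = (
        data.get("ClusterInfo", {})
            .get("HostGroupList", {})
            .get("HostGroup", [])
    )
    for group in groups:
        if group.get("HostGroupType") == group_type:
            return group.get("HostGroupId")
    return None


# Hypothetical, heavily trimmed sample response used only for illustration.
sample = json.dumps({
    "ClusterInfo": {
        "HostGroupList": {
            "HostGroup": [
                {"HostGroupType": "CORE", "HostGroupId": "G-EXAMPLE-CORE"},
                {"HostGroupType": "TASK", "HostGroupId": "G-C73605CF4382****"},
            ]
        }
    }
})

print(find_host_group_id(sample))
```

Returning None when no matching group exists lets the caller decide whether a missing Task host group is an error.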

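Examples

Java and Python examples are as follows:

  • Java
    1. Obtain the GroupId of the cluster's Task host group based on the region where the cluster resides and the cluster ID.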
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      
      public class DescribeClusterV2 {
      
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
      
              DescribeClusterV2Request request = new DescribeClusterV2Request();
              request.setRegionId("cn-hangzhou");
              request.setId("C-69CB0546800F****");
      
              try {
                  DescribeClusterV2Response response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
      
          }
      }

      Find the GroupId of the Task host group in the response.

      Taking a JSON response as an example, the lookup path of the GroupId is as follows.

      ClusterInfo -> HostGroupList -> HostGroup -> HostGroupType=TASK -> HostGroupId
    2. Scale out the cluster.
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.emr.model.v20160408.*;
      
      public class ResizeClusterV2 {
      
          public static void main(String[] args) {
              DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
              IAcsClient client = new DefaultAcsClient(profile);
      
              ResizeClusterV2Request request = new ResizeClusterV2Request();
              request.setRegionId("cn-hangzhou");
              request.setClusterId("C-69CB0546800F****");
      
              List<ResizeClusterV2Request.HostGroup> hostGroupList = new ArrayList<ResizeClusterV2Request.HostGroup>();
      
              ResizeClusterV2Request.HostGroup hostGroup1 = new ResizeClusterV2Request.HostGroup();
              hostGroup1.setClusterId("C-69CB0546800F****");
              hostGroup1.setHostGroupId("G-C73605CF4382****");
              hostGroup1.setHostGroupName("task_group");
              hostGroup1.setHostGroupType("TASK");
              hostGroup1.setNodeCount(4);
              hostGroup1.setInstanceType("ecs.c5.xlarge");
              hostGroupList.add(hostGroup1);
              request.setHostGroups(hostGroupList);
      
              try {
                  ResizeClusterV2Response response = client.getAcsResponse(request);
                  System.out.println(new Gson().toJson(response));
              } catch (ServerException e) {
                  e.printStackTrace();
              } catch (ClientException e) {
                  System.out.println("ErrCode:" + e.getErrCode());
                  System.out.println("ErrMsg:" + e.getErrMsg());
                  System.out.println("RequestId:" + e.getRequestId());
              }
      
          }
      }
  • Python
    1. Obtain the GroupId of the cluster's Task host group based on the region where the cluster resides and the cluster ID.
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.DescribeClusterV2Request import DescribeClusterV2Request
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = DescribeClusterV2Request()
      request.set_accept_format('json')
      
      request.set_Id("C-69CB0546800F****")
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))

      Find the GroupId of the Task host group in the response.

      Taking a JSON response as an example, the lookup path of the GroupId is as follows.

      ClusterInfo -> HostGroupList -> HostGroup -> HostGroupType=TASK -> HostGroupId
    2. Scale out the cluster.
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkemr.request.v20160408.ResizeClusterV2Request import ResizeClusterV2Request
      
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = ResizeClusterV2Request()
      request.set_accept_format('json')
      
      request.set_ClusterId("C-69CB0546800F****")
      request.set_HostGroups([
        {
          "ClusterId": "C-69CB0546800F****",
          "HostGroupId": "G-C73605CF4382****",
          "HostGroupName": "task_group",
          "HostGroupType": "TASK",
          "NodeCount": 4,
          "InstanceType": "ecs.c5.xlarge"
        }
      ])
      
      response = client.do_action_with_exception(request)
      # python2:  print(response) 
      print(str(response, encoding='utf-8'))
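
If the scale-out is driven by a program rather than by hard-coded values, the HostGroups parameter can be built by a small helper. The following is only a sketch: the function name `build_task_host_group` and its validation rule are hypothetical, and the dictionary keys are the ones used in the Python scale-out example above.

```python
def build_task_host_group(cluster_id, host_group_id, node_count,
                          instance_type, host_group_name="task_group"):
    """Build one HostGroups entry for a Task host group scale-out request."""
    # Reject values that cannot describe a scale-out.
    if not isinstance(node_count, int) or node_count <= 0:
        raise ValueError("node_count must be a positive integer")
    return {
        "ClusterId": cluster_id,
        "HostGroupId": host_group_id,
        "HostGroupName": host_group_name,
        "HostGroupType": "TASK",
        "NodeCount": node_count,
        "InstanceType": instance_type,
    }


host_groups = [
    build_task_host_group(
        "C-69CB0546800F****", "G-C73605CF4382****", 4, "ecs.c5.xlarge"
    )
]
print(host_groups)
```

The resulting list has the same shape as the literal passed to request.set_HostGroups in the scale-out example, so it can be used in its place.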

OpenAPI reference