本文介绍如何使用Java SDK快速创建集群和创建作业。

背景信息

OpenAPI开发者门户提供在线调试API和动态生成SDK示例代码的功能,能显著降低API的使用难度,推荐您使用。

环境准备

在Eclipse项目中使用阿里云E-MapReduce OpenAPI Java SDK。
  • 创建一个Maven工程,添加Maven依赖。
    <dependency>
         <groupId>com.aliyun</groupId>
         <artifactId>aliyun-java-sdk-core</artifactId>
         <version>3.5.0</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-emr</artifactId>
        <version>3.1.0</version>
    </dependency>
  • 下载对应的JAR文件到本地。以Eclipse为例,其操作步骤如下:
    1. 下载SDK。

      下载方式请参见下载地址

    2. 将下载后的文件拷贝到您的项目中。
    3. 在Eclipse中选择右键单击您的工程名称,然后选择 Properties > Java Build Path > Add JARs
    4. 选中您在步骤2中拷贝的所有JAR文件。

初始化Client

IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
DefaultAcsClient client = new DefaultAcsClient(profile);

示例代码

  • 集群
    • 创建集群
      public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            final CreateClusterRequest request = new CreateClusterRequest();
            request.setName("Your-Cluster-Name");
            request.setSecurityGroupId("Your-Security-Group-Id"); //如果未指定安全组ID,则将创建具有给定名称的新安全组。
            request.setAutoRenew(false);
            request.setChargeType("PostPaid"); //付费类型,按量付费。
            request.setClusterType("HADOOP"); //集群类型。
            request.setEmrVer("EMR-1.3.0"); //EMR版本。
            request.setIsOpenPublicIp(true);
            request.setLogEnable(true);
            request.setLogPath("oss://Your-Bucket/Your-Folder");
            request.setMasterPwdEnable(true); //启用主节点密码。
            request.setMasterPwd("Aa123456789"); //设置主节点密码。
            request.setZoneId("cn-hangzhou-b"); //设置地域。
             // io优化参数,ecs系列以及网络类型会决定可选的硬件配置(ecs实例类型,云盘类型),详情情况可以参照ecs购买页面可以选择的组合和支持的类型来设置。
            request.setIoOptimized(true); // 设置IO优化参数。
            request.setInstanceGeneration("ecs-2"); // 设置为ecs II系列,取值支持ecs-1和ecs-2。
            request.setNetType("vpc"); // 设置网络类型。
            request.setVpcId("your-vpcId");
            request.setVSwitchId("your-switchId");
            List<CreateClusterRequest.EcsOrder> ecsOrders = new ArrayList<CreateClusterRequest.EcsOrder>();
            CreateClusterRequest.EcsOrder masterOrder = new CreateClusterRequest.EcsOrder();
            masterOrder.setIndex(1);
            masterOrder.setDiskCapacity(50);
            masterOrder.setDiskCount(2);
            masterOrder.setDiskType("CLOUD_EFFICIENCY"); //指定磁盘类型。
            masterOrder.setInstanceType("ecs.n1.large"); //指定ecs实例类型。
            masterOrder.setNodeCount(1);
            masterOrder.setNodeType("MASTER"); // 主节点。
            ecsOrders.add(masterOrder);
            CreateClusterRequest.EcsOrder coreOrder = new CreateClusterRequest.EcsOrder();
            coreOrder.setIndex(2);
            coreOrder.setDiskCapacity(50);
            coreOrder.setDiskCount(4);
            coreOrder.setDiskType("CLOUD_EFFICIENCY");
            coreOrder.setInstanceType("ecs.n1.large");
            coreOrder.setNodeCount(3);
            coreOrder.setNodeType("CORE");
            ecsOrders.add(coreOrder);
            request.setEcsOrders(ecsOrders);
            try {
                CreateClusterResponse response = client.getAcsResponse(request);
                String clusterId = response.getClusterId(); // cluster id
                //对集群执行操作。
            } catch (Exception e) {
      
            }
        }
      • 创建集群需要指定集群所属的安全组。如果不指定安全组ID,则需要指定一个安全组名称,在创建集群的同时新建一个安全组。
      • 设置高可用参数,详情请参见创建集群的硬件配置部分。
        request.setHighAvailabilityEnable(true);
      • 设置可选软件组件,详情请参见创建集群的软件配置章节。
        List<String> soft = new ArrayList<String>();
        soft.add("presto");
        soft.add("oozie");
        request.setOptionSoftWareLists(soft);
      • 设置配置项,详情请参见软件配置
        request.setConfigurations("oss://your-bucket/your-conf.json");
      • 设置引导操作,详情请参见引导操作
        List<CreateClusterRequest.BootstrapAction> bootstrapActionLists = new ArrayList<CreateClusterRequest.BootstrapAction>();
        CreateClusterRequest.BootstrapAction bootstrapActionList = new CreateClusterRequest.BootstrapAction();
        bootstrapActionList.setName("bootstrapName");
        bootstrapActionList.setPath("oss://emr-agent-pack/bootstrap/run-if.py");
        bootstrapActionList.setArg("instance.isMaster=true mkdir -p /tmp/abc");
        bootstrapActionLists.add(bootstrapActionList);
        request.setBootstrapActions(bootstrapActionLists);
    • 查询集群详情
      public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            final DescribeClusterRequest request = new DescribeClusterRequest();
            request.setId("C-XXXXXXXXXXXXXXXX"); //集群ID。
            try {
                DescribeClusterResponse response = client.getAcsResponse(request);
                DescribeClusterResponse.ClusterInfo clusterInfo = response.getClusterInfo();
                //对集群执行操作。
            } catch (Exception e) {
      
            }
        }
    • 查询集群列表
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                final ListClustersRequest request = new ListClustersRequest();
                request.setPageNumber(1);
                request.setIsDesc(true);
                request.setPageSize(20);
                try {
                    ListClustersResponse response = client.getAcsResponse(request);
                    List<ListClustersResponse.ClusterInfo> clusterInfos = response.getClusters();
                    for (ListClustersResponse.ClusterInfo clusterInfo : clusterInfos) {
                        //对集群执行操作。
                    }
                } catch (Exception e) {
      
                }
            }
    • 释放集群
      public static void main(String[] args) {
                  IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
                  DefaultAcsClient client = new DefaultAcsClient(profile);
                  ReleaseClusterRequest request = new ReleaseClusterRequest();
                  request.setId("C-XXXXXXXXXXXXXXXX"); //指定要释放的集群ID。
                  try {
                      ReleaseClusterResponse response = client.getAcsResponse(request);
                  } catch (Exception e) {
      
                  }
              }
  • 作业
    • 创建作业
      public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            final CreateJobRequest request = new CreateJobRequest();
            request.setName("Your-Job-Name");
            request.setRunParameter("--master yarn-client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 4 --class com.test.RemoteDebug ossref://Your-Bucket/Resource.jar 1000\"");
            request.setFailAct("CONTINUE"); //继续作业。
            request.setType("SPARK"); //作业类型。
      try {
                  CreateJobResponse response = client.getAcsResponse(request);
                  String jobId = response.getId();
              } catch (Exception e) {
      
              }
          }
    • 删除作业
      注意 如果一个作业被其他工作流使用,则不能删除,需要先删除对应的工作流或者修改对应的工作流。
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                final DeleteJobRequest request = new DeleteJobRequest();
                request.setId("J-XXXXXXXXXXXXXXXX"); //设置作业ID。
                try {
                    DeleteJobResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
      
                }
            }