本文介绍如何快速使用EMR Java SDK完成常见操作,例如创建集群、创建作业和扩缩容节点组等。
前提条件
请确保代码运行环境设置了环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体配置方法,请参见配置方案。
创建集群
说明
如果您在2022年12月19日17点(UTC+8)以后第一次创建EMR集群,无法使用该接口创建集群,请使用CreateCluster - 创建集群。
public static void main(String[] args) {
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
final CreateClusterRequest request = new CreateClusterRequest();
request.setName("Your-Cluster-Name");
request.setSecurityGroupId("Your-Security-Group-Id"); //如果未指定安全组ID,则将创建具有给定名称的新安全组。
request.setAutoRenew(false);
request.setChargeType("PostPaid"); //付费类型,按量付费。
request.setClusterType("HADOOP"); //集群类型。
request.setEmrVer("EMR-1.3.0"); //EMR版本。
request.setIsOpenPublicIp(true);
request.setLogEnable(true);
request.setLogPath("oss://Your-Bucket/Your-Folder");
request.setMasterPwdEnable(true); //启用主节点密码。
request.setMasterPwd("Aa123456789"); //设置主节点密码。
request.setZoneId("cn-hangzhou-b"); //设置地域。
// io优化参数,ecs系列以及网络类型会决定可选的硬件配置(ecs实例类型,云盘类型),详情情况可以参照ecs购买页面可以选择的组合和支持的类型来设置。
request.setIoOptimized(true); // 设置IO优化参数。
request.setInstanceGeneration("ecs-2"); // 设置为ecs II系列,取值支持ecs-1和ecs-2。
request.setNetType("vpc"); // 设置网络类型。
request.setVpcId("your-vpcId");
request.setVSwitchId("your-switchId");
List<CreateClusterRequest.EcsOrder> ecsOrders = new ArrayList<CreateClusterRequest.EcsOrder>();
CreateClusterRequest.EcsOrder masterOrder = new CreateClusterRequest.EcsOrder();
masterOrder.setIndex(1);
masterOrder.setDiskCapacity(50);
masterOrder.setDiskCount(2);
masterOrder.setDiskType("CLOUD_EFFICIENCY"); //指定磁盘类型。
masterOrder.setInstanceType("ecs.n1.large"); //指定ecs实例类型。
masterOrder.setNodeCount(1);
masterOrder.setNodeType("MASTER"); // 主节点。
ecsOrders.add(masterOrder);
CreateClusterRequest.EcsOrder coreOrder = new CreateClusterRequest.EcsOrder();
coreOrder.setIndex(2);
coreOrder.setDiskCapacity(50);
coreOrder.setDiskCount(4);
coreOrder.setDiskType("CLOUD_EFFICIENCY");
coreOrder.setInstanceType("ecs.n1.large");
coreOrder.setNodeCount(3);
coreOrder.setNodeType("CORE");
ecsOrders.add(coreOrder);
request.setEcsOrders(ecsOrders);
try {
CreateClusterResponse response = client.getAcsResponse(request);
String clusterId = response.getClusterId(); // cluster id
//对集群执行操作。
} catch (Exception e) {
}
}
创建集群需要指定集群所属的安全组。如果不指定安全组ID,则需要指定一个安全组名称,在创建集群的同时新建一个安全组。
设置高可用参数,详情请参见创建集群的硬件配置部分。
request.setHighAvailabilityEnable(true);
设置可选软件组件,详情请参见创建集群的软件配置章节。
List<String> soft = new ArrayList<String>(); soft.add("presto"); soft.add("oozie"); request.setOptionSoftWareLists(soft);
设置配置项,详情请参见软件配置。
request.setConfigurations("oss://your-bucket/your-conf.json");
设置引导操作,详情请参见引导操作。
List<CreateClusterRequest.BootstrapAction> bootstrapActionLists = new ArrayList<CreateClusterRequest.BootstrapAction>(); CreateClusterRequest.BootstrapAction bootstrapActionList = new CreateClusterRequest.BootstrapAction(); bootstrapActionList.setName("bootstrapName"); bootstrapActionList.setPath("oss://emr-agent-pack/bootstrap/run-if.py"); bootstrapActionList.setArg("instance.isMaster=true mkdir -p /tmp/abc"); bootstrapActionLists.add(bootstrapActionList); request.setBootstrapActions(bootstrapActionLists);
查询集群详情
public static void main(String[] args) {
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
final DescribeClusterRequest request = new DescribeClusterRequest();
request.setId("C-XXXXXXXXXXXXXXXX"); //集群ID。
try {
DescribeClusterResponse response = client.getAcsResponse(request);
DescribeClusterResponse.ClusterInfo clusterInfo = response.getClusterInfo();
//对集群执行操作。
} catch (Exception e) {
}
}
查询集群列表
public static void main(String[] args) {
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
final ListClustersRequest request = new ListClustersRequest();
request.setPageNumber(1);
request.setIsDesc(true);
request.setPageSize(20);
try {
ListClustersResponse response = client.getAcsResponse(request);
List<ListClustersResponse.ClusterInfo> clusterInfos = response.getClusters();
for (ListClustersResponse.ClusterInfo clusterInfo : clusterInfos) {
//对集群执行操作。
}
} catch (Exception e) {
}
}
释放集群
public static void main(String[] args) {
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
ReleaseClusterRequest request = new ReleaseClusterRequest();
request.setId("C-XXXXXXXXXXXXXXXX"); //指定要释放的集群ID。
try {
ReleaseClusterResponse response = client.getAcsResponse(request);
} catch (Exception e) {
}
}
创建作业
public static void main(String[] args) {
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
final CreateJobRequest request = new CreateJobRequest();
request.setName("Your-Job-Name");
request.setRunParameter("--master yarn-client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 4 --class com.test.RemoteDebug ossref://Your-Bucket/Resource.jar 1000\"");
request.setFailAct("CONTINUE"); //继续作业。
request.setType("SPARK"); //作业类型。
try {
CreateJobResponse response = client.getAcsResponse(request);
String jobId = response.getId();
} catch (Exception e) {
}
}
删除作业
重要
如果一个作业被其他工作流使用,则不能删除,需要先删除对应的工作流或者修改对应的工作流。
public static void main(String[] args) {
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
final DeleteJobRequest request = new DeleteJobRequest();
request.setId("J-XXXXXXXXXXXXXXXX"); //设置作业ID。
try {
DeleteJobResponse response = client.getAcsResponse(request);
} catch (Exception e) {
}
}
扩容节点组
通过调整数量扩容节点组。
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
ResizeClusterV2Request request = new ResizeClusterV2Request();
request.setClusterId("C-0E4B90219*****");
List<ResizeClusterV2Request.HostGroup> hostGroups = new ArrayList<>();
ResizeClusterV2Request.HostGroup hostGroup = new ResizeClusterV2Request.HostGroup();
hostGroups.add(hostGroup);
hostGroup.setHostGroupId("G-F0D0661E0A6E****");
//扩容数量。
hostGroup.setNodeCount(1);
request.setHostGroups(hostGroups);
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));
缩容节点组
您可以选择通过调整指定节点组数量缩容节点组或通过实例ID缩容节点组。
通过调整数量缩容节点组
重要
使用该特性,您需要升级aliyun-java-sdk-emr到3.3.8版本。
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-emr</artifactId>
<version>3.3.8</version>
</dependency>
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
ReleaseClusterHostGroupRequest request = new ReleaseClusterHostGroupRequest();
# 集群ID。
request.setClusterId("C-01A1F4A********");
# 节点组ID,可通过ListClusterHostGroup接口获取节点组ID。
request.setHostGroupId("G-D11D3E*******");
//指定释放数量。
request.setReleaseNumber(3);
request.setEnableGracefulDecommission(true);
//单位为秒。
request.setDecommissionTimeout(60);
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));
当启用Yarn Decommission选项时,在EMR控制台YARN服务的配置页面,搜索参数yarn.resourcemanager.nodes.exclude-path,将其值改为/etc/ecm/hadoop-conf/yarn-exclude.xml并保存和部署,或者可以通过以下代码完成修改。
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
ModifyClusterServiceConfigRequest modifyClusterServiceConfigRequest = new ModifyClusterServiceConfigRequest();
modifyClusterServiceConfigRequest.setClusterId("C-01A1F4A********");
modifyClusterServiceConfigRequest.setRegionId("cn-hangzhou");
modifyClusterServiceConfigRequest.setServiceName("YARN");
modifyClusterServiceConfigRequest.setConfigParams("{\"yarn-site\":{\"yarn.resourcemanager.nodes.exclude-path\":\"/etc/ecm/hadoop-conf/yarn-exclude.xml\"}}");
modifyClusterServiceConfigRequest.setCustomConfigParams("{}");
modifyClusterServiceConfigRequest.setComment("for decommission gracefully");
modifyClusterServiceConfigRequest.setRefreshHostConfig(Boolean.TRUE);
System.out.println(JSON.toJSONString(client.getAcsResponse(modifyClusterServiceConfigRequest)));
通过实例ID缩容节点组
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
ReleaseClusterHostGroupRequest request = new ReleaseClusterHostGroupRequest();
# 集群ID。
request.setClusterId("C-C52CF4246D10****");
# 节点组ID,可通过ListClusterHostGroup接口获取节点组ID。
request.setHostGroupId("G-A24651D939AD****");
//指定释放节点。
List<String> instanceIds = new ArrayList<>();
instanceIds.add("i-xxxxxxx");
instanceIds.add("i-xxxxxxy");
//需要JSON一下。
request.setInstanceIdList(JSON.toJSONString(instanceIds));
request.setEnableGracefulDecommission(true);
//单位为秒。
request.setDecommissionTimeout(60);
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));
文档内容是否对您有帮助?