You can use Alibaba Cloud SDK for Java to access fully managed Flink without complex programming. This topic describes how to install the SDK and use it to access fully managed Flink.
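Usage notes
Fully managed Flink was upgraded on September 19, 2022, and the new SDK version is used by default from then on. The upgrade was rolled out across all regions from September 19, 2022 to October 27, 2022.
Note
- For the impact of the SDK upgrade, see the product announcement.
- This topic describes the new SDK version. For the documentation of the earlier SDK, see OpenAPI SDK.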
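Prerequisites
- An AccessKey has been created for the RAM user.
  Note: If the AccessKey of your Alibaba Cloud account is leaked, your resources are at risk. We recommend that you create a RAM user, grant it the permissions required to access fully managed Flink, and use the RAM user's AccessKey to call the SDK. See the following topics:
  - To create a RAM user, see Create a RAM user.
  - To grant permissions on fully managed Flink, see Manual authorization (Method 1).
  - To create an AccessKey for the RAM user, see Create an AccessKey.
- A Java environment is installed. Alibaba Cloud SDK for Java requires JDK 8 or later.
- RAM user authorization and job operation account authorization have been completed for the RAM user on the VVP platform. For details, see RAM user authorization and job operation account authorization.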
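Limits
Only the following regions are supported:
- Hangzhou
- Shanghai
- Shenzhen
- Beijing
- Singapore
- Zhangjiakou
Note: Other regions are not supported yet.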
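Because only a fixed set of regions is supported, it can help to fail fast when an unsupported region is requested. The following is a minimal sketch, not part of the SDK: the class name EndpointResolver and the method resolve are hypothetical, and the region IDs are inferred from the endpoint host names listed in the request procedure of this topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: maps a region ID to the endpoint listed in this topic. */
public class EndpointResolver {
    // Region IDs are an assumption derived from the endpoint host names.
    private static final Map<String, String> ENDPOINTS = new LinkedHashMap<>();

    static {
        ENDPOINTS.put("cn-hangzhou", "ververica.cn-hangzhou.aliyuncs.com");
        ENDPOINTS.put("cn-shanghai", "ververica.cn-shanghai.aliyuncs.com");
        ENDPOINTS.put("cn-shenzhen", "ververica.cn-shenzhen.aliyuncs.com");
        ENDPOINTS.put("cn-beijing", "ververica.cn-beijing.aliyuncs.com");
        ENDPOINTS.put("ap-southeast-1", "ververica.ap-southeast-1.aliyuncs.com");
        ENDPOINTS.put("cn-zhangjiakou", "ververica.cn-zhangjiakou.aliyuncs.com");
    }

    /** Returns the endpoint for a supported region, or throws for any other region. */
    public static String resolve(String regionId) {
        String endpoint = ENDPOINTS.get(regionId);
        if (endpoint == null) {
            throw new IllegalArgumentException(
                "Unsupported region: " + regionId + ". Supported regions: " + ENDPOINTS.keySet());
        }
        return endpoint;
    }

    public static void main(String[] args) {
        System.out.println(resolve("cn-hangzhou"));
    }
}
```

The returned string can be passed to config.setEndpoint(...) when you create the client.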
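Install Alibaba Cloud SDK for Java
- Configure the Maven project management tool in IDEA in either of the following ways:
  - Use the Maven tool that is integrated in IDEA.
  - Download the Maven build for your operating system from the official Maven download page (Download Apache Maven) and configure it manually.
- Create a Maven project in either of the following ways:
  - Method 1: Add a Maven project in IDEA.
  - Method 2: Convert an existing project to a Maven project.
    - Right-click the project that you want to convert and select Add Framework Support....
    - Select Maven and click OK.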
- Add the SDK dependencies to the pom.xml file in the project directory.
<dependencies>
  <dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>ververica20220718</artifactId>
    <version>1.0.1</version>
  </dependency>
  <dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>tea-openapi</artifactId>
    <version>0.2.6</version>
  </dependency>
  <dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>tea</artifactId>
    <version>1.2.1</version>
  </dependency>
</dependencies>
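After you add the dependencies, Maven automatically downloads the required JAR packages. You can view the imported dependencies under External Libraries in the project directory.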
Request procedure
- Configure the service endpoint and the user's AccessKeyId and AccessKeySecret to create a Client object.
/*
 The following regions are supported. Their endpoints are:
 Hangzhou: ververica.cn-hangzhou.aliyuncs.com
 Shanghai: ververica.cn-shanghai.aliyuncs.com
 Shenzhen: ververica.cn-shenzhen.aliyuncs.com
 Beijing: ververica.cn-beijing.aliyuncs.com
 Singapore: ververica.ap-southeast-1.aliyuncs.com
 Zhangjiakou: ververica.cn-zhangjiakou.aliyuncs.com
*/
String endpoint = "<endpoint>";
String accessKey = "<AccessKey>";
String secret = "<Secret>";
Config config = new com.aliyun.teaopenapi.models.Config();
config.setAccessKeyId(accessKey);
config.setAccessKeySecret(secret);
config.setEndpoint(endpoint);
Client client = new Client(config);
- Initialize the request class and use setters to set the request parameters. The following example uses the ListDeployments request.
ListDeploymentsRequest listDeploymentsRequest = new ListDeploymentsRequest();
// Paging parameter. The page number must be greater than 0.
listDeploymentsRequest.setPageIndex(1);
// Paging parameter. The number of items per page must be greater than 0 and cannot exceed 100.
listDeploymentsRequest.setPageSize(100);
// You can obtain the workspace ID (workspaceId) on the workspace details page of the Flink development console.
// Do not use the workspace name as the workspaceId.
String workspaceId = "<workspaceId>";
ListDeploymentsHeaders listDeploymentsHeaders = new ListDeploymentsHeaders();
listDeploymentsHeaders.setWorkspace(workspaceId);
- Send the request and handle the response.
String namespace = "<namespace>";
ListDeploymentsResponse result =
    client.listDeploymentsWithOptions(
        namespace, listDeploymentsRequest, listDeploymentsHeaders, new RuntimeOptions());
// Check whether the request failed. If it did, read the details from errorCode and errorMessage.
if (!result.getBody().getSuccess()) {
    throw new RuntimeException(
        String.format("Failed to list deployments, errorCode: %s, errorMessage: %s.",
            result.getBody().errorCode, result.getBody().errorMessage));
}
// Get the data of a successful request.
List<Deployment> deployments = result.getBody().getData();
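The steps above fetch a single page of at most 100 items. To read a longer list you would presumably repeat the request with an increasing page number. The following sketch shows one way to do that without depending on the SDK: the class PagingSketch, the method fetchAll, and the fetchPage callback are hypothetical, and stopping on a short or empty page is an assumption about how the service signals the last page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical helper: collects all pages from a paged list call. */
public class PagingSketch {
    // Upper bound on the page size stated in this topic.
    public static final int MAX_PAGE_SIZE = 100;

    /**
     * Calls fetchPage(pageIndex, pageSize) with pageIndex starting at 1 and
     * stops at the first page that is empty or shorter than pageSize.
     */
    public static <T> List<T> fetchAll(
            BiFunction<Integer, Integer, List<T>> fetchPage, int pageSize) {
        if (pageSize <= 0 || pageSize > MAX_PAGE_SIZE) {
            throw new IllegalArgumentException("pageSize must be between 1 and " + MAX_PAGE_SIZE);
        }
        List<T> all = new ArrayList<>();
        int pageIndex = 1;
        while (true) {
            List<T> page = fetchPage.apply(pageIndex, pageSize);
            if (page == null || page.isEmpty()) {
                break;
            }
            all.addAll(page);
            if (page.size() < pageSize) {
                break; // Assumption: a short page is the last page.
            }
            pageIndex++;
        }
        return all;
    }

    public static void main(String[] args) {
        // Stand-in data source with 250 items; a real callback would call the SDK.
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            data.add(i);
        }
        List<Integer> result = fetchAll((index, size) -> {
            int from = Math.min((index - 1) * size, data.size());
            int to = Math.min(from + size, data.size());
            return data.subList(from, to);
        }, 100);
        System.out.println(result.size());
    }
}
```

With the SDK, the callback would set pageIndex and pageSize on a ListDeploymentsRequest, send it, and return result.getBody().getData().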
Reference example
String endpoint = "ververica.cn-hangzhou.aliyuncs.com";
Config config = new com.aliyun.teaopenapi.models.Config();
config.setAccessKeyId("<AccessKey>");
config.setAccessKeySecret("<Secret>");
config.setEndpoint(endpoint);
Client client = new Client(config);
ListDeploymentsRequest listDeploymentsRequest = new ListDeploymentsRequest();
listDeploymentsRequest.setPageIndex(1);
listDeploymentsRequest.setPageSize(100);
ListDeploymentsHeaders listDeploymentsHeaders = new ListDeploymentsHeaders();
// Replace <workspaceId> with the workspace ID, not the workspace name.
listDeploymentsHeaders.setWorkspace("<workspaceId>");
ListDeploymentsResponse result =
client.listDeploymentsWithOptions(
"<namespace>", listDeploymentsRequest, listDeploymentsHeaders, new RuntimeOptions());
if (!result.getBody().getSuccess()) {
throw new RuntimeException(
String.format("Failed to list deployments, errorCode: %s, errorMessage: %s.",
result.getBody().errorCode, result.getBody().errorMessage));
}
List<Deployment> deployments = result.getBody().getData();
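The example above uses AccessKey placeholders in the source code. Given the leakage risk noted in the prerequisites, you may prefer to read the AccessKey pair from environment variables. The following is a minimal sketch: the class CredentialsFromEnv and the method require are hypothetical, and the variable names ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are an assumption, so use whatever names your environment defines.

```java
import java.util.Map;

/** Hypothetical helper: reads the AccessKey pair from environment variables. */
public class CredentialsFromEnv {
    // Assumed variable names; adjust them to your environment.
    public static final String KEY_ID_VAR = "ALIBABA_CLOUD_ACCESS_KEY_ID";
    public static final String KEY_SECRET_VAR = "ALIBABA_CLOUD_ACCESS_KEY_SECRET";

    /** Returns the value of the named variable, or throws if it is missing or blank. */
    public static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        try {
            String accessKeyId = require(env, KEY_ID_VAR);
            String accessKeySecret = require(env, KEY_SECRET_VAR);
            // Pass both values to config.setAccessKeyId(...) and config.setAccessKeySecret(...).
            // Never print the secret itself.
            System.out.println("AccessKey pair loaded, ID length: " + accessKeyId.length()
                + ", secret length: " + accessKeySecret.length());
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

Passing the environment as a Map keeps the lookup easy to test with a plain HashMap.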
Complete SDK demo
package com.ververica.sdk.examples;
import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions;
import com.aliyun.ververica20220718.Client;
import com.aliyun.ververica20220718.models.*;
import java.util.Map;
public class Examples {
public static void main(String[] args) throws Exception {
/*
The following regions are supported. Their endpoints are:
Hangzhou: ververica.cn-hangzhou.aliyuncs.com
Shanghai: ververica.cn-shanghai.aliyuncs.com
Shenzhen: ververica.cn-shenzhen.aliyuncs.com
Beijing: ververica.cn-beijing.aliyuncs.com
Singapore: ververica.ap-southeast-1.aliyuncs.com
Zhangjiakou: ververica.cn-zhangjiakou.aliyuncs.com
*/
String endpoint = "ververica.cn-hangzhou.aliyuncs.com";
String accessKey = "<AccessKey>";
String secret = "<Secret>";
String workspaceId = "<workspaceId>";
String namespace = "<namespace>";
Config config = new com.aliyun.teaopenapi.models.Config();
config.setAccessKeyId(accessKey);
config.setAccessKeySecret(secret);
config.setEndpoint(endpoint);
Client client = new Client(config);
// Create a variable.
String variableName = "name";
String variableValue = "value";
Variable variable = new Variable();
variable.setKind("Plain"); // only Plain is supported now
variable.setDescription("this is a sdk variable example.");
variable.setName(variableName);
variable.setValue(variableValue);
CreateVariableRequest createVariableRequest = new CreateVariableRequest();
createVariableRequest.setBody(variable);
CreateVariableHeaders createVariableHeaders = new CreateVariableHeaders();
createVariableHeaders.setWorkspace(workspaceId);
CreateVariableResponse createVariableResponse =
client.createVariableWithOptions(namespace, createVariableRequest, createVariableHeaders, new RuntimeOptions());
// List variables.
ListVariablesRequest listVariablesRequest = new ListVariablesRequest();
listVariablesRequest.setPageIndex(1);
listVariablesRequest.setPageSize(100);
ListVariablesHeaders listVariablesHeaders = new ListVariablesHeaders();
listVariablesHeaders.setWorkspace(workspaceId);
ListVariablesResponse listVariablesResponse =
client.listVariablesWithOptions(namespace, listVariablesRequest, listVariablesHeaders, new RuntimeOptions());
// Delete the variable.
DeleteVariableHeaders deleteVariableHeaders = new DeleteVariableHeaders();
deleteVariableHeaders.setWorkspace(workspaceId);
DeleteVariableResponse deleteVariableResponse =
client.deleteVariableWithOptions(namespace, variableName, deleteVariableHeaders, new RuntimeOptions());
// List engine versions.
ListEngineVersionMetadataHeaders listEngineVersionMetadataHeaders = new ListEngineVersionMetadataHeaders();
listEngineVersionMetadataHeaders.setWorkspace(workspaceId);
ListEngineVersionMetadataResponse listEngineVersionMetadataResponse =
client.listEngineVersionMetadataWithOptions(listEngineVersionMetadataHeaders, new RuntimeOptions());
// List deployment targets.
ListDeploymentTargetsRequest listDeploymentTargetsRequest = new ListDeploymentTargetsRequest();
listDeploymentTargetsRequest.setPageSize(100);
listDeploymentTargetsRequest.setPageIndex(1);
ListDeploymentTargetsHeaders listDeploymentTargetsHeaders = new ListDeploymentTargetsHeaders();
listDeploymentTargetsHeaders.setWorkspace(workspaceId);
ListDeploymentTargetsResponse listDeploymentTargetsResponse =
client.listDeploymentTargetsWithOptions(namespace, listDeploymentTargetsRequest, listDeploymentTargetsHeaders, new RuntimeOptions());
String deploymentTargetName = listDeploymentTargetsResponse.getBody().getData().get(0).getName();
// Create a deployment.
SqlArtifact sqlArtifact = new SqlArtifact();
sqlArtifact.setSqlScript(
" create temporary table `datagen` ( id varchar, name varchar ) with ( 'connector' = 'datagen' ); create temporary table `blackhole` ( id varchar, name varchar ) with ( 'connector' = 'blackhole' ); insert into blackhole select * from datagen;");
Artifact artifact = new Artifact();
artifact.setKind("SQLSCRIPT");
artifact.setSqlArtifact(sqlArtifact);
BriefDeploymentTarget briefDeploymentTarget = new BriefDeploymentTarget();
briefDeploymentTarget.setName(deploymentTargetName);
briefDeploymentTarget.setMode("PER_JOB");
Deployment deployment = new Deployment();
deployment.setName("deploymentName");
deployment.setExecutionMode("BATCH"); // STREAMING || BATCH
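deployment.setDescription("this is an example deployment");
// Replace <engineVersion> with one of the versions returned by listEngineVersionMetadataWithOptions above.
String userExpectedEngineVersion = "<engineVersion>";
deployment.setEngineVersion(userExpectedEngineVersion);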
deployment.setArtifact(artifact);
deployment.setDeploymentTarget(briefDeploymentTarget);
CreateDeploymentRequest createDeploymentRequest = new CreateDeploymentRequest();
createDeploymentRequest.setBody(deployment);
CreateDeploymentHeaders createDeploymentHeaders = new CreateDeploymentHeaders();
createDeploymentHeaders.setWorkspace(workspaceId);
CreateDeploymentResponse createDeploymentResponse =
client.createDeploymentWithOptions(namespace, createDeploymentRequest, createDeploymentHeaders, new RuntimeOptions());
String deploymentId = createDeploymentResponse.getBody().getData().getDeploymentId();
// Get the deployment.
GetDeploymentHeaders getDeploymentHeaders = new GetDeploymentHeaders();
getDeploymentHeaders.setWorkspace(workspaceId);
GetDeploymentResponse getDeploymentResponse =
client.getDeploymentWithOptions(namespace, deploymentId, getDeploymentHeaders, new RuntimeOptions());
// List deployments.
ListDeploymentsRequest listDeploymentsRequest = new ListDeploymentsRequest();
listDeploymentsRequest.setPageIndex(1);
listDeploymentsRequest.setPageSize(100);
ListDeploymentsHeaders listDeploymentsHeaders = new ListDeploymentsHeaders();
listDeploymentsHeaders.setWorkspace(workspaceId);
ListDeploymentsResponse listDeploymentsResponse =
client.listDeploymentsWithOptions(namespace, listDeploymentsRequest, listDeploymentsHeaders, new RuntimeOptions());
// Update the deployment.
Deployment updateParams = new Deployment();
String description = "this is a ververica sdk test for updating deployment";
updateParams.setDescription(description);
UpdateDeploymentRequest updateDeploymentRequest = new UpdateDeploymentRequest();
// Send the object that holds the changes (updateParams), not the original deployment.
updateDeploymentRequest.setBody(updateParams);
UpdateDeploymentHeaders updateDeploymentHeaders = new UpdateDeploymentHeaders();
updateDeploymentHeaders.setWorkspace(workspaceId);
UpdateDeploymentResponse updateDeploymentResponse =
client.updateDeploymentWithOptions(namespace, deploymentId, updateDeploymentRequest, updateDeploymentHeaders, new RuntimeOptions());
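// Delete the deployment.
// Each section of this demo illustrates one API call. In a real run, delete the deployment last, because later calls still use deploymentId.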
DeleteDeploymentHeaders deleteDeploymentHeaders = new DeleteDeploymentHeaders();
deleteDeploymentHeaders.setWorkspace(workspaceId);
DeleteDeploymentResponse deleteDeploymentResponse =
client.deleteDeploymentWithOptions(
namespace, deploymentId, deleteDeploymentHeaders, new RuntimeOptions());
// Start a job.
double cpu = 1.0;
String memory = "4Gi";
DeploymentRestoreStrategy restoreStrategy = new DeploymentRestoreStrategy();
restoreStrategy.setKind("NONE");
BasicResourceSettingSpec basicResourceSettingSpec = new BasicResourceSettingSpec();
basicResourceSettingSpec.setCpu(cpu);
basicResourceSettingSpec.setMemory(memory);
// basic resource setting
BasicResourceSetting basicResourceSetting = new BasicResourceSetting();
basicResourceSetting.setParallelism(1L);
basicResourceSetting.setJobmanagerResourceSettingSpec(basicResourceSettingSpec);
basicResourceSetting.setTaskmanagerResourceSettingSpec(basicResourceSettingSpec);
// streaming resource setting
StreamingResourceSetting streamingResourceSetting = new StreamingResourceSetting();
streamingResourceSetting.setResourceSettingMode("BASIC");
streamingResourceSetting.setBasicResourceSetting(basicResourceSetting);
// brief resource setting
BriefResourceSetting briefResourceSetting = new BriefResourceSetting();
briefResourceSetting.setStreamingResourceSetting(streamingResourceSetting);
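// Map.of requires Java 9 or later. On JDK 8, use java.util.Collections.singletonMap("key", "value") instead.
briefResourceSetting.setFlinkConf(Map.of("key", "value"));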
StartJobRequestBody startJobRequestBody = new StartJobRequestBody();
startJobRequestBody.setDeploymentId(deploymentId);
startJobRequestBody.setRestoreStrategy(restoreStrategy);
startJobRequestBody.setResourceSettingSpec(briefResourceSetting);
StartJobRequest startJobRequest = new StartJobRequest();
startJobRequest.setBody(startJobRequestBody);
StartJobHeaders startJobHeaders = new StartJobHeaders();
startJobHeaders.setWorkspace(workspaceId);
StartJobResponse startJobResponse =
client.startJobWithOptions(
namespace, startJobRequest, startJobHeaders, new RuntimeOptions());
String jobId = startJobResponse.getBody().getData().getJobId();
// Get the job.
GetJobHeaders getJobHeaders = new GetJobHeaders();
getJobHeaders.setWorkspace(workspaceId);
GetJobResponse getJobResponse =
client.getJobWithOptions(namespace, jobId, getJobHeaders, new RuntimeOptions());
// List jobs.
ListJobsRequest listJobsRequest = new ListJobsRequest();
listJobsRequest.setDeploymentId(deploymentId);
listJobsRequest.setPageIndex(1);
listJobsRequest.setPageSize(100);
ListJobsHeaders listJobsHeaders = new ListJobsHeaders();
listJobsHeaders.setWorkspace(workspaceId);
ListJobsResponse listJobsResponse =
client.listJobsWithOptions(namespace, listJobsRequest, listJobsHeaders, new RuntimeOptions());
// Stop the job.
StopJobRequestBody stopJobRequestBody = new StopJobRequestBody();
stopJobRequestBody.setStopStrategy("NONE");
StopJobRequest stopJobRequest = new StopJobRequest();
stopJobRequest.setBody(stopJobRequestBody);
StopJobHeaders stopJobHeaders = new StopJobHeaders();
stopJobHeaders.setWorkspace(workspaceId);
StopJobResponse stopJobResponse =
client.stopJobWithOptions(namespace, jobId, stopJobRequest, stopJobHeaders, new RuntimeOptions());
// Delete the job. In a real run, do this after the Flink API proxy call below, which still uses jobId.
DeleteJobHeaders deleteJobHeaders = new DeleteJobHeaders();
deleteJobHeaders.setWorkspace(workspaceId);
DeleteJobResponse deleteJobResponse = client.deleteJobWithOptions(namespace, jobId, deleteJobHeaders, new RuntimeOptions());
// Create a savepoint.
CreateSavepointRequest createSavepointRequest = new CreateSavepointRequest();
createSavepointRequest.setDeploymentId(deploymentId);
createSavepointRequest.setNativeFormat(false);
createSavepointRequest.setDescription("savepoint demo");
CreateSavepointHeaders createSavepointHeaders = new CreateSavepointHeaders();
createSavepointHeaders.setWorkspace(workspaceId);
CreateSavepointResponse createSavepointResponse = client.createSavepointWithOptions(namespace, createSavepointRequest, createSavepointHeaders, new RuntimeOptions());
String savepointId = createSavepointResponse.getBody().getData().getSavepointId();
// Get the savepoint.
GetSavepointHeaders getSavepointHeaders = new GetSavepointHeaders();
getSavepointHeaders.setWorkspace(workspaceId);
GetSavepointResponse getSavepointResponse = client.getSavepointWithOptions(namespace, savepointId, getSavepointHeaders, new RuntimeOptions());
// List savepoints.
ListSavepointsRequest listSavepointsRequest = new ListSavepointsRequest();
listSavepointsRequest.setPageIndex(1);
listSavepointsRequest.setPageSize(100);
ListSavepointsHeaders listSavepointsHeaders = new ListSavepointsHeaders();
listSavepointsHeaders.setWorkspace(workspaceId);
ListSavepointsResponse listSavepointsResponse = client.listSavepointsWithOptions(namespace, listSavepointsRequest, listSavepointsHeaders, new RuntimeOptions());
// Delete the savepoint.
DeleteSavepointHeaders deleteSavepointHeaders = new DeleteSavepointHeaders();
deleteSavepointHeaders.setWorkspace(workspaceId);
DeleteSavepointResponse deleteSavepointResponse = client.deleteSavepointWithOptions(namespace, savepointId, deleteSavepointHeaders, new RuntimeOptions());
// Generate a fine-grained resource plan asynchronously.
GenerateResourcePlanWithFlinkConfAsyncRequest generateResourcePlanWithFlinkConfAsyncRequest = new GenerateResourcePlanWithFlinkConfAsyncRequest();
generateResourcePlanWithFlinkConfAsyncRequest.setBody(Map.of("key", "value"));
GenerateResourcePlanWithFlinkConfAsyncHeaders generateResourcePlanWithFlinkConfAsyncHeaders = new GenerateResourcePlanWithFlinkConfAsyncHeaders();
generateResourcePlanWithFlinkConfAsyncHeaders.setWorkspace(workspaceId);
GenerateResourcePlanWithFlinkConfAsyncResponse generateResourcePlanWithFlinkConfAsyncResponse =
client.generateResourcePlanWithFlinkConfAsyncWithOptions(namespace, deploymentId, generateResourcePlanWithFlinkConfAsyncRequest, generateResourcePlanWithFlinkConfAsyncHeaders, new RuntimeOptions());
String ticketId = generateResourcePlanWithFlinkConfAsyncResponse.getBody().getData().getTicketId();
// Get the result of the asynchronous fine-grained resource plan generation.
GetGenerateResourcePlanResultHeaders getGenerateResourcePlanResultHeaders = new GetGenerateResourcePlanResultHeaders();
getGenerateResourcePlanResultHeaders.setWorkspace(workspaceId);
GetGenerateResourcePlanResultResponse getGenerateResourcePlanResultResponse = client.getGenerateResourcePlanResultWithOptions(namespace, ticketId, getGenerateResourcePlanResultHeaders, new RuntimeOptions());
// Send a Flink API proxy request.
FlinkApiProxyRequest flinkApiProxyRequest = new FlinkApiProxyRequest();
flinkApiProxyRequest.setNamespace(namespace);
flinkApiProxyRequest.setResourceType("jobs");
flinkApiProxyRequest.setResourceId(jobId);
flinkApiProxyRequest.setFlinkApiPath("/overview");
FlinkApiProxyHeaders flinkApiProxyHeaders = new FlinkApiProxyHeaders();
flinkApiProxyHeaders.setWorkspace(workspaceId);
FlinkApiProxyResponse flinkApiProxyResponse = client.flinkApiProxyWithOptions(flinkApiProxyRequest, flinkApiProxyHeaders, new RuntimeOptions());
}
}
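The demo requests a resource plan asynchronously, receives a ticketId, and fetches the result in a separate call. That result is presumably not ready immediately, so a caller may need to poll. The following sketch shows a generic polling loop that does not depend on the SDK: the class PollingSketch, the method pollUntil, and the status strings STARTING and RUNNING are hypothetical and are not taken from the service's API.

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical helper: polls a value until a condition holds or attempts run out. */
public class PollingSketch {
    /**
     * Calls fetch up to maxAttempts times, sleeping delayMillis between attempts.
     * Returns the first non-null value accepted by done, or empty if none was accepted.
     */
    public static <T> Optional<T> pollUntil(
            Supplier<T> fetch, Predicate<T> done, int maxAttempts, long delayMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            T value = fetch.get();
            if (value != null && done.test(value)) {
                return Optional.of(value);
            }
            if (attempt < maxAttempts) {
                Thread.sleep(delayMillis);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in status source; a real supplier would call the SDK and read the response.
        int[] calls = {0};
        Optional<String> status = pollUntil(
            () -> ++calls[0] < 3 ? "STARTING" : "RUNNING",
            s -> "RUNNING".equals(s),
            10,
            10L);
        System.out.println(status.orElse("timed out") + " after " + calls[0] + " polls");
    }
}
```

With the SDK, the supplier would wrap a call such as client.getGenerateResourcePlanResultWithOptions(...) or client.getJobWithOptions(...), and the predicate would inspect whichever field of the response indicates completion.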