SDK快速入门
您可以使用阿里云开发者工具套件(Alibaba Cloud SDK for Java),不用复杂编程即可访问Flink全托管服务。本文为您介绍如何安装并使用阿里云开发者工具套件访问Flink全托管服务。
注意事项
2022年9月19日,我们对Flink全托管产品版本进行了升级,后续将默认使用新的SDK版本为您服务。本次版本在全网更新发布的时间为2022年9月19日至2022年10月27日。
关于SDK升级的影响,请参见产品公告。
本文档为新版SDK使用文档,如果您还需查看旧版SDK使用文档,请单击OpenAPI SDK(已停止维护)下载后进行查看。
前提条件
RAM用户已创建AccessKey。
说明为避免主账号泄露AccessKey带来安全风险,建议您创建RAM用户,授予RAM用户Flink全托管相关的访问权限,再使用RAM用户的AccessKey调用SDK。相关文档请参见:
创建RAM用户操以及对应AccessKey,请参见创建RAM用户或创建AccessKey。
为RAM用户授权,请参见RAM授权。
已安装Java环境。Alibaba Cloud SDK for Java要求使用JDK 8或更高版本。
安装Alibaba Cloud SDK for Java
通过以下任一方式在IDEA中配置Maven项目管理工具。
使用IDEA中集成的Maven项目管理工具。
访问Maven官方下载页面(Download Apache Maven)下载对应操作系统的Maven工具,手动配置Maven工具。
通过以下任一方式创建Maven项目。
方式一:在IDEA中添加一个Maven项目。
方式二:将已有的项目转换为Maven项目。
右键单击要转换的项目,并选择Add Framework Support...。
选择Maven,并单击OK。
在项目目录下的pom.xml文件中,添加SDK依赖。
<dependencies> <dependency> <groupId>com.aliyun</groupId> <artifactId>ververica20220718</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>tea-openapi</artifactId> <version>0.2.6</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>tea</artifactId> <version>1.2.1</version> </dependency> </dependencies>
添加依赖后,Maven项目管理工具会自动下载相关JAR包,您可以在项目目录下的External Libraries查看已成功导入的依赖。
服务接入点(Endpoint)
地域 | 公网 | VPC |
中国(北京) | ververica.cn-beijing.aliyuncs.com | ververica-vpc.cn-beijing.aliyuncs.com |
中国(上海) | ververica.cn-shanghai.aliyuncs.com | ververica-vpc.cn-shanghai.aliyuncs.com |
中国(杭州) | ververica.cn-hangzhou.aliyuncs.com | ververica-vpc.cn-hangzhou.aliyuncs.com |
中国(深圳) | ververica.cn-shenzhen.aliyuncs.com | ververica-vpc.cn-shenzhen.aliyuncs.com |
中国(张家口) | ververica.cn-zhangjiakou.aliyuncs.com | ververica-vpc.cn-zhangjiakou.aliyuncs.com |
中国(香港) | ververica.cn-hongkong.aliyuncs.com | ververica-vpc.cn-hongkong.aliyuncs.com |
新加坡 | ververica.ap-southeast-1.aliyuncs.com | ververica-vpc.ap-southeast-1.aliyuncs.com |
德国(法兰克福) | ververica.eu-central-1.aliyuncs.com | ververica-vpc.eu-central-1.aliyuncs.com |
英国 | ververica.eu-west-1.aliyuncs.com | ververica-vpc.eu-west-1.aliyuncs.com |
印度尼西亚(雅加达) | ververica.ap-southeast-5.aliyuncs.com | ververica-vpc.ap-southeast-5.aliyuncs.com |
马来西亚 | ververica.ap-southeast-3.aliyuncs.com | ververica-vpc.ap-southeast-3.aliyuncs.com |
美国(硅谷) | ververica.us-west-1.aliyuncs.com | ververica-vpc.us-west-1.aliyuncs.com |
上海金融云 | ververica.cn-shanghai-finance-1.aliyuncs.com | ververica-vpc.cn-shanghai-finance-1.aliyuncs.com |
请求步骤
配置服务所在的Endpoint、用户的AccessKeyId和AccessKeySecret,来生成Client对象。
运行以下代码示例前,需要先设置AccessKey ID和AccessKeySecret的环境变量。具体请参考身份验证配置。
重要阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
强烈建议不要将AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
/* * 从环境变量中读取访问凭证。 */ String endpoint = "<endpoint>"; // 各地域对应的Endpoint请参见本文的服务接入点(Endpinot)部分。 com.aliyun.credentials.Client credentialClient = new com.aliyun.credentials.Client(); Config config = new com.aliyun.teaopenapi.models.Config(); config.setEndpoint(endpoint); config.setCredential(credentialClient); Client client = new Client(config);
初始化请求类,使用setter设置请求参数。以ListDeployments请求为例。
ListDeploymentsRequest listDeploymentsRequest = new ListDeploymentsRequest(); listDeploymentsRequest.setPageIndex(1); // 分页参数,页码必须大于0。 listDeploymentsRequest.setPageSize(100); // 分页参数,每页元素数量大于0,不超过100。 // 您可以在Flink开发控制台,工作空间详情中获取工作空间ID(workspaceId)。 // 请不要在workspaceId中直接写入工作空间名称。 String workspaceId = "<workspaceId>"; ListDeploymentsHeaders listDeploymentsHeaders = new ListDeploymentsHeaders(); listDeploymentsHeaders.setWorkspace(workspaceId);
调用并返回结果。
String namespace = "<namespace>"; ListDeploymentsResponse result = client.listDeploymentsWithOptions( namespace, listDeploymentsRequest, listDeploymentsHeaders, new RuntimeOptions()); // 判断请求结果是否异常,如果异常,可以从errorCode及errorMessage中获取异常信息。 if (!result.getBody().getSuccess()) { throw new RuntimeException( String.format("Failed to list deployments, errorCode: %s, errorMessage: %s.", result.getBody().errorCode, result.getBody().errorMessage)); } // 获取正常结果。 List<Deployment> deployments = result.getBody().getData();
参考示例
String endpoint = "ververica.cn-hangzhou.aliyuncs.com";
/*
* 运行以下代码前,请先配置环境变量:
* export ALIBABA_CLOUD_ACCESS_KEY_ID=
* export ALIBABA_CLOUD_ACCESS_KEY_SECRET=
*/
com.aliyun.credentials.Client credentialClient = new com.aliyun.credentials.Client();
Config config = new com.aliyun.teaopenapi.models.Config();
config.setEndpoint(endpoint);
config.setCredential(credentialClient);
Client client = new Client(config);
ListDeploymentsRequest listDeploymentsRequest = new ListDeploymentsRequest();
listDeploymentsRequest.setPageIndex(1);
listDeploymentsRequest.setPageSize(100);
ListDeploymentsHeaders listDeploymentsHeaders = new ListDeploymentsHeaders();
listDeploymentsHeaders.setWorkspace("workspaceId");
ListDeploymentsResponse result =
client.listDeploymentsWithOptions(
"namespace", listDeploymentsRequest, listDeploymentsHeaders, new RuntimeOptions());
if (!result.getBody().getSuccess()) {
throw new RuntimeException(
String.format("Failed to list deployments, errorCode: %s, errorMessage: %s.",
result.getBody().errorCode, result.getBody().errorMessage));
}
List<Deployment> deployments = result.getBody().getData();
全部SDK DEMO
package com.ververica.sdk.examples;
import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions;
import com.aliyun.ververica20220718.Client;
import com.aliyun.ververica20220718.models.*;
import java.util.HashMap;
public class Examples {
public static void main(String[] args) throws Exception {
/*
各地域对应的Endpoint请参见本文的服务接入点(Endpinot)部分。
*/
String endpoint = "<endpoint>";
String workspaceId = "<workspaceId>";
String namespace = "<namespace>";
com.aliyun.credentials.Client credentialClient = new com.aliyun.credentials.Client();
Config config = new com.aliyun.teaopenapi.models.Config();
config.setEndpoint(endpoint);
config.setCredential(credentialClient);
Client client = new Client(config);
// 创建成员。
// memberId为阿里云账号ID。
String memberId = "123456789";
String role = "EDITOR";
Member member = new Member();
member.setMember(memberId);
member.setRole(role);
CreateMemberRequest createMemberRequest = new CreateMemberRequest();
createMemberRequest.setBody(member);
CreateMemberHeaders createMemberHeaders = new CreateMemberHeaders();
createMemberHeaders.setWorkspace(workspaceId);
CreateMemberResponse createMemberResponse =
client.createMemberWithOptions(namespace, createMemberRequest, createMemberHeaders,
new RuntimeOptions());
// 获取成员。
GetMemberHeaders getMemberHeaders = new GetMemberHeaders();
getMemberHeaders.setWorkspace(workspaceId);
GetMemberResponse getMemberResponse =
client.getMemberWithOptions(namespace, memberId, getMemberHeaders, new RuntimeOptions());
// 更新成员。
Member memberWithNewRole = new Member();
member.setMember(memberId);
member.setRole("VIEWER");
UpdateMemberRequest updateMemberRequest = new UpdateMemberRequest();
updateMemberRequest.setBody(memberWithNewRole);
UpdateMemberHeaders updateMemberHeaders = new UpdateMemberHeaders();
updateMemberHeaders.setWorkspace(workspaceId);
UpdateMemberResponse updateMemberResponse =
client.updateMemberWithOptions(namespace, updateMemberRequest, updateMemberHeaders,
new RuntimeOptions());
// 获取成员列表。
ListMembersRequest listMembersRequest = new ListMembersRequest();
listMembersRequest.setPageIndex(1);
listMembersRequest.setPageSize(10);
ListMembersHeaders listMembersHeaders = new ListMembersHeaders();
listMembersHeaders.setWorkspace(workspaceId);
ListMembersResponse listMembersResponse =
client.listMembersWithOptions(namespace, listMembersRequest, listMembersHeaders,
new RuntimeOptions());
// 删除成员。
DeleteMemberHeaders deleteMemberHeaders = new DeleteMemberHeaders();
deleteMemberHeaders.setWorkspace(workspaceId);
DeleteMemberResponse deleteMemberResponse =
client.deleteMemberWithOptions(namespace, memberId, deleteMemberHeaders,
new RuntimeOptions());
// 创建变量。
String variableName = "name";
String variableValue = "value";
Variable variable = new Variable();
// only Plain is supported now
variable.setKind("Plain");
variable.setDescription("this is a sdk variable example.");
variable.setName(variableName);
variable.setValue(variableValue);
CreateVariableRequest createVariableRequest = new CreateVariableRequest();
createVariableRequest.setBody(variable);
CreateVariableHeaders createVariableHeaders = new CreateVariableHeaders();
createVariableHeaders.setWorkspace(workspaceId);
CreateVariableResponse createVariableResponse =
client.createVariableWithOptions(namespace, createVariableRequest, createVariableHeaders, new RuntimeOptions());
// 获取变量列表。
ListVariablesRequest listVariablesRequest = new ListVariablesRequest();
listVariablesRequest.setPageIndex(1);
listVariablesRequest.setPageSize(100);
ListVariablesHeaders listVariablesHeaders = new ListVariablesHeaders();
listVariablesHeaders.setWorkspace(workspaceId);
ListVariablesResponse listVariablesResponse =
client.listVariablesWithOptions(namespace, listVariablesRequest, listVariablesHeaders, new RuntimeOptions());
// 删除变量。
DeleteVariableHeaders deleteVariableHeaders = new DeleteVariableHeaders();
deleteVariableHeaders.setWorkspace(workspaceId);
DeleteVariableResponse result =
client.deleteVariableWithOptions(namespace, variableName, deleteVariableHeaders, new RuntimeOptions());
// 获取引擎版本列表。
ListEngineVersionMetadataHeaders listEngineVersionMetadataHeaders = new ListEngineVersionMetadataHeaders();
listEngineVersionMetadataHeaders.setWorkspace(workspaceId);
ListEngineVersionMetadataResponse listEngineVersionMetadataResponse =
client.listEngineVersionMetadataWithOptions(listEngineVersionMetadataHeaders, new RuntimeOptions());
String defaultEngineVersionName = listEngineVersionMetadataResponse.getBody().getData().getDefaultEngineVersion();
// 获取部署目标列表。
ListDeploymentTargetsRequest listDeploymentTargetsRequest = new ListDeploymentTargetsRequest();
listDeploymentTargetsRequest.setPageSize(100);
listDeploymentTargetsRequest.setPageIndex(1);
ListDeploymentTargetsHeaders listDeploymentTargetsHeaders = new ListDeploymentTargetsHeaders();
listDeploymentTargetsHeaders.setWorkspace(workspaceId);
ListDeploymentTargetsResponse listDeploymentTargetsResponse =
client.listDeploymentTargetsWithOptions(namespace, listDeploymentTargetsRequest, listDeploymentTargetsHeaders, new RuntimeOptions());
String deploymentTargetName = listDeploymentTargetsResponse.getBody().getData().get(0).getName();
// 创建已部署作业。
SqlArtifact sqlArtifact = new SqlArtifact();
sqlArtifact.setSqlScript(
" create temporary table `datagen` ( id varchar, name varchar ) with ( 'connector' = 'datagen' ); create temporary table `blackhole` ( id varchar, name varchar ) with ( 'connector' = 'blackhole' ); insert into blackhole select * from datagen;");
Artifact artifact = new Artifact();
artifact.setKind("SQLSCRIPT");
artifact.setSqlArtifact(sqlArtifact);
BriefDeploymentTarget briefDeploymentTarget = new BriefDeploymentTarget();
briefDeploymentTarget.setName(deploymentTargetName);
briefDeploymentTarget.setMode("PER_JOB");
double cpu = 1.0;
String memory = "4Gi";
DeploymentRestoreStrategy restoreStrategy = new DeploymentRestoreStrategy();
restoreStrategy.setKind("NONE");
BasicResourceSettingSpec basicResourceSettingSpec = new BasicResourceSettingSpec();
basicResourceSettingSpec.setCpu(cpu);
basicResourceSettingSpec.setMemory(memory);
// basic resource setting
BasicResourceSetting basicResourceSetting = new BasicResourceSetting();
basicResourceSetting.setParallelism(1L);
basicResourceSetting.setJobmanagerResourceSettingSpec(basicResourceSettingSpec);
basicResourceSetting.setTaskmanagerResourceSettingSpec(basicResourceSettingSpec);
// streaming resource setting
StreamingResourceSetting streamingResourceSetting = new StreamingResourceSetting();
streamingResourceSetting.setResourceSettingMode("BASIC");
streamingResourceSetting.setBasicResourceSetting(basicResourceSetting);
Deployment deployment = new Deployment();
deployment.setName("deploymentName");
// STREAMING || BATCH
deployment.setExecutionMode("STREAMING");
deployment.setStreamingResourceSetting(streamingResourceSetting);
deployment.setDescription("this is a example deployment");
deployment.setEngineVersion(defaultEngineVersionName);
deployment.setArtifact(artifact);
deployment.setDeploymentTarget(briefDeploymentTarget);
CreateDeploymentRequest createDeploymentRequest = new CreateDeploymentRequest();
createDeploymentRequest.setBody(deployment);
CreateDeploymentHeaders createDeploymentHeaders = new CreateDeploymentHeaders();
createDeploymentHeaders.setWorkspace(workspaceId);
CreateDeploymentResponse createDeploymentResponse =
client.createDeploymentWithOptions(namespace, createDeploymentRequest,
createDeploymentHeaders, new RuntimeOptions());
String deploymentId = createDeploymentResponse.getBody().getData().getDeploymentId();
// 获取已部署作业。
GetDeploymentHeaders getDeploymentHeaders = new GetDeploymentHeaders();
getDeploymentHeaders.setWorkspace(workspaceId);
GetDeploymentResponse getDeploymentResponse =
client.getDeploymentWithOptions(namespace, deploymentId, getDeploymentHeaders, new RuntimeOptions());
// 获取已部署作业列表。
ListDeploymentsRequest listDeploymentsRequest = new ListDeploymentsRequest();
listDeploymentsRequest.setPageIndex(1);
listDeploymentsRequest.setPageSize(100);
ListDeploymentsHeaders listDeploymentsHeaders = new ListDeploymentsHeaders();
listDeploymentsHeaders.setWorkspace(workspaceId);
ListDeploymentsResponse listDeploymentsResponse =
client.listDeploymentsWithOptions(namespace, listDeploymentsRequest, listDeploymentsHeaders, new RuntimeOptions());
// 更新已部署作业。
Deployment updateParams = new Deployment();
String description = "this is a ververica sdk test for updating deployment";
updateParams.setDescription(description);
UpdateDeploymentRequest updateDeploymentRequest = new UpdateDeploymentRequest();
updateDeploymentRequest.setBody(deployment);
UpdateDeploymentHeaders updateDeploymentHeaders = new UpdateDeploymentHeaders();
updateDeploymentHeaders.setWorkspace(workspaceId);
UpdateDeploymentResponse updateDeploymentResponse =
client.updateDeploymentWithOptions(namespace, deploymentId, updateDeploymentRequest, updateDeploymentHeaders, new RuntimeOptions());
// 删除已部署作业。
DeleteDeploymentHeaders deleteDeploymentHeaders = new DeleteDeploymentHeaders();
deleteDeploymentHeaders.setWorkspace(workspaceId);
DeleteDeploymentResponse deleteDeploymentResponse =
client.deleteDeploymentWithOptions(
namespace, deploymentId, deleteDeploymentHeaders, new RuntimeOptions());
// 启动作业实例。
JobStartParameters jobStartParameters = new JobStartParameters();
jobStartParameters.setRestoreStrategy(restoreStrategy);
jobStartParameters.setDeploymentId(deploymentId);
StartJobWithParamsRequest startJobWithParamsRequest = new StartJobWithParamsRequest();
startJobWithParamsRequest.setBody(jobStartParameters);
StartJobWithParamsHeaders startJobWithParamsHeaders = new StartJobWithParamsHeaders();
startJobWithParamsHeaders.setWorkspace(workspaceId);
StartJobWithParamsResponse startJobResponse =
client.startJobWithParamsWithOptions(
namespace, startJobWithParamsRequest, startJobWithParamsHeaders, new RuntimeOptions());
String jobId = startJobResponse.getBody().getData().getJobId();
// 获取作业实例。
GetJobHeaders getJobHeaders = new GetJobHeaders();
getJobHeaders.setWorkspace(workspaceId);
GetJobResponse getJobResponse =
client.getJobWithOptions(namespace, jobId, getJobHeaders, new RuntimeOptions());
// 获取作业实例列表。
ListJobsRequest listJobsRequest = new ListJobsRequest();
listJobsRequest.setDeploymentId(deploymentId);
listJobsRequest.setPageIndex(1);
listJobsRequest.setPageSize(100);
ListJobsHeaders listJobsHeaders = new ListJobsHeaders();
listJobsHeaders.setWorkspace(workspaceId);
ListJobsResponse listJobsResponse =
client.listJobsWithOptions(namespace, listJobsRequest, listJobsHeaders, new RuntimeOptions());
// 停止作业实例。
StopJobRequestBody stopJobRequestBody = new StopJobRequestBody();
stopJobRequestBody.setStopStrategy("NONE");
StopJobRequest stopJobRequest = new StopJobRequest();
stopJobRequest.setBody(stopJobRequestBody);
StopJobHeaders stopJobHeaders = new StopJobHeaders();
stopJobHeaders.setWorkspace(workspaceId);
StopJobResponse stopJobResponse =
client.stopJobWithOptions(namespace, jobId, stopJobRequest, stopJobHeaders, new RuntimeOptions());
// 删除作业实例。
DeleteJobHeaders deleteJobHeaders = new DeleteJobHeaders();
deleteJobHeaders.setWorkspace(workspaceId);
DeleteJobResponse deleteJobResponse = client.deleteJobWithOptions(namespace, jobId, deleteJobHeaders, new RuntimeOptions());
// 创建作业快照。
CreateSavepointRequest createSavepointRequest = new CreateSavepointRequest();
createSavepointRequest.setDeploymentId(deploymentId);
createSavepointRequest.setNativeFormat(false);
createSavepointRequest.setDescription("savepoint demo");
CreateSavepointHeaders createSavepointHeaders = new CreateSavepointHeaders();
createSavepointHeaders.setWorkspace(workspaceId);
CreateSavepointResponse createSavepointResponse = client.createSavepointWithOptions(namespace, createSavepointRequest, createSavepointHeaders, new RuntimeOptions());
String savepointId = createSavepointResponse.getBody().getData().getSavepointId();
// 获取作业快照。
GetSavepointHeaders getSavepointHeaders = new GetSavepointHeaders();
getSavepointHeaders.setWorkspace(workspaceId);
GetSavepointResponse getSavepointResponse = client.getSavepointWithOptions(namespace, savepointId, getSavepointHeaders, new RuntimeOptions());
// 获取作业快照列表。
ListSavepointsRequest listSavepointsRequest = new ListSavepointsRequest();
listSavepointsRequest.setPageIndex(1);
listSavepointsRequest.setPageSize(100);
ListSavepointsHeaders listSavepointsHeaders = new ListSavepointsHeaders();
listSavepointsHeaders.setWorkspace(workspaceId);
ListSavepointsResponse listSavepointsResponse = client.listSavepointsWithOptions(namespace, listSavepointsRequest, listSavepointsHeaders, new RuntimeOptions());
// 删除作业快照。
DeleteSavepointHeaders deleteSavepointHeaders = new DeleteSavepointHeaders();
deleteSavepointHeaders.setWorkspace(workspaceId);
DeleteSavepointResponse deleteSavepointResponse = client.deleteSavepointWithOptions(namespace, savepointId, deleteSavepointHeaders, new RuntimeOptions());
// 异步生成细粒度资源。
GenerateResourcePlanWithFlinkConfAsyncRequest generateResourcePlanWithFlinkConfAsyncRequest = new GenerateResourcePlanWithFlinkConfAsyncRequest();
HashMap<String, String> resourcePlanFlinkConf = new HashMap<>();
resourcePlanFlinkConf.put("key", "value");
generateResourcePlanWithFlinkConfAsyncRequest.setBody(resourcePlanFlinkConf);
GenerateResourcePlanWithFlinkConfAsyncHeaders generateResourcePlanWithFlinkConfAsyncHeaders = new GenerateResourcePlanWithFlinkConfAsyncHeaders();
generateResourcePlanWithFlinkConfAsyncHeaders.setWorkspace(workspaceId);
GenerateResourcePlanWithFlinkConfAsyncResponse generateResourcePlanWithFlinkConfAsyncResponse =
client.generateResourcePlanWithFlinkConfAsyncWithOptions(namespace, deploymentId, generateResourcePlanWithFlinkConfAsyncRequest, generateResourcePlanWithFlinkConfAsyncHeaders, new RuntimeOptions());
String ticketId = generateResourcePlanWithFlinkConfAsyncResponse.getBody().getData().getTicketId();
// 获取异步生成细粒度资源结果。
GetGenerateResourcePlanResultHeaders getGenerateResourcePlanResultHeaders = new GetGenerateResourcePlanResultHeaders();
getGenerateResourcePlanResultHeaders.setWorkspace(workspaceId);
GetGenerateResourcePlanResultResponse getGenerateResourcePlanResultResponse = client.getGenerateResourcePlanResultWithOptions(namespace, ticketId, getGenerateResourcePlanResultHeaders, new RuntimeOptions());
// Flink请求代理。
FlinkApiProxyRequest flinkApiProxyRequest = new FlinkApiProxyRequest();
flinkApiProxyRequest.setNamespace(namespace);
flinkApiProxyRequest.setResourceType("jobs");
flinkApiProxyRequest.setResourceId(jobId);
flinkApiProxyRequest.setFlinkApiPath("/overview");
FlinkApiProxyHeaders flinkApiProxyHeaders = new FlinkApiProxyHeaders();
flinkApiProxyHeaders.setWorkspace(workspaceId);
FlinkApiProxyResponse flinkApiProxyResponse = client.flinkApiProxyWithOptions(flinkApiProxyRequest, flinkApiProxyHeaders, new RuntimeOptions());
}
}