您可以使用阿里云开发者工具套件(Alibaba Cloud SDK for Java),不用复杂编程即可访问Flink全托管服务。本文为您介绍如何安装并使用阿里云开发者工具套件访问Flink全托管服务。

注意事项

2022年9月19日,我们对Flink全托管产品版本进行了升级,后续将默认使用新的SDK版本为您服务。本次版本在全网更新发布的时间为2022年9月19日至2022年10月27日。
说明
  • 关于SDK升级的影响,请参见产品公告
  • 本文档为新版SDK使用文档,如果您还需查看旧的SDK使用文档,请参见OpenAPI SDK

前提条件

  • RAM用户已创建AccessKey。
    说明 为避免主账号泄露AccessKey带来安全风险,建议您创建RAM用户,授予RAM用户Flink全托管相关的访问权限,再使用RAM用户的AccessKey调用SDK。相关文档请参见:
  • 已安装Java环境。Alibaba Cloud SDK for Java要求使用JDK 8或更高版本。
  • RAM用户需要在VVP平台上已完成RAM用户授权和作业操作账号授权。详情请参见RAM用户授权作业操作账号授权

使用限制

仅支持以下几个地域:
  • 杭州
  • 上海
  • 深圳
  • 北京
  • 新加坡
  • 张家口
说明 暂不支持其他地域。

安装Alibaba Cloud SDK for Java

  1. 通过以下任一方式在IDEA中配置Maven项目管理工具。
    • 使用IDEA中集成的Maven项目管理工具。
    • 访问Maven官方下载页面(Download Apache Maven)下载对应操作系统的Maven工具,手动配置Maven工具。
  2. 通过以下任一方式创建Maven项目。
    • 方式一:在IDEA中添加一个Maven项目。maven1
    • 方式二:将已有的项目转换为Maven项目。
      1. 右键单击要转换的项目,并选择Add Framework Support...maven2
      2. 选择Maven,并单击OKmaven3
  3. 在项目目录下的pom.xml文件中,添加SDK依赖。
      <dependencies>
          <dependency>
              <groupId>com.aliyun</groupId>
              <artifactId>ververica20220718</artifactId>
              <version>1.0.1</version>
          </dependency>
    
          <dependency>
              <groupId>com.aliyun</groupId>
              <artifactId>tea-openapi</artifactId>
              <version>0.2.6</version>
          </dependency>
    
          <dependency>
              <groupId>com.aliyun</groupId>
              <artifactId>tea</artifactId>
              <version>1.2.1</version>
          </dependency>
      </dependencies>

    添加依赖后,Maven项目管理工具会自动下载相关JAR包,您可以在项目目录下的External Libraries查看已成功导入的依赖。

请求步骤

  1. 配置服务所在的Endpoint、用户的AccessKeyId和AccessKeySecret,来生成Client对象。
      /*
        目前支持以下地域,对应的Endpoint如下。
        杭州Endpoint: ververica.cn-hangzhou.aliyuncs.com
        上海Endpoint: ververica.cn-shanghai.aliyuncs.com
        深圳Endpoint: ververica.cn-shenzhen.aliyuncs.com
        北京Endpoint: ververica.cn-beijing.aliyuncs.com
        新加坡Endpoint: ververica.ap-southeast-1.aliyuncs.com
        张家口Endpoint: ververica.cn-zhangjiakou.aliyuncs.com
      */
    
    String endpoint = "<endpoint>";
    String accessKey = "<AccessKey>";
    String secret = "<Secret>";
    Config config = new com.aliyun.teaopenapi.models.Config();
    config.setAccessKeyId(accessKey);
    config.setAccessKeySecret(secret);
    config.setEndpoint(endpoint);
    Client client = new Client(config);
  2. 初始化请求类,使用setter设置请求参数。以ListDeployments请求为例。
    ListDeploymentsRequest listDeploymentsRequest = new ListDeploymentsRequest();
    listDeploymentsRequest.setPageIndex(1); // 分页参数,页码必须大于0。
    listDeploymentsRequest.setPageSize(100); // 分页参数,每页元素数量大于0,不超过100。
    
    // 您可以在Flink开发控制台,工作空间详情中获取工作空间ID(workspaceId)。
    // 请不要在workspaceId中直接写入工作空间名称。
    String workspaceId = "<workspaceId>";
    ListDeploymentsHeaders listDeploymentsHeaders = new ListDeploymentsHeaders();
    listDeploymentsHeaders.setWorkspace(workspaceId);
  3. 调用并返回结果。
    String namespace = "<namespace>";
    ListDeploymentsResponse result =
        client.listDeploymentsWithOptions(
            namespace, listDeploymentsRequest, listDeploymentsHeaders, new RuntimeOptions());
    
    // 判断请求结果是否异常,如果异常,可以从errorCode及errorMessage中获取异常信息。
    if (!result.getBody().getSuccess()) {
      throw new RuntimeException(
          String.format("Failed to list deployments, errorCode: %s, errorMessage: %s.",
              result.getBody().errorCode, result.getBody().errorMessage));
    }
    
    // 获取正常结果。
    List<Deployment> deployments = result.getBody().getData();

参考示例

String endpoint = "ververica.cn-hangzhou.aliyuncs.com";

Config config = new com.aliyun.teaopenapi.models.Config();
config.setAccessKeyId("<AccessKey>");
config.setAccessKeySecret("<Secret>");
config.setEndpoint(endpoint);
Client client = new Client(config);

ListDeploymentsRequest listDeploymentsRequest = new ListDeploymentsRequest();
listDeploymentsRequest.setPageIndex(1);
listDeploymentsRequest.setPageSize(100);

ListDeploymentsHeaders listDeploymentsHeaders = new ListDeploymentsHeaders();
listDeploymentsHeaders.setWorkspace("workspaceId");

ListDeploymentsResponse result =
    client.listDeploymentsWithOptions(
        "namespace", listDeploymentsRequest, listDeploymentsHeaders, new RuntimeOptions());

if (!result.getBody().getSuccess()) {
  throw new RuntimeException(
      String.format("Failed to list deployments, errorCode: %s, errorMessage: %s.",
          result.getBody().errorCode, result.getBody().errorMessage));
}

List<Deployments> deployments = result.getBody().getData();

全部SDK DEMO

package com.ververica.sdk.examples;

import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions;
import com.aliyun.ververica20220718.Client;
import com.aliyun.ververica20220718.models.*;
import java.util.Map;

public class Examples {

  public static void main(String[] args) throws Exception {

    /*
      目前支持以下地域,对应的Endpoint如下。
      杭州Endpoint: ververica.cn-hangzhou.aliyuncs.com
      上海Endpoint: ververica.cn-shanghai.aliyuncs.com
      深圳Endpoint: ververica.cn-shenzhen.aliyuncs.com
      北京Endpoint: ververica.cn-beijing.aliyuncs.com
      新加坡Endpoint: ververica.ap-southeast-1.aliyuncs.com
      张家口Endpoint: ververica.cn-zhangjiakou.aliyuncs.com
    */

    String endpoint = "ververica.cn-hangzhou.aliyuncs.com";
    String accessKey = "<AccessKey>";
    String secret = "<Secret>";
    String workspaceId = "<workspaceId>";
    String namespace = "<namespace>";

    Config config = new com.aliyun.teaopenapi.models.Config();
    config.setAccessKeyId(accessKey);
    config.setAccessKeySecret(secret);
    config.setEndpoint(endpoint);
    Client client = new Client(config);

    // 创建变量。
    String variableName = "name";
    String variableValue = "value";
    Variable variable = new Variable();
    variable.setKind("Plain"); // only Plain is supported now
    variable.setDescription("this is a sdk variable example.");
    variable.setName(variableName);
    variable.setValue(variableValue);
    CreateVariableRequest createVariableRequest = new CreateVariableRequest();
    createVariableRequest.setBody(variable);
    CreateVariableHeaders createVariableHeaders = new CreateVariableHeaders();
    createVariableHeaders.setWorkspace(workspaceId);
    CreateVariableResponse createVariableResponse =
        client.createVariableWithOptions(namespace, createVariableRequest, createVariableHeaders, new RuntimeOptions());

    // 获取变量列表。
    ListVariablesRequest listVariablesRequest = new ListVariablesRequest();
    listVariablesRequest.setPageIndex(1);
    listVariablesRequest.setPageSize(100);
    ListVariablesHeaders listVariablesHeaders = new ListVariablesHeaders();
    listVariablesHeaders.setWorkspace(workspaceId);
    ListVariablesResponse listVariablesResponse =
        client.listVariablesWithOptions(namespace, listVariablesRequest, listVariablesHeaders, new RuntimeOptions());

    // 删除变量。
    DeleteVariableHeaders deleteVariableHeaders = new DeleteVariableHeaders();
    deleteVariableHeaders.setWorkspace(workspaceId);
    DeleteVariableResponse result =
        client.deleteVariableWithOptions(namespace, variableName, deleteVariableHeaders, new RuntimeOptions());

    // 获取引擎版本列表。
    ListEngineVersionMetadataHeaders listEngineVersionMetadataHeaders = new ListEngineVersionMetadataHeaders();
    listEngineVersionMetadataHeaders.setWorkspace(workspaceId);
    ListEngineVersionMetadataResponse listEngineVersionMetadataResponse =
        client.listEngineVersionMetadataWithOptions(listEngineVersionMetadataHeaders, new RuntimeOptions());

    // 获取部署目标列表。
    ListDeploymentTargetsRequest listDeploymentTargetsRequest = new ListDeploymentTargetsRequest();
    listDeploymentTargetsRequest.setPageSize(100);
    listDeploymentTargetsRequest.setPageIndex(1);
    ListDeploymentTargetsHeaders listDeploymentTargetsHeaders = new ListDeploymentTargetsHeaders();
    listDeploymentTargetsHeaders.setWorkspace(workspaceId);
    ListDeploymentTargetsResponse listDeploymentTargetsResponse =
        client.listDeploymentTargetsWithOptions(namespace, listDeploymentTargetsRequest, listDeploymentTargetsHeaders, new RuntimeOptions());
    String deploymentTargetName = listDeploymentTargetsResponse.getBody().getData().get(0).getName();

    // 创建已部署作业。
    SqlArtifact sqlArtifact = new SqlArtifact();
    sqlArtifact.setSqlScript(
        " create temporary table `datagen` ( id varchar, name varchar ) with ( 'connector' = 'datagen' );  create temporary table `blackhole` ( id varchar, name varchar ) with ( 'connector' = 'blackhole' );  insert into blackhole select * from datagen;");
    Artifact artifact = new Artifact();
    artifact.setKind("SQLSCRIPT");
    artifact.setSqlArtifact(sqlArtifact);

    BriefDeploymentTarget briefDeploymentTarget = new BriefDeploymentTarget();
    briefDeploymentTarget.setName(deploymentTargetName);
    briefDeploymentTarget.setMode("PER_JOB");

    Deployment deployment = new Deployment();
    deployment.setName("deploymentName");
    deployment.setExecutionMode("Batch");  // STREAMING || BATCH
    deployment.setDescription("this is a example deployment");
    deployment.setEngineVersion(userExpectedEngineVersion);
    deployment.setArtifact(artifact);
    deployment.setDeploymentTarget(briefDeploymentTarget);
    CreateDeploymentRequest createDeploymentRequest = new CreateDeploymentRequest();
    createDeploymentRequest.setBody(deployment);
    CreateDeploymentHeaders createDeploymentHeaders = new CreateDeploymentHeaders();
    createDeploymentHeaders.setWorkspace(workspaceId);
    CreateDeploymentResponse createDeploymentResponse =
        client.createDeploymentWithOptions(namespace, createDeploymentRequest, createDeploymentHeaders, new RuntimeOptions());

    String deploymentId = createDeploymentResponse.getBody().getData().getDeploymentId();

    // 获取已部署作业。
    GetDeploymentHeaders getDeploymentHeaders = new GetDeploymentHeaders();
    getDeploymentHeaders.setWorkspace(workspaceId);
    GetDeploymentResponse getDeploymentResponse =
        client.getDeploymentWithOptions(namespace, deploymentId, getDeploymentHeaders, new RuntimeOptions());

    // 获取已部署作业列表。
    ListDeploymentsRequest listDeploymentsRequest = new ListDeploymentsRequest();
    listDeploymentsRequest.setPageIndex(1);
    listDeploymentsRequest.setPageSize(100);
    ListDeploymentsHeaders listDeploymentsHeaders = new ListDeploymentsHeaders();
    listDeploymentsHeaders.setWorkspace(workspaceId);
    ListDeploymentsResponse listDeploymentsResponse =
        client.listDeploymentsWithOptions(namespace, listDeploymentsRequest, listDeploymentsHeaders, new RuntimeOptions());

    // 更新已部署作业。
    Deployment updateParams = new Deployment();
    String description = "this is a ververica sdk test for updating deployment";
    updateParams.setDescription(description);
    UpdateDeploymentRequest updateDeploymentRequest = new UpdateDeploymentRequest();
    updateDeploymentRequest.setBody(deployment);
    UpdateDeploymentHeaders updateDeploymentHeaders = new UpdateDeploymentHeaders();
    updateDeploymentHeaders.setWorkspace(workspaceId);
    UpdateDeploymentResponse updateDeploymentResponse =
        client.updateDeploymentWithOptions(namespace, deploymentId, updateDeploymentRequest, updateDeploymentHeaders, new RuntimeOptions());

    // 删除已部署作业。
    DeleteDeploymentHeaders deleteDeploymentHeaders = new DeleteDeploymentHeaders();
    deleteDeploymentHeaders.setWorkspace(workspaceId);
    DeleteDeploymentResponse deleteDeploymentResponse =
        client.deleteDeploymentWithOptions(
            namespace, deploymentId, deleteDeploymentHeaders, new RuntimeOptions());

    // 启动作业实例。
    double cpu = 1.0;
    String memory = "4Gi";
    DeploymentRestoreStrategy restoreStrategy = new DeploymentRestoreStrategy();
    restoreStrategy.setKind("NONE");
    BasicResourceSettingSpec basicResourceSettingSpec = new BasicResourceSettingSpec();
    basicResourceSettingSpec.setCpu(cpu);
    basicResourceSettingSpec.setMemory(memory);
    // basic resource setting
    BasicResourceSetting basicResourceSetting = new BasicResourceSetting();
    basicResourceSetting.setParallelism(1L);
    basicResourceSetting.setJobmanagerResourceSettingSpec(basicResourceSettingSpec);
    basicResourceSetting.setTaskmanagerResourceSettingSpec(basicResourceSettingSpec);
    // streaming resource setting
    StreamingResourceSetting streamingResourceSetting = new StreamingResourceSetting();
    streamingResourceSetting.setResourceSettingMode("BASIC");
    streamingResourceSetting.setBasicResourceSetting(basicResourceSetting);
    // brief resource setting
    BriefResourceSetting briefResourceSetting = new BriefResourceSetting();
    briefResourceSetting.setStreamingResourceSetting(streamingResourceSetting);
    briefResourceSetting.setFlinkConf(Map.of("key", "value"));

    StartJobRequestBody startJobRequestBody = new StartJobRequestBody();
    startJobRequestBody.setDeploymentId(deploymentId);
    startJobRequestBody.setRestoreStrategy(restoreStrategy);
    startJobRequestBody.setResourceSettingSpec(briefResourceSetting);
    StartJobRequest startJobRequest = new StartJobRequest();
    startJobRequest.setBody(startJobRequestBody);
    StartJobHeaders startJobHeaders = new StartJobHeaders();
    startJobHeaders.setWorkspace(workspaceId);
    StartJobResponse startJobResponse =
        client.startJobWithOptions(
            namespace, startJobRequest, startJobHeaders, new RuntimeOptions());

    String jobId = startJobResponse.getBody().getData().getJobId();

    // 获取作业实例。
    GetJobHeaders getJobHeaders = new GetJobHeaders();
    getJobHeaders.setWorkspace(workspaceId);
    GetJobResponse getJobResponse =
        client.getJobWithOptions(namespace, jobId, getJobHeaders, new RuntimeOptions());

    // 获取作业实例列表。
    ListJobsRequest listJobsRequest = new ListJobsRequest();
    listJobsRequest.setDeploymentId(deploymentId);
    listJobsRequest.setPageIndex(1);
    listJobsRequest.setPageSize(100);
    ListJobsHeaders listJobsHeaders = new ListJobsHeaders();
    listJobsHeaders.setWorkspace(workspaceId);
    ListJobsResponse listJobsResponse =
        client.listJobsWithOptions(namespace, listJobsRequest, listJobsHeaders, new RuntimeOptions());

    // 停止作业实例。
    StopJobRequestBody stopJobRequestBody = new StopJobRequestBody();
    stopJobRequestBody.setStopStrategy("NONE");
    StopJobRequest stopJobRequest = new StopJobRequest();
    stopJobRequest.setBody(stopJobRequestBody);
    StopJobHeaders stopJobHeaders = new StopJobHeaders();
    stopJobHeaders.setWorkspace(workspaceId);
    StopJobResponse stopJobResponse =
        client.stopJobWithOptions(namespace, jobId, stopJobRequest, stopJobHeaders, new RuntimeOptions());

    // 删除作业实例。
    DeleteJobHeaders deleteJobHeaders = new DeleteJobHeaders();
    deleteJobHeaders.setWorkspace(workspaceId);
    DeleteJobResponse deleteJobResponse = client.deleteJobWithOptions(namespace, jobId, deleteJobHeaders, new RuntimeOptions());

    // 创建作业快照。
    CreateSavepointRequest createSavepointRequest = new CreateSavepointRequest();
    createSavepointRequest.setDeploymentId(deploymentId);
    createSavepointRequest.setNativeFormat(false);
    createSavepointRequest.setDescription("savepoint demo");
    CreateSavepointHeaders createSavepointHeaders = new CreateSavepointHeaders();
    createSavepointHeaders.setWorkspace(workspaceId);
    CreateSavepointResponse createSavepointResponse = client.createSavepointWithOptions(namespace, createSavepointRequest, createSavepointHeaders, new RuntimeOptions());

    String savepointId = createSavepointResponse.getBody().getData().getSavepointId();

    // 获取作业快照。
    GetSavepointHeaders getSavepointHeaders = new GetSavepointHeaders();
    getSavepointHeaders.setWorkspace(workspaceId);
    GetSavepointResponse getSavepointResponse = client.getSavepointWithOptions(namespace, savepointId, getSavepointHeaders, new RuntimeOptions());

    // 获取作业快照列表。
    ListSavepointsRequest listSavepointsRequest = new ListSavepointsRequest();
    listSavepointsRequest.setPageIndex(1);
    listSavepointsRequest.setPageSize(100);
    ListSavepointsHeaders listSavepointsHeaders = new ListSavepointsHeaders();
    listSavepointsHeaders.setWorkspace(workspaceId);
    ListSavepointsResponse listSavepointsResponse = client.listSavepointsWithOptions(namespace, listSavepointsRequest, listSavepointsHeaders, new RuntimeOptions());

    // 删除作业快照。
    DeleteSavepointHeaders deleteSavepointHeaders = new DeleteSavepointHeaders();
    deleteSavepointHeaders.setWorkspace(workspaceId);
    DeleteSavepointResponse deleteSavepointResponse = client.deleteSavepointWithOptions(namespace, savepointId, deleteSavepointHeaders, new RuntimeOptions());

    // 异步生成细粒度资源。
    GenerateResourcePlanWithFlinkConfAsyncRequest generateResourcePlanWithFlinkConfAsyncRequest = new GenerateResourcePlanWithFlinkConfAsyncRequest();
    generateResourcePlanWithFlinkConfAsyncRequest.setBody(Map.of("key", "value"));
    GenerateResourcePlanWithFlinkConfAsyncHeaders generateResourcePlanWithFlinkConfAsyncHeaders = new GenerateResourcePlanWithFlinkConfAsyncHeaders();
    generateResourcePlanWithFlinkConfAsyncHeaders.setWorkspace(workspaceId);
    GenerateResourcePlanWithFlinkConfAsyncResponse generateResourcePlanWithFlinkConfAsyncResponse =
        client.generateResourcePlanWithFlinkConfAsyncWithOptions(namespace, deploymentId, generateResourcePlanWithFlinkConfAsyncRequest, generateResourcePlanWithFlinkConfAsyncHeaders, new RuntimeOptions());

    String ticketId = generateResourcePlanWithFlinkConfAsyncResponse.getBody().getData().getTicketId();

    // 获取异步生成细粒度资源结果。
    GetGenerateResourcePlanResultHeaders getGenerateResourcePlanResultHeaders = new GetGenerateResourcePlanResultHeaders();
    getGenerateResourcePlanResultHeaders.setWorkspace(workspaceId);
    GetGenerateResourcePlanResultResponse getGenerateResourcePlanResultResponse = client.getGenerateResourcePlanResultWithOptions(namespace, ticketId, getGenerateResourcePlanResultHeaders, new RuntimeOptions());

    // Flink请求代理。
    FlinkApiProxyRequest flinkApiProxyRequest = new FlinkApiProxyRequest();
    flinkApiProxyRequest.setNamespace(namespace);
    flinkApiProxyRequest.setResourceType("jobs");
    flinkApiProxyRequest.setResourceId(jobId);
    flinkApiProxyRequest.setFlinkApiPath("/overview");
    FlinkApiProxyHeaders flinkApiProxyHeaders = new FlinkApiProxyHeaders();
    flinkApiProxyHeaders.setWorkspace(workspaceId);
    FlinkApiProxyResponse flinkApiProxyResponse = client.flinkApiProxyWithOptions(flinkApiProxyRequest, flinkApiProxyHeaders, new RuntimeOptions());
  }

}