Python SDK参考

本文为您介绍如何安装并使用实时计算Flink版Python SDK。

前提条件

  • 已创建AccessKey,详情请参见创建AccessKey

    说明

    为避免主账号泄露AccessKey带来安全风险,建议您创建RAM用户,授予RAM用户Flink相关的访问权限,再使用RAM用户的AccessKey调用SDK。相关文档请参见:

  • 已准备Python环境,要求3.6及以上版本。

  • 账号具有相关访问及操作权限,详情请参见权限管理

安装 Flink Python SDK

通过pip安装Python SDK。

  • 进行作业开发和运维等操作时,需要调用实时计算开发控制台API。安装和使用详情请参见开发控制台SDK中心

    pip3 install alibabacloud_ververica20220718==1.2.1
  • 查看工作空间信息,进行工作空间的购买、资源调整等操作时,需要调用实时计算管理控制台API。安装和使用详情请参见售卖控制台SDK中心

    pip3 install alibabacloud_foasconsole20211028==1.0.2

在线调试和生成SDK示例

OpenAPI门户提供了在线调用产品API、动态生成SDK示例代码和快速检索接口等功能,可以显著降低使用API的难度。您可以在实时计算开发控制台API实时计算售卖控制台API页面查看所需API的SDK示例,并下载使用,具体操作步骤请参见快速开始

image

参考示例

说明

查看已购买的工作空间

查询目标地域下已购买的Flink工作空间的详细信息。必填请求参数如下,更多参数详情请参见DescribeInstances - 查看已购买Flink全托管工作空间

Region:地域ID,详情请参见服务接入点。例如cn-hangzhou。

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_foasconsole20211028.client import Client as foasconsole20211028Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_foasconsole20211028 import models as foasconsole_20211028_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> foasconsole20211028Client:
        """
        使用AK&SK初始化账号Client
        @return: Client
        @throws Exception
        """
        # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性,建议使用更安全的 STS 方式。以下代码示例仅供参考。
        config = open_api_models.Config(
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Endpoint请根据实际情况修改
        config.endpoint = f'foasconsole.aliyuncs.com'
        return foasconsole20211028Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 调用并打印API的返回值
            response=client.describe_instances_with_options(describe_instances_request, runtime)
            print(response)
        except Exception as error:
            # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            # 错误 message
            print(error.message)
            # 诊断地址
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            await client.describe_instances_with_options_async(describe_instances_request, runtime)
        except Exception as error:
            # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            # 错误 message
            print(error.message)
            # 诊断地址
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

获取已部署作业列表

获取项目空间下所有已部署作业的信息。必填请求参数如下,参数详情请参见ListDeployments - 获取已部署作业列表

  • workspace:工作空间ID,可通过查看已购买的工作空间返回的ResourceId获取。例如adf9e5147a****。

  • namespace:项目空间名称,例如script****-default。

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        使用AK&SK初始化账号Client
        @return: Client
        @throws Exception
        """
        # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性,建议使用更安全的 STS 方式。以下代码示例仅供参考。
        config = open_api_models.Config(
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Endpoint请根据实际情况修改
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            #调用并打印API的返回值
            request=client.list_deployments_with_options('namespace', list_deployments_request, list_deployments_headers, runtime)
            print(request)
        except Exception as error:
            # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            # 错误 message
            print(error.message)
            # 诊断地址
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值。namespace为项目空间名称
            await client.list_deployments_with_options_async('namespace', list_deployments_request, list_deployments_headers, runtime)
        except Exception as error:
            # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            # 错误 message
            print(error.message)
            # 诊断地址
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

启动作业

启动项目空间下一个已部署作业。必填请求参数如下,参数详情请参见StartJobWithParams - 启动作业实例

  • workspace:工作空间ID,例如adf9e5147a****。

  • namespace:项目空间名称,例如script****-default。

  • deploymentId:作业部署ID,可通过获取已部署作业列表获取。例如3171d4d1-5952-4d02-b978-e762493b****。

  • kind:启动位点类型。支持NONE(无状态启动)、LATEST_SAVEPOINT(最新的作业快照启动)、FROM_SAVEPOINT(从指定快照启动)、LATEST_STATE(最新状态启动)。

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        使用AK&SK初始化账号Client
        @return: Client
        @throws Exception
        """
        # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性,建议使用更安全的 STS 方式。以下代码示例仅供参考。
        config = open_api_models.Config(
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Endpoint请根据实际情况修改
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            client.start_job_with_params_with_options('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
        except Exception as error:
            # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            # 错误 message
            print(error.message)
            # 诊断地址
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            #作业启动策略
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            await client.start_job_with_params_with_options_async('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
        except Exception as error:
            # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            # 错误 message
            print(error.message)
            # 诊断地址
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

获取作业实例列表

获取某个已部署作业下所有作业实例的信息。必填请求参数如下,参数详情请参见ListJobs - 获取作业实例列表

  • workspace:工作空间ID,例如adf9e5147a****。

  • namespace:项目空间名称,例如script****-default。

  • deploymentId:作业部署ID,可以通过获取已部署作业列表获取。例如3171d4d1-5952-4d02-b978-e762493b****。

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        使用AK&SK初始化账号Client
        @return: Client
        @throws Exception
        """
        # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性,建议使用更安全的 STS 方式。以下代码示例仅供参考。
        config = open_api_models.Config(
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Endpoint请根据实际情况修改
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 调用并打印API的返回值
            request=client.list_jobs_with_options('namespace', list_jobs_request, list_jobs_headers, runtime)
            print(request)
        except Exception as error:
            # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            # 错误 message
            print(error.message)
            # 诊断地址
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            await client.list_jobs_with_options_async('namespace', list_jobs_request, list_jobs_headers, runtime)
        except Exception as error:
            # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            # 错误 message
            print(error.message)
            # 诊断地址
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

停止作业实例

停止一个作业实例。必填请求参数如下,参数详情请参见StopJob - 停止作业实例

  • workspace:工作空间ID,例如adf9e5147a****。

  • namespace:项目空间名称,例如script****-default。

  • jobId:作业实例ID,您可以通过获取作业实例列表获取。例如3171d4d1-5952-4d02-b978-e762493b****。

  • stopStrategy:作业停止策略。支持NONE(直接停止)、STOP_WITH_SAVEPOINT(生成作业快照后停止)、STOP_WITH_DRAIN(以drain的方式停止)。

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        使用AK&SK初始化账号Client
        @return: Client
        @throws Exception
        """
        # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性,建议使用更安全的 STS 方式。以下代码示例仅供参考。
        config = open_api_models.Config(
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Endpoint请根据实际情况修改
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            # 作业停止策略
            stop_strategy='stopStrategy'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            client.stop_job_with_options('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            # 错误 message
            print(error.message)
            # 诊断地址
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            stop_strategy='stopStrategy'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            await client.stop_job_with_options_async('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            # 错误 message
            print(error.message)
            # 诊断地址
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

相关文档

  • Java SDK参考请参见Java SDK参考

  • 实时计算售卖控制台支持的API,以及各API的参数说明等详情请参见API概览

  • 实时计算开发控制台支持的API,以及各API的参数说明等详情请参见API概览