本文为您介绍如何使用Flink计算表格存储(Tablestore)的数据,表格存储中的数据表或时序表均可作为实时计算Flink的源表或结果表进行使用。
前提条件
已开通表格存储服务并创建实例。具体操作,请参见开通服务并创建实例。
已开通Flink工作空间。具体操作,请参见开通实时计算Flink版。
重要实时计算Flink必须与表格存储服务位于同一地域。实时计算Flink支持的地域,请参见地域列表。
已获取AccessKey信息。
重要出于安全考虑,强烈建议您通过RAM用户使用表格存储功能。具体操作,请参见创建RAM用户并授权。
实时计算作业开发流程
步骤一:创建作业
步骤二:编写SQL作业
此处以将数据表中的数据同步至另一个数据表为例,为您介绍如何编写SQL作业。更多SQL示例,请参考SQL示例。
分别创建源表(数据表)和结果表(数据表)的临时表。
详细配置信息,请参见附录1:Tablestore连接器。
-- 创建源表(数据表)的临时表 tablestore_stream CREATE TEMPORARY TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector' = 'ots', -- 源表的连接器类型。固定取值为ots。 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- 表格存储实例的VPC地址。 'instanceName' = 'xxx', -- 表格存储的实例名称。 'tableName' = 'flink_source_table', -- 表格存储的源表名称。 'tunnelName' = 'flink_source_tunnel', -- 表格存储源表的数据通道名称。 'accessId' = 'xxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey ID。 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey Secret。 'ignoreDelete' = 'false' -- 是否忽略DELETE操作类型的实时数据:不忽略。 ); -- 创建结果表(数据表)的临时表 tablestore_sink CREATE TEMPORARY TABLE tablestore_sink( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, PRIMARY KEY (`order`,orderid) NOT ENFORCED -- 主键。 ) WITH ( 'connector' = 'ots', -- 结果表的连接器类型。固定取值为ots。 'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- 表格存储实例的VPC地址。 'instanceName' = 'xxx', -- 表格存储的实例名称。 'tableName' = 'flink_sink_table', -- 表格存储的结果表名称。 'accessId' = 'xxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey ID。 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey Secret。 'valueColumns'='customerid,customername' --插入字段的列名。 );
编写作业逻辑。
将源表数据插入到结果表的代码示例如下:
--将源表数据插入到结果表 INSERT INTO tablestore_sink SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
步骤三:(可选)查看配置信息
在SQL编辑区域右侧页签,您可以查看或上传相关配置。
页签名称 | 配置说明 |
更多配置 |
|
代码结构 |
|
版本信息 | 您可以在此处查看作业版本信息,操作列下的功能详情请参见管理作业版本。 |
步骤四:(可选)进行深度检查
深度检查能够检查作业的SQL语义、网络连通性以及作业使用的表的元数据信息。同时,您可以单击结果区域的SQL优化,展开查看SQL风险问题提示以及对应的SQL优化建议。
在SQL编辑区域右上方,单击深度检查。
在深度检查对话框,单击确认。
步骤五:(可选)进行作业调试
您可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECT或INSERT业务逻辑的正确性,提升开发效率,降低数据质量风险。
在SQL编辑区域右上方,单击调试。
在调试对话框,选择调试集群后,单击下一步。
如果没有可用集群则需要创建新的Session集群,Session集群与SQL作业引擎版本需要保持一致并处于运行中。详情请参见创建Session集群。
配置调试数据。
如果您使用线上数据,无需处理。
如果您需要使用调试数据,需要先单击下载调试数据模板,填写调试数据后,上传调试数据。详情请参见作业调试。
确定好调试数据后,单击确定。
步骤六:进行作业部署
在SQL编辑区域右上方,单击部署,在部署新版本对话框,可根据需要填写或选中相关内容,单击确定。
Session集群适用于非生产环境的开发测试环境,通过部署或调试作业提高作业JM(Job Manager)资源利用率和提高作业启动速度。但不推荐您将生产作业提交至Session集群中,可能会导致业务稳定性问题。
步骤七:启动并查看Flink计算结果
在左侧导航栏,单击
。单击目标作业操作列中的启动。
选择无状态启动后,单击启动。当作业状态转变为运行中时,代表作业运行正常。作业启动参数配置,详情请参见作业启动。
说明Flink中的每个TaskManager建议配置2CPU和4GB内存,此配置可以充分发挥每个TaskManager的计算能力。单个TaskManager能达到1万/s的写入速率。
在source表分区数目足够多的情况下,建议Flink中并发配置在16以内,写入速率随并发线性增长。
在作业运维详情页面,查看Flink计算结果。
在
页面,单击目标作业名称。在作业日志页签,单击运行Task Managers页签下Path,ID列的目标任务。
单击日志,在页面查看相关的日志信息。
(可选)停止作业。
如果您对作业进行了修改(例如更改代码、增删改WITH参数、更改作业版本等),且希望修改生效,则需要重新部署作业,然后停止再启动。另外,如果作业无法复用State,希望作业全新启动时,或者更新非动态生效的参数配置时,也需要停止后再启动作业。作业停止详情请参见作业停止。