阿里云实时计算Flink支持通过连接器读写OSS以及OSS-HDFS数据。通过配置OSS或者OSS-HDFS连接器的输入属性,实时计算Flink会自动从指定的路径读取数据,并将其作为实时计算Flink的输入流,然后将计算结果按照指定格式写入到OSS或者OSS-HDFS的指定路径。
前提条件
已开通Flink全托管。具体操作,请参见开通实时计算Flink版。
开通Flink全托管后,Flink全托管页签将在5~10分钟内显示已创建完成的工作空间。
已创建SQL作业。
创建SQL作业时,Flink计算引擎需选择VVR 8.0.1及以上版本。具体操作,请参见新建作业。
使用限制
仅支持读写相同账号下的OSS或者OSS-HDFS服务的数据。
对于写入OSS的场景,暂不支持写Avro、CSV、JSON和Raw此类行存的格式,具体原因请参见FLINK-30635。
操作步骤
进入SQL作业创建页面。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击SQL开发。
在SQL作业编辑区域,编写DDL和DML代码。
将srcbucket中的dir路径下的源表数据写入destbucket的test路径下的结果表。
说明如果您希望通过以下代码读取OSS-HDFS的数据,请确保srcbucket以及destbucket已开通OSS-HDFS服务。
CREATE TEMPORARY TABLE source_table ( `file.name` STRING NOT NULL, `file.path` STRING NOT NULL METADATA ) WITH ( 'connector'='filesystem', 'path'='oss://srcbucket/dir/', 'format'='parquet' ); CREATE TEMPORARY TABLE target_table( `name` STRING, `path` STRING ) with ( 'connector'='filesystem', 'path'='oss://destbucket/test/', 'format'='parquet' ); INSERT INTO target_table SELECT * FROM source_table ;
关于源表支持的元数据列(例如file.path、file.name等)以及WITH参数的具体用法,请参见对象存储OSS连接器。
单击保存。
单击深度检查。
深度检查能够检查作业的SQL语义、网络连通性以及作业使用的表的元数据信息。同时,您可以单击结果区域的SQL优化,展开查看SQL风险问题提示以及对应的SQL优化建议。
单击部署。
完成作业开发和深度检查后,即可部署作业,将数据发布至生产环境。
(可选)仅当您需要读取OSS-HDFS服务的数据时,执行此步骤。
单击作业,在部署详情页签下的运行参数配置区域,按以下说明配置OSS-HDFS服务访问密钥以及Endpoint等信息,然后单击保存。
fs.oss.jindo.buckets: srcbucket;destbucket fs.oss.jindo.accessKeyId: LTAI******** fs.oss.jindo.accessKeySecret: KZo1******** fs.oss.jindo.endpoint: cn-hangzhou.oss-dls.aliyuncs.com
各配置项说明如下:
配置项
说明
fs.oss.jindo.buckets
填写待读取源表数据所在的Bucket名称以及待写入结果表数据所在的Bucket名称。Bucket名称之间以分号分隔,例如
srcbucket;destbucket
。fs.oss.jindo.accessKeyId
阿里云账号或者RAM用户的AccessKey ID。获取方法请参见查看RAM用户的AccessKey信息。
fs.oss.jindo.accessKeySecret
阿里云账号或者RAM用户的AccessKey Secret。获取方法请参见查看RAM用户的AccessKey信息。
fs.oss.jindo.endpoint
OSS-HDFS服务的Endpoint,例如cn-hangzhou.oss-dls.aliyuncs.com。
在作业运维页面,单击启动,等待作业进入运行中状态。
通过指定的OSS或者OSS-HDFS结果表存储路径path查看写入的数据。
写入OSS时,您可以通过OSS控制台文件列表下的OSS页签查看写入的数据。写入OSS-HDFS时,您可以通过OSS控制台文件列表下的HDFS页签查看写入的数据。