本文为您介绍如何使用对象存储OSS连接器。
阿里云对象存储OSS(Object Storage Service)是一款海量、安全、低成本和高可靠的云存储服务,可提供99.9999999999%(12个9)的数据持久性,99.995%的数据可用性。多种存储类型供选择,全面优化存储成本。
| 类别 | 详情 | 
| 支持类型 | 源表和结果表 | 
| 运行模式 | 批模式和流模式 | 
| 数据格式 | Orc、Parquet、Avro、Csv、JSON和Raw | 
| 特有监控指标 | 暂无 | 
| API种类 | Datastream和SQL | 
| 是否支持更新或删除结果表数据 | 不支持更新和删除结果表数据,只支持插入数据。 | 
使用限制
- 通用 - 仅VVR 11及以上版本支持读取OSS压缩文件(GZIP,BZIP2,XZ,DEFLATE),VVR 8无法正确处理压缩文件。 
- Flink计算引擎VVR 8.0.6以下版本仅支持读取或写入相同账号下的OSS。如需读写其他账号下的OSS,请使用Flink计算引擎VVR 8.0.6及以上版本且配置Bucket鉴权信息,详情请参见配置Bucket鉴权信息。 
 
- 结果表独有 - 对于写入OSS,目前暂不支持写Avro、CSV、JSON和Raw此类行存的格式,具体原因请参见FLINK-30635。 
 
语法结构
CREATE TABLE OssTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  datetime STRING,
  `hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = '...'
);元信息列
您可以在源表中读取信息列,以获取读取OSS数据对应的元信息。例如,如果在OSS源表中定义了元信息列file.path,则该列的值为该行数据所在的文件路径。元信息列的使用示例如下。
CREATE TABLE MyUserTableWithFilepath (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
)下表列出了OSS源表所支持的元信息列:
| Key | 数据类型 | 说明 | 
| file.path | STRING NOT NULL | 该行数据所在的文件路径。 | 
| file.name | STRING NOT NULL | 该行数据所在的文件名,即距离文件根路径最远的元素。 | 
| file.size | BIGINT NOT NULL | 该行数据所在的文件的字节数。 | 
| file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | 该行数据所在的文件的修改时间。 | 
WITH参数
- 通用 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - connector - 表类型。 - String - 是 - 无 - 固定值为 - filesystem。- path - 文件系统路径。 - String - 是 - 无 - URI格式,例如 - oss://my_bucket/my_path。说明- VVR 8.0.6及以上版本配置该参数后,您还需要配置Bucket鉴权信息,才能正常读写指定文件系统路径下的数据,配置方法请参见配置Bucket鉴权信息。 - format - 文件格式。 - String - 是 - 无 - 参数取值如下: - csv 
- json 
- avro 
- parquet 
- orc 
- raw 
 
- 源表独有 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - source.monitor-interval - 设置新文件的监控时间间隔,并且必须设置>0的值。 - Duration - 否 - 无 - 如果未设置此配置项,则提供的路径仅会被扫描一次,因此源将是有界的。 - 每个文件都由其路径唯一标识,一旦发现新文件,就会处理一次。 - 已处理的文件在source的整个生命周期内存储在state中。因此,source的state在checkpoint和savepoint时进行保存。更短的时间间隔文件会被更快地发现,但也会更频繁地遍历文件系统或对象存储。 
- 结果表独有 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - partition.default-name - 在分区字段值为NULL或空字符串时,分区的名称。 - String - 否 - _DEFAULT_PARTITION__ - 无。 - sink.rolling-policy.file-size - 滚动前,文件大小的最大值。 - MemorySize - 否 - 128 MB - 写入目录下的数据被分割到part文件中。每个分区对应sink的收到数据的subtask都至少会为该分区生成一个part文件。根据可配置的滚动策略,当前in-progress part文件将被关闭,生成新的part文件。该策略基于大小和指定的文件可被打开的最大timeout时长,来滚动part文件。 说明- 对于列存格式来说, - 即使文件不满足设置的滚动策略,但是在做checkpoint时,总是会滚动文件。 - 所以只要文件满足了设置的滚动策略或者做了checkpoint,文件总是会被滚动。 - 而对于行存格式来说,只有在满足rolling policy配置的情况下才会滚动文件。 - sink.rolling-policy.rollover-interval - 滚动前,part文件处于打开状态的最大时长。 - Duration - 否 - 30min - 检查频率是由sink.rolling-policy.check-interval属性控制。 - sink.rolling-policy.check-interval - 基于时间滚动策略的检查间隔。 - Duration - 否 - 1min - 该属性控制了基于sink.rolling-policy.rollover-interval属性的检查文件是否该被滚动了。 - auto-compaction - 在流式结果表中是否开启自动合并功能。数据首先会被写入临时文件。当checkpoint完成后,该检查点产生的临时文件会被合并,临时文件在合并前不可见。 - Boolean - 否 - false - 如果启用文件合并功能,会根据目标文件大小,将多个小文件合并成大文件。在生产环境中使用文件合并功能时,需要注意: - 只有checkpoint内部的文件才会被合并,会至少生成与checkpoint个数相同的文件个数。 
- 合并前文件不可见,文件的可见时间是 - checkpoint间隔时长+合并时长。
- 合并时间过长,将导致反压,延长checkpoint所需时间。 
 - compaction.file-size - 合并目标文件大小。 - MemorySize - 否 - 128 MB - 默认值与滚动文件大小sink.rolling-policy.file-size相同。 - sink.partition-commit.trigger - 分区提交触发器类型。 - String - 否 - process-time - 对于写分区表,Flink提供了两种类型分区提交触发器,类型如下两种: - process-time:分区提交触发器基于分区创建时间和当前系统时间,既不需要分区时间提取器,也不需要watermark生成器。一旦当前系统时间超过了分区创建系统时间和sink.partition-commit.delay之和,立即提交分区。这种触发器更具通用性,但不是很精确。例如,数据延迟或故障将导致过早提交分区。 
- partition-time:基于提取的分区时间,需要watermark生成。这需要Job支持watermark生成,分区是根据时间来切割的,例如,按小时或按天分区。一旦watermark超过了分区创建系统时间和sink.partition-commit.delay之和立即提交分区。 
 - sink.partition-commit.delay - 分区被提交的最大延迟时间。表明该延迟时间之前分区不会被提交。 - Duration - 否 - 0s - 如果是按天分区,可以设置为 - 1 d。
- 如果是按小时分区,应设置为 - 1 h。
 - sink.partition-commit.watermark-time-zone - 解析Long类型的watermark到TIMESTAMP类型时所采用的时区,解析得到的watermark的TIMESTAMP会跟分区时间进行比较,以判断该分区是否需要被提交。 - String - 否 - UTC - 仅当sink.partition-commit.trigger被设置为partition-time时有效。 - 如果设置得不正确,例如,在TIMESTAMP_LTZ类型的列上定义了source rowtime,如果没有设置该属性,那么用户可能会在若干个小时后才看到分区的提交。默认值为UTC,意味着watermark是定义在TIMESTAMP类型的列上或者没有定义watermark。 
- 如果watermark定义在TIMESTAMP_LTZ类型的列上,watermark时区必须是会话时区。该属性的可选值要么是完整的时区名(例如'America/Los_Angeles'),要么是自定义时区(例如'GMT-08:00')。 
 - partition.time-extractor.kind - 从分区字段中提取时间的时间提取器。 - String - 否 - default - 参数取值如下: - default(默认):默认情况下,可以配置timestamp pattern或formatter。 
- custom:应指定提取器类。 
 - partition.time-extractor.class - 实现PartitionTimeExtractor接口的提取器类。 - String - 否 - 无 - 无。 - partition.time-extractor.timestamp-pattern - 允许用户使用分区字段来获取合法的timestamp pattern的默认construction方式。 - String - 否 - 无 - 默认支持第一个字段按 - yyyy-MM-dd hh:mm:ss这种模式提取。- 如果需要从一个分区字段'dt'提取timestamp,可以配置:$dt。 
- 如果需要从多个分区字段,比如year、month和day和hour提取timestamp,可以配置成: - $year-$month-$day $hour:00:00。
- 如果需要从两个分区字段dt和hour提取timestamp,可以配置成: - $dt $hour:00:00。
 - partition.time-extractor.timestamp-formatter - 转换分区timestamp字符串值为timestamp的formatter,分区timestamp字符串值通过partition.time-extractor.timestamp-pattern属性表达。 - String - 否 - yyyy-MM-dd HH:mm:ss - 例如,分区timestamp提取来自多个分区字段,比如year、month和day,可以配置partition.time-extractor.timestamp-pattern属性为 - $year$month$day,并且配置partition.time-extractor.timestamp-formatter属性为yyyyMMdd。默认的formatter是- yyyy-MM-dd HH:mm:ss。这里的timestamp-formatter和Java的DateTimeFormatter是通用的。- sink.partition-commit.policy.kind - 分区提交策略类型。 - String - 否 - 无 - 分区提交策略通知下游某个分区,该分区已经写入完毕可以被读取。参数取值如下: - success-file:在目录中增加_success文件。 
- custom:通过指定的类来创建提交策略。支持同时指定多个提交策略。 
 - sink.partition-commit.policy.class - 实现PartitionCommitPolicy接口的分区提交策略类。 - String - 否 - 无 - 该类只有在custom提交策略下才能使用。 - sink.partition-commit.success-file.name - 使用success-file分区提交策略时的文件名。 - String - 否 - _SUCCESS - 无。 - sink.parallelism - 将文件写入外部文件系统的parallelism。 - Integer - 否 - 无 - 默认情况下,该sink parallelism与上游chained operator的parallelism一样。当配置了跟上游的chained operator不一样的parallelism时,写文件的算子会使用指定的sink parallelism,如果开启了文件合并,文件合并的算子也会使用指定的sink parallelism。 说明- 这个值应该大于0,否则将抛出异常。 
配置Bucket鉴权信息
仅实时计算引擎VVR 8.0.6及以上版本支持配置Bucket鉴权信息。
指定文件系统路径后,您还需要配置Bucket鉴权信息,才能正常读写您指定文件系统路径下的数据。配置Bucket鉴权信息需要在实时计算开发控制台部署详情页签运行参数配置区域的其他配置中添加如下代码。
fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx其中涉及到的参数解释如下表所示:
| 配置项 | 说明 | 
| fs.oss.bucket.<bucketName>.accessKeyId | 参数配置说明如下: 
 | 
| fs.oss.bucket.<bucketName>.accessKeySecret | 
写OSS-HDFS
首先需要在实时计算开发控制台部署详情页签运行参数配置区域的其他配置中添加下如下配置:
fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx其中涉及到的参数解释如下表所示:
| 配置项 | 说明 | 
| fs.oss.jindo.buckets | 写入OSS-HDFS服务中的Bucket名称,可配置多个,以分号分隔。当Flink写一个OSS路径时,如果其对应的bucket包含在fs.oss.jindo.buckets中,则会写入OSS-HDFS服务中。 | 
| fs.oss.jindo.accessKeyId | 阿里云账号的Access Key。获取方法请参见查看RAM用户的AccessKey信息。 | 
| fs.oss.jindo.accessKeySecret | 阿里云账号的AccessKey Secret。获取方法请参见查看RAM用户的AccessKey信息。 | 
此外,还需要配置OSS-HDFS的EndPoint。目前支持两种方式来配置OSS-HDFS的EndPoint:
参数配置
在实时计算开发控制台部署详情页签运行参数配置区域的其他配置中添加如下配置:
fs.oss.jindo.endpoint: xxx路径配置
在OSS的路径中配置OSS-HDFS的EndPoint 。
oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>- user-defined-oss-hdfs-bucket:bucket的名字
- oss-hdfs-endpoint为OSS-HDFS的EndPoint
- 此时配置项 - fs.oss.jindo.buckets需要包含<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>。
例如,假设bucket名字为jindo-test,endpoint为cn-beijing.oss-dls.aliyuncs.com。则OSS路径需为oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>,配置项fs.oss.jindo.buckets需包含jindo-test.cn-beijing.oss-dls.aliyuncs.com。
# OSS路径 
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>
# 其他配置 
fs.oss.jindo.buckets: jindo-test,jindo-test.cn-beijing.oss-dls.aliyuncs.com写入外部HDFS文件时,即文件路径为hdfs://**,如果需要指定或切换用户名进行访问,还可以进行如下配置。
在实时计算开发控制台部署详情页签运行参数配置区域的其他配置中添加下如下配置:
containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfs使用示例
- 源表 - CREATE TEMPORARY TABLE fs_table_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) with ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;
- 结果表 - 写分区表 - CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE, ts BIGINT, -- 以毫秒为单位的时间 ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列上定义 watermark ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE, dt STRING, `hour` STRING ) PARTITIONED BY (dt, `hour`) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet', 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 假设用户配置的时区为 'Asia/Shanghai' 'sink.partition-commit.policy.kind'='success-file' ); -- 流式 sql,插入文件系统表 INSERT INTO fs_table_sink SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH') FROM datagen_source;
- 写非分区表 - CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); INSERT INTO fs_table_sink SELECT * FROM datagen_source;
 
DataStream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法。
使用 DataStream API写OSS和OSS-HDFS的代码示例如下:
String outputPath = "oss://<bucket>/path"
final StreamingFileSink<Row> sink =
                StreamingFileSink.forRowFormat(
                                new Path(outputPath),
                                (Encoder<Row>)
                                        (element, stream) -> {
                                            out.println(element.toString());
                                        })
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();
outputStream.addSink(sink);如果您需要写OSS-HDFS,还需要在实时计算开发控制台部署详情页签运行参数配置区域的其他配置中配置与OSS-HDFS相关的参数,具体请参见写OSS-HDFS。
相关文档
- Flink支持的连接器,请参见支持的连接器。 
- 表格存储Tablestore(OTS)连接器使用方法,请参见表格存储Tablestore(OTS)。 
- 流式数据湖仓Paimon连接器使用方法,请参见流式数据湖仓Paimon。