开源Flink不支持流式写入OSS-HDFS服务,也不支持以EXACTLY_ONCE语义写入存储介质。当您希望开源Flink以EXACTLY_ONCE语义流式写入OSS-HDFS服务,需要结合JindoSDK。
如果您不希望通过Flink流式写入OSS-HDFS服务前部署JindoSDK,您可以选择阿里云实时计算Flink完成OSS-HDFS服务读写需求。更多信息,请参见实时计算Flink读写OSS或者OSS-HDFS。
前提条件
已创建ECS实例。具体步骤,请参见选购ECS实例。
已开通并授权访问OSS-HDFS服务。具体操作,请参见开通并授权访问OSS-HDFS服务。
已自行下载并安装开源版本Flink,且版本不低于1.10.1。Flink 1.16.0及更高版本的可用性尚未得到验证。关于Apache Flink的安装包及版本说明,请参见Apache Flink。
配置JindoSDK
登录已创建的ECS实例。具体操作,请参见连接ECS实例。
下载并解压最新版本JindoSDK JAR包。下载地址,请参见GitHub。。
将JindoSDK解压缩后的plugins/flink/目录下的jindo-flink-${version}-full.jar文件移动至Flink所在根目录下的lib文件夹。
mv plugins/flink/jindo-flink-${version}-full.jar lib/
如果存在Apache Flink自带的Flink OSS Connector,需将其移除,即从Flink的
lib
目录或者plugins/oss-fs-hadoop
路径下移除flink-oss-fs-hadoop-${flink-version}.jar
。JindoSDK配置完成后,无需额外配置即支持以常规Flink流式作业的方法进行使用。写入OSS-HDFS服务以及OSS服务须使用相同的前缀
oss://
,JindoSDK会自动识别写入的内容。
示例
通用配置
为了支持EXACTLY_ONCE语义写入OSS-HDFS,您需要执行如下配置:
打开Flink的检查点(Checkpoint)。
示例如下。
通过如下方式建立的StreamExecutionEnvironment。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
执行如下命令,启动Checkpoint。
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
使用可以重发的数据源,例如Kafka。
便捷使用
您无需额外引入依赖,只需携带oss://前缀的路径,并使用OSS-HDFS服务的Bucket及Endpoint,即可启用Flink。
添加Sink。
以将DataStream<String>的对象OutputStream写入OSS-HDFS为例。
String outputPath = "oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>" StreamingFileSink<String> sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder<String>("UTF-8") ).build(); outputStream.addSink(sink);
重要在OSS-HDFS服务的Bucket中带有
.<oss-hdfs-endpoint>
的字段为可选项。如果您希望省略该字段,请确保已在Flink或Hadoop组件中正确配置了OSS-HDFS服务的Endpoint。使用
env.execute()
执行Flink作业。
(可选)自定义配置
您在提交Flink作业时,可以自定义参数,以开启或控制特定功能。
例如,通过-yD
配置以yarn-cluster模式提交Flink作业时,示例如下:
<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...
您可以开启熵注入(Entropy Injection)功能。熵注入可以匹配写入路径的一段特定字符串,用一段随机的字符串进行替换,以削弱所谓片区效应,提高写入效率。
当写入场景为OSS-HDFS时,需要完成下列配置。
oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>
写入新文件时,路径中与<user-defined-key>
相同的字符串会被替换为一个随机字符串,随机串的长度为<user-defined-length>
,且<user-defined-length>
必须大于零。