使用Flink将Kafka数据流式写入阿里云OSS

将Kafka数据实时导入到OSS等湖存储中来降低存储成本或者进行查询分析是常见的使用场景。在EMR-3.37.1及之后的版本中,DataFlow集群内置了JindoFS相关的依赖,使得您可以在DataFlow集群中运行Flink作业,将Kafka数据以Exactly-Once语义流式写入阿里云OSS。本文通过示例为您介绍如何在DataFlow集群中编写并运行Flink作业来满足上述场景。

背景信息

关于JindoFS的部分高级配置(例如,熵注入),请参见支持Flink可恢复性写入JindoFS或OSS

前提条件

  • 已开通E-MapReduce服务和OSS服务。
  • 已完成云账号的授权,详情请参见角色授权

操作流程

  1. 步骤一:准备环境

  2. 步骤二:准备JAR包

  3. 步骤三:创建Kafka Topic并生成数据

  4. 步骤四:运行Flink作业

  5. 步骤五:查看输出的结果

步骤一:准备环境

  1. 创建包含Flink和Kafka组件的DataFlow集群,详情请参见创建集群

    说明

    本文以EMR-3.43.1版本为例。

  2. 在OSS上创建与DataFlow集群相同地域的Bucket,详情请参见控制台创建存储空间

步骤二:准备JAR包

  1. 下载Demo代码

    基于JindoFS,您可以在Flink作业中,如同HDFS一样将数据以流式的方式写入OSS中(路径需要以oss://为前缀)。本示例中使用了Flink的StreamingFileSink方法来演示开启了检查点(Checkpoint)之后,Flink如何以Exactly-Once语义写入OSS。

    下述代码片段演示了如何构建Kafka Source与OSS Sink,完整代码您可以从GitHub链接中下载获得。

    重要

    JindoFS支持免密读写相同阿里云账号下的OSS存储,因此作业中无需声明相关AccessKey信息。

    public class OssDemoJob {
    
        public static void main(String[] args) throws Exception {
            ...
    
            // Check output oss dir
            Preconditions.checkArgument(
                    params.get(OUTPUT_OSS_DIR).startsWith("oss://"),
                    "outputOssDir should start with 'oss://'.");
    
            // Set up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint is required
            env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
    
            String outputPath = params.get(OUTPUT_OSS_DIR);
    
            // Build Kafka source with new Source API based on FLIP-27
            KafkaSource<Event> kafkaSource =
                    KafkaSource.<Event>builder()
                            .setBootstrapServers(params.get(KAFKA_BROKERS_ARG))
                            .setTopics(params.get(INPUT_TOPIC_ARG))
                            .setStartingOffsets(OffsetsInitializer.latest())
                            .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG))
                            .setDeserializer(new EventDeSerializationSchema())
                            .build();
            // DataStream Source
            DataStreamSource<Event> source =
                    env.fromSource(
                            kafkaSource,
                            WatermarkStrategy.<Event>forMonotonousTimestamps()
                                    .withTimestampAssigner((event, ts) -> event.getEventTime()),
                            "Kafka Source");
    
            StreamingFileSink<Event> sink =
                    StreamingFileSink.forRowFormat(
                                    new Path(outputPath), new SimpleStringEncoder<Event>("UTF-8"))
                            .withRollingPolicy(OnCheckpointRollingPolicy.build())
                            .build();
            source.addSink(sink);
    
            // Compile and submit the job
            env.execute();
        }
    }
    说明

    本示例代码片段给出了主要的示例程序,您可以根据自身环境进行修改(例如,添加包名以及修改代码中的Checkpoint间隔)后,进行编译。关于如何构建Flink作业的JAR包,可以参见Flink官方文档。如果无需任何修改,您可以直接使用 dataflow-oss-demo-1.0-SNAPSHOT.jar 包进行操作。

  2. 在命令行中,进入到下载的项目文件的根目录下,执行以下命令打包文件。

    mvn clean package

    根据您pom.xml文件中artifactId的信息,项目对应目录dataflow-demo/dataflow-oss-demo/target下会出现dataflow-oss-demo-1.0-SNAPSHOT.jar包。

步骤三:创建Kafka Topic并生成数据

  1. 通过SSH方式连接DataFlow集群,详情请参见登录集群

  2. 执行以下命令,创建测试所需的Topic。

    kafka-topics.sh --create  --bootstrap-server core-1-1:9092 \
        --replication-factor 2  \
        --partitions 3  \
        --topic kafka-test-topic

    创建成功后,命令行会打印如下信息。

    Created topic kafka-test-topic.
  3. 写入数据至Kafka Topic。

    1. 在命令行中执行以下命令,进入Kafka Producer Console。

      kafka-console-producer.sh --broker-list core-1-1:9092 --topic  kafka-test-topic
    2. 输入五条测试数据。

      1,Ken,0,1,1662022777000
      1,Ken,0,2,1662022777000
      1,Ken,0,3,1662022777000
      1,Ken,0,4,1662022777000
      1,Ken,0,5,1662022777000
    3. 按下Ctrl+C退出Kafka Producer Console。

步骤四:运行Flink作业

  1. 通过SSH方式连接DataFlow集群,详情请参见登录集群

  2. 上传打包好的dataflow-oss-demo-1.0-SNAPSHOT.jar至DataFlow集群的根目录下。

    说明

    本文示例中dataflow-oss-demo-1.0-SNAPSHOT.jar是上传至root根目录下,您也可以自定义上传路径。

  3. 执行以下命令,提交作业。

    本示例通过Per-Job Cluster模式提交作业,其他方式请参见基础使用

    flink run -t yarn-per-job -d -c com.alibaba.ververica.dataflow.demo.oss.OssDemoJob \
        /dataflow-oss-demo-1.0-SNAPSHOT.jar  \
        --outputOssDir oss://xung****-flink-dlf-test/oss_kafka_test \
        --kafkaBrokers core-1-1:9092 \
        --inputTopic kafka-test-topic \
        --inputTopicGroup my-group

    参数说明:

    • outputOssDir:指定您计划写入的OSS目录。

    • kafkaBrokers:指定Kafka集群的broker,使用core-1-1:9092即可。

    • inputTopic:指定计划读取的Kafka Topic,使用在步骤三中创建的kafka-test-topic

    • inputTopicGroup:指定计划使用的Kafka Consumer Group,使用my-group用于测试即可。

    result

    您可以执行以下命令,查看作业状态。

    flink list -t yarn-per-job -Dyarn.application.id=<appId>
    说明

    <appId>为作业运行后返回的Application ID。例如,本示例截图中的application_1670236019397_0003。

步骤五:查看输出的结果

  • 作业正常运行后,您可以在OSS控制台查看输出结果。

    1. 登录OSS管理控制台

    2. 单击创建的存储空间。

    3. 在文件管理页面指定的输出目录下查看输出结果,输出结果如下图所示。OSS results

      重要

      由于该作业为流式作业会持续运行,产生较多输出文件,应在完成验证后,及时在命令行中通过yarn application -kill <appId>命令终止该作业。

  • 您也可以在DataFlow集群中,通过命令行运行hdfs dfs -cat oss://<YOUR_TARGET_BUCKET>/oss_kafka_test/<DATE_DIR>/part-0-0来展示实际存储到OSS中的数据,如下图所示。OSS示例

    重要
    • 为了保证Exactly-Once语义,在Flink作业每完成一次Checkpoint(本示例中Checkpoint间隔为30s),数据文件才会落盘到OSS中。

    • 此外,由于该作业为流式作业会持续运行,会产生较多输出文件,应在完成验证后,及时在命令行中通过yarn application -kill <appId>命令终止该作业。