通过Spark Streaming作业处理Kafka数据

本文介绍在阿里云E-MapReduce创建的包含kafka服务的DataFlow集群中,如何使用Spark Streaming作业从Kafka中实时消费数据。

前提条件

  • 已注册阿里云账号。

  • 已开通E-MapReduce服务。

  • 已完成云账号的授权,详情请参见角色授权

步骤一:创建DataLake和DataFlow集群

创建同一个安全组下的DataLake和DataFlow集群(包含Kafka服务)。创建详情请参见创建集群

  • 创建DataLake集群。

    说明

    本文以Spark 3为例。

    截屏2024-01-10 16

  • 创建DataFlow。

    说明

    务必确认已选择Kafka服务。系统会自动选择其依赖的Kafka-Manager和Zookeeper服务。

    截屏2024-01-10 16

步骤二:获取JAR包并上传到DataLake集群

  1. 获取JAR包(spark-streaming-demo-1.0.jar)。

  2. 上传JAR包至DataLake集群Master节点的/home/emr-user路径下。

步骤三:在DataFlow集群上创建Topic

本示例将创建一个名称为test的Topic。

  1. 登录DataFlow集群的Master节点,详情请参见登录集群

  2. 执行以下命令,创建Topic。

    kafka-topics.sh --create --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 --replication-factor 2 --partitions 2 --topic demo
    说明

    创建Topic后,请保留该登录窗口,后续步骤仍将使用。

步骤四:运行Spark Streaming作业

本示例将运行一个流式单词统计(WordCount)的作业。

  1. 登录DataLake集群的Master节点,详情请参见登录集群

  2. 执行以下命令,进入emr-user目录。

    cd /home/emr-user
  3. 执行以下命令,进行流式单词统计(WordCount)。

    spark-submit --class com.aliyun.emr.KafkaApp1 ./spark-streaming-demo-1.0.jar <Kafka broker的内网IP地址>:9092 demogroup1 demo

    关键参数如下表所示。

    参数

    描述

    <Kafka broker的内网IP地址>:9092

    DataFlow集群中Broker节点的内网IP地址和端口号,端口号默认为9092。例如,172.16.**.**:9092,172.16.**.**:9092,172.16.**.**:9092

    您可以在DataFlow集群Kafka服务的状态页签,单击KafkaBroker组件前的image图标,查看所有节点的内网IP地址。

    demogroup1

    指定Kafka消费组的名称。您也可以根据实际情况修改。

    demo

    Topic名称。

步骤五:使用Kafka发布消息

  1. 在DataFlow集群的命令行窗口,执行如下命令运行Kafka的生产者。

    kafka-console-producer.sh --topic demo --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092
  2. 在DataFlow集群的登录窗口中输入文本,在DataLake集群的登录窗口中,会实时显示文本的统计信息。

    例如,在DataFlow集群的登录窗口输入信息。

    image

    DataLake集群的登录窗口会输出如下信息。

    image

步骤六:查看Spark Streaming作业状态

  1. 在EMR on ECS页面,单击DataLake集群的名称。

  2. 单击访问链接与端口页签。

  3. 访问SPARK UI,详情请参见访问链接与端口

  4. History Server页面,单击待查看的App ID

    您可以查看Spark Streaming作业的状态。

    截屏2024-01-10 16