使用Spark Structured Streaming实时处理Kafka数据

本文介绍如何使用阿里云 Databricks 数据洞察创建的集群去访问外部数据源 E-MapReduce,并运行Spark Structured Streaming作业以消费Kafka数据。

前提条件

  • 已注册阿里云账号,详情请参见阿里云账号注册流程

  • 已开通 E-MapReduce服务。

  • 已开通对象存储 OSS服务。

  • 已开通 Databricks数据洞察服务。

步骤一:创建Kafka集群和Databricks 数据洞察集群

  1. 登录阿里云E-MapReduce控制台

  2. 创建Kafka集群。

  3. 登录Databricks数据洞察控制台

  4. 创建集群,详情参见创建集群

步骤二:Databricks 数据洞察集群添加外部数据源

  1. 登录Databricks数据洞察控制台

  2. 单击左侧集群按钮,选择已创建的集群。

  3. 进入集群详情页面,单击上方数据源按钮。

  4. 在数据源页面,单击添加按钮,选择Aliyun EMR KAFKA

  5. 填入描述,选择kafka集群

添加数据源

步骤三:获取JAR包并上传到对象存储 OSS

  1. 登录OSS管理控制台

  2. 创建Bucket存储空间,详情请参见控制台创建存储空间

  3. 针对创建Databricks版本请选择性获取对应的Jar(DBR7+兼容高版本DBR)。

  4. 上传JAR,详情请参见控制台上传文件

步骤四:在Kafka集群上创建Topic

本示例将创建一个分区数为10、副本数为2、名称为test的Topic。

  1. 登录Kafka集群的Master节点,详情请参见登录集群

  2. 通过如下命令创建Topic。

/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:2181 /kafka-1.0.0 --topic log_test --create
说明

创建Topic后,请保留该登录窗口,后续步骤仍将使用。

步骤五:运行Spark Structured Streaming作业

  1. 登录Databricks数据洞察控制台

  2. 新建项目空间,详情请参见项目管理

  3. 在所属项目空间创建作业,详情请参见管理作业

  4. 执行如下作业命令,进行流式单词统计。

--deploy-mode cluster   --class demo.action.AppKafkaSparkStreaming oss://xxx/xxx/SparkStreaming-1.0-SNAPSHOT.jar 
"oss://your bucket/checkpoint_dir" "192.168.xxx.xxx:9092" "log_test" "oss://your bucket/dataOutputPath"

关键参数说明如下:

参数

说明

oss://xxx/xxx/spark-kafka-sample-1.0-SNAPSHOT-jar-with-dependencies.jar

oss对象存储上JAR包的位置

192.168.xxx.xxx:9092

Kafka集群中任一Kafka Broker组件的内网IP地址。IP地址如下图所示。

log_test

Topic名称

oss://your bucket/dataOutputPath

oss对象存储上数据写入目录

查看Kafka集群IP

1602211879422_C927F65E-7358-45DA-AD21-B8C7A06E2A9E

步骤六:使用Kafka发布消息

  1. 在Kafka集群的命令行窗口,执行如下命令运行Kafka的生产者。

#日志结构
40.198.97.68 - - [19/Mar/2021:18:26:12 +0800] "GET /category/office HTTP/1.1" 200 58 "/item/electronics/975" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
104.75.85.35 - - [19/Mar/2021:18:26:12 +0800] "GET /item/networking/4019 HTTP/1.1" 200 118 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"
/usr/lib/kafka-current/bin/kafka-console-producer.sh --topic log_test --broker-list emr-worker-1:9092

在命令行中输入4条数据

data

步骤七:查看Spark作业执行情况

通过Yarn UI查看Spark Structured Streaming作业的信息,详情请参见访问Web UI

  1. 在Hadoop控制台,单击作业的ID。

1602209477597_2F740886-8F2F-44F2-AAFB-E6E702922808

详细信息如下。

1602209918260_9A7A860D-E41F-4A45-878D-CA1D1E6ED122

点击Logs,查看详细信息。

步骤八:查看数据写入情况

1. 进入Databricks 数据洞察所在集群的Notebook页面,Notebook使用详情参见(Notebook使用说明

2. 执行代码查询数据的写入情况

%spark
for( i <- 1 to 10 ) {
    Thread.sleep(5000)
    spark.sql("select count(*) from Delta.`oss://your bucket/dataOutputPath`").show()
}

查询数据成功写入4条数据data