本文介绍如何使用阿里云 Databricks 数据洞察创建的集群去访问外部数据源 E-MapReduce,并运行Spark Structured Streaming作业以消费Kafka数据。
前提条件
已注册阿里云账号,详情请参见阿里云账号注册流程。
已开通 E-MapReduce服务。
已开通对象存储 OSS服务。
已开通 Databricks数据洞察服务。
步骤一:创建Kafka集群和Databricks 数据洞察集群
创建Kafka集群。
创建集群,详情参见创建集群。
步骤二:Databricks 数据洞察集群添加外部数据源
单击左侧集群按钮,选择已创建的集群。
进入集群详情页面,单击上方数据源按钮。
在数据源页面,单击添加按钮,选择Aliyun EMR KAFKA
填入描述,选择kafka集群。
步骤三:获取JAR包并上传到对象存储 OSS
步骤四:在Kafka集群上创建Topic
本示例将创建一个分区数为10、副本数为2、名称为test的Topic。
登录Kafka集群的Master节点,详情请参见登录集群。
通过如下命令创建Topic。
/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:2181 /kafka-1.0.0 --topic log_test --create
创建Topic后,请保留该登录窗口,后续步骤仍将使用。
步骤五:运行Spark Structured Streaming作业
--deploy-mode cluster --class demo.action.AppKafkaSparkStreaming oss://xxx/xxx/SparkStreaming-1.0-SNAPSHOT.jar
"oss://your bucket/checkpoint_dir" "192.168.xxx.xxx:9092" "log_test" "oss://your bucket/dataOutputPath"
关键参数说明如下:
参数 | 说明 |
oss://xxx/xxx/spark-kafka-sample-1.0-SNAPSHOT-jar-with-dependencies.jar | oss对象存储上JAR包的位置 |
192.168.xxx.xxx:9092 | Kafka集群中任一Kafka Broker组件的内网IP地址。IP地址如下图所示。 |
log_test | Topic名称 |
oss://your bucket/dataOutputPath | oss对象存储上数据写入目录 |
查看Kafka集群IP
步骤六:使用Kafka发布消息
在Kafka集群的命令行窗口,执行如下命令运行Kafka的生产者。
#日志结构
40.198.97.68 - - [19/Mar/2021:18:26:12 +0800] "GET /category/office HTTP/1.1" 200 58 "/item/electronics/975" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
104.75.85.35 - - [19/Mar/2021:18:26:12 +0800] "GET /item/networking/4019 HTTP/1.1" 200 118 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"
/usr/lib/kafka-current/bin/kafka-console-producer.sh --topic log_test --broker-list emr-worker-1:9092
在命令行中输入4条数据
步骤七:查看Spark作业执行情况
通过Yarn UI查看Spark Structured Streaming作业的信息,详情请参见访问Web UI。
在Hadoop控制台,单击作业的ID。
详细信息如下。
点击Logs,查看详细信息。
步骤八:查看数据写入情况
1. 进入Databricks 数据洞察所在集群的Notebook页面,Notebook使用详情参见(Notebook使用说明)
2. 执行代码查询数据的写入情况
%spark
for( i <- 1 to 10 ) {
Thread.sleep(5000)
spark.sql("select count(*) from Delta.`oss://your bucket/dataOutputPath`").show()
}
查询数据成功写入4条数据