使用Spark Structured Streaming完成客户日志数据写入Delta Lake。
本章架构图
步骤一:创建Kafka集群和Databricks 数据洞察集群
1. 登录阿里云E-MapReduce控制台。
2. 创建Kafka集群,详情参见创建集群
3. 登录Databricks数据洞察控制台。
4. 创建集群,详情参见创建集群。
步骤二:Databricks 数据洞察集群添加外部数据源
单击左侧集群按钮,选择已创建的集群。
进入集群详情页面,单击上方数据源按钮。
在数据源页面,单击添加按钮,选择Aliyun EMR KAFKA
填入描述,选择kafka集群。
步骤三:在Kafka集群上创建Topic
本示例将创建一个分区数为1、副本数为2、名称为log-generator-topic。
登录Kafka集群的Master节点,详情请参见登录集群。
通过如下命令创建Topic。
/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 1 --replication-factor 2 --zookeeper emr-header-1:2181 /kafka-1.0.0 --topic log-generator-topic --create
说明
创建Topic后,请保留该登录窗口,后续步骤仍将使用。
步骤四:执行Spark Structured Streaming作业
在Notebook中引入第三方库详情参见Java库管理
%spark.conf
spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
2. 创建数据库。
%sql
--创建数据库
create database log_data_warehouse;
--使用创建的库
use log_data_warehouse;
3. 根据生成的日志结果创建分区表。
说明
Apache日志发生器,生成的日志结构。
#日志结构
40.198.97.68 - - [19/Mar/2021:18:26:12 +0800] "GET /category/office HTTP/1.1" 200 58 "/item/electronics/975" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
104.75.85.35 - - [19/Mar/2021:18:26:12 +0800] "GET /item/networking/4019 HTTP/1.1" 200 118 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"
%sql
CREATE TABLE apache_logs(
ipaddr STRING,
identity STRING,
username STRING,
accesstime STRING,
request STRING,
status STRING,
size STRING,
referrer STRING,
agent STRING,
year string,
month string,
day string
)
using delta
PARTITIONED BY(year,month ,day )
4. Notebook执行Spark Structured Streaming 作业
%spark
import org.apache.spark.sql.functions._
//定义执行Structured Streaming的方法
def getquery(checkpoint_dir:String,tableName:String,servers:String,topic:String ) {
// 加载Kafka数据配置项startingOffsets=latest;
var streamingInputDF =
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.option("minPartitions", "10")
.option("failOnDataLoss", "true")
.load()
// 定义日志的字段类型
val resDF=streamingInputDF
.select(col("value").cast("string"))
.withColumn("newMessage",split(col("value"), " "))
.filter(col("newMessage").getItem(8).isNotNull)
.select(
col("newMessage").getItem(0).as("ipaddr"),
col("newMessage").getItem(1).as("identity"),
col("newMessage").getItem(2).as("username"),
col("newMessage").getItem(3).as("accesstime"),
col("newMessage").getItem(4).as("request"),
col("newMessage").getItem(5).as("status"),
col("newMessage").getItem(6).as("size"),
col("newMessage").getItem(7).as("referrer"),
col("newMessage").getItem(8).as("agent")
)
.withColumn("year",date_format(current_date(),"yyyy"))
.withColumn("month",date_format(current_date(),"MM"))
.withColumn("day",date_format(current_date(),"dd"))
// 将流数据动态写入到apache_logs表里
val query = resDF
.writeStream
.outputMode("append")
.format("delta")
.option("checkpointLocation", checkpoint_dir)
.table(tableName)
query
}
5. 执行作业
%spark
val my_checkpoint_dir="your checkpiont path"
val tableName="apache_logs"
val servers= "your kafka server"
val topic= "log-generator-topic"
val flowquery =getquery(my_checkpoint_dir,tableName,servers,topic)
步骤五:使用Kafka发布消息
在Kafka集群的命令行窗口,执行如下命令运行Kafka的生产者。
/usr/lib/kafka-current/bin/kafka-console-producer.sh --topic log-generator-topic --broker-list emr-worker-1:9092
在命令行中输入数据
40.198.97.68 - - [19/Mar/2021:18:26:12 +0800] "GET /category/office HTTP/1.1" 200 58 "/item/electronics/975" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
104.75.85.35 - - [19/Mar/2021:18:26:12 +0800] "GET /item/networking/4019 HTTP/1.1" 200 118 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"
2.向Kafka集群手动写入三条测试数据
步骤六:查看结果
1. 进入Databricks数据洞察Notebook,动态查看数据写入情况
%spark
for( i <- 1 to 3 ) {
Thread.sleep(5000)
spark.sql("select count(1) from apache_logs").show()
}
查看数据写入情况
查询写入表的count数成功写入三条。
步骤七: 使用Yarn Applications UI页面查看作业详情或kill job
通过Yarn UI查看Spark Structured Streaming作业的信息,详情请参见访问Web UI。
1. 进入running,选择需要kill的application。
文档内容是否对您有帮助?