Databricks Delta Lake数据入湖最佳实践

使用Spark Structured Streaming完成客户日志数据写入Delta Lake。

本章架构图

data

步骤一:创建Kafka集群和Databricks 数据洞察集群

1. 登录阿里云E-MapReduce控制台

2. 创建Kafka集群,详情参见创建集群

3. 登录Databricks数据洞察控制台

4. 创建集群,详情参见创建集群

步骤二:Databricks 数据洞察集群添加外部数据源

  1. 登录Databricks数据洞察控制台

  2. 单击左侧集群按钮,选择已创建的集群。

  3. 进入集群详情页面,单击上方数据源按钮。

  4. 在数据源页面,单击添加按钮,选择Aliyun EMR KAFKA

  5. 填入描述,选择kafka集群

步骤三:在Kafka集群上创建Topic

本示例将创建一个分区数为1、副本数为2、名称为log-generator-topic。

  1. 登录Kafka集群的Master节点,详情请参见使用SSH连接主节点

  2. 通过如下命令创建Topic。

/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 1 --replication-factor 2 --zookeeper emr-header-1:2181 /kafka-1.0.0 --topic log-generator-topic --create

说明

创建Topic后,请保留该登录窗口,后续步骤仍将使用。

步骤四:执行Spark Structured Streaming作业

示例Note下载:Case-Steaming2DeltaLake

日志文件下载:access.log

  1. 在Notebook中引入第三方库详情参见Java库管理

%spark.conf
spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1

2. 创建数据库。

%sql
--创建数据库
create database log_data_warehouse;
--使用创建的库
use log_data_warehouse;

3. 根据生成的日志结果创建分区表。

说明

Apache日志发生器,生成的日志结构。

#日志结构
40.198.97.68 - - [19/Mar/2021:18:26:12 +0800] "GET /category/office HTTP/1.1" 200 58 "/item/electronics/975" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
104.75.85.35 - - [19/Mar/2021:18:26:12 +0800] "GET /item/networking/4019 HTTP/1.1" 200 118 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"
%sql
CREATE TABLE apache_logs(
ipaddr STRING,
identity STRING,
username STRING,
accesstime STRING,
request STRING,
status STRING,
size STRING,
referrer STRING,
agent STRING,
year string,
month string,
day string
)
using delta
PARTITIONED BY(year,month ,day )

4. Notebook执行Spark Structured Streaming 作业

%spark
import org.apache.spark.sql.functions._
//定义执行Structured Streaming的方法
def getquery(checkpoint_dir:String,tableName:String,servers:String,topic:String ) {
// 加载Kafka数据配置项startingOffsets=latest;
var streamingInputDF =  
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", servers)
    .option("subscribe", topic)     
    .option("startingOffsets", "latest")  
    .option("minPartitions", "10")  
    .option("failOnDataLoss", "true")
    .load()
// 定义日志的字段类型
val resDF=streamingInputDF
    .select(col("value").cast("string"))
    .withColumn("newMessage",split(col("value"), " "))
    .filter(col("newMessage").getItem(8).isNotNull)
    .select(
        col("newMessage").getItem(0).as("ipaddr"),
        col("newMessage").getItem(1).as("identity"),
        col("newMessage").getItem(2).as("username"),
        col("newMessage").getItem(3).as("accesstime"),
        col("newMessage").getItem(4).as("request"),
        col("newMessage").getItem(5).as("status"),
        col("newMessage").getItem(6).as("size"),
        col("newMessage").getItem(7).as("referrer"),
        col("newMessage").getItem(8).as("agent")
    )
    .withColumn("year",date_format(current_date(),"yyyy"))
    .withColumn("month",date_format(current_date(),"MM"))
    .withColumn("day",date_format(current_date(),"dd"))
// 将流数据动态写入到apache_logs表里   
val query = resDF
      .writeStream
      .outputMode("append")
      .format("delta")
      .option("checkpointLocation", checkpoint_dir)
      .table(tableName)
  query
}

5. 执行作业

%spark
val my_checkpoint_dir="your checkpiont path"
val tableName="apache_logs"
val servers= "your kafka server"
val topic= "log-generator-topic"
val flowquery =getquery(my_checkpoint_dir,tableName,servers,topic)

步骤五:使用Kafka发布消息

  1. 在Kafka集群的命令行窗口,执行如下命令运行Kafka的生产者。

/usr/lib/kafka-current/bin/kafka-console-producer.sh --topic log-generator-topic --broker-list emr-worker-1:9092

在命令行中输入数据

40.198.97.68 - - [19/Mar/2021:18:26:12 +0800] "GET /category/office HTTP/1.1" 200 58 "/item/electronics/975" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
104.75.85.35 - - [19/Mar/2021:18:26:12 +0800] "GET /item/networking/4019 HTTP/1.1" 200 118 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"

2.向Kafka集群手动写入三条测试数据

打他

步骤六:查看结果

1. 进入Databricks数据洞察Notebook,动态查看数据写入情况

%spark
for( i <- 1 to 3 ) {
    Thread.sleep(5000)
    spark.sql("select count(1) from apache_logs").show()
}

查看数据写入情况

查询写入表的count数成功写入三条。data

步骤七: 使用Yarn Applications UI页面查看作业详情或kill job

通过Yarn UI查看Spark Structured Streaming作业的信息,详情请参见集群 Web UI

1. 进入running,选择需要kill的application。

data