Notebook - 航空公司数据分析示例
本文针对Notebook的使用,做一个具体的场景示例——航空公司数据导入及分析。
前提条件
通过主账号登录阿里云 Databricks控制台。
已创建集群,具体请参见创建集群。
已使用OSS管理控制台创建非系统目录存储空间,详情请参见创建存储空间。
警告首次使用DDI产品创建的Bucket为系统目录Bucket,不建议存放数据,您需要再创建一个Bucket来读写数据。
说明DDI支持免密访问OSS路径,结构为:oss://BucketName/Object
BucketName为您的存储空间名称;
Object为上传到OSS上的文件的访问路径。
例:读取在存储空间名称为databricks-demo-hangzhou文件路径为demo/The_Sorrows_of_Young_Werther.txt的文件
// 从oss地址读取文本文档 val text = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt")
步骤一:创建集群并通过knox账号访问Notebook
创建集群参考:https://help.aliyun.com/document_detail/167621.html,需注意要设置RAM子账号及保存好knox密码,登录WebUI时候需要用到。
步骤二:创建Notebook、导入数据、进行数据分析
示例数据下载:airline_statistic_usa.csv
1. 读取OSS数据、打印schema,创建TempView
Load OSS data
%spark
val sparkDF = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("oss:/
/databricks-demo-hangzhou
/airline_statistic_usa.csv")
Print schema
%spark
sparkDF.printSchema()
sparkDF.show()
Create Temp View
%spark
sparkDF.createOrReplaceTempView("usa_flights")

2. 查询分析:Analysis,Top 10 Average Distance Traveled By Flight Carrier
%sql
SELECT OP_UNIQUE_CARRIER, CAST(AVG(DISTANCE) AS INT) AS AvgDistance
FROM usa_flights
GROUP BY OP_UNIQUE_CARRIER
ORDER BY AvgDistance
DESC
LIMIT 20

3. 定义UDF:Define UDF
%pyspark
from pyspark.sql.types import IntegerType
def isDelayed(x):
if x == None:
return 0
elif int(x) > 10:
return 1
else:
return 0
spark.udf.register("isDelayed", isDelayed, IntegerType())

4. 使用UDF进行查询:Analysis,Top 10 Total Delayed Flights By Carrier
%sql
SELECT OP_UNIQUE_CARRIER, SUM(isDelayed(DEP_DELAY)) AS NumOfDelayed
FROM usa_flights
GROUP BY OP_UNIQUE_CARRIER
ORDER BY NumOfDelayed
DESC
LIMIT 10
