本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
本文针对Notebook的使用,做一个具体的场景示例——航空公司数据导入及分析。
前提条件
通过主账号登录阿里云 Databricks控制台。
已创建集群,具体请参见创建集群。
已使用OSS管理控制台创建非系统目录存储空间,详情请参见创建存储空间。
警告首次使用DDI产品创建的Bucket为系统目录Bucket,不建议存放数据,您需要再创建一个Bucket来读写数据。
说明DDI支持免密访问OSS路径,结构为:oss://BucketName/Object
BucketName为您的存储空间名称;
Object为上传到OSS上的文件的访问路径。
例:读取在存储空间名称为databricks-demo-hangzhou文件路径为demo/The_Sorrows_of_Young_Werther.txt的文件
// 从oss地址读取文本文档 val text = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt")
步骤一:创建集群并通过knox账号访问Notebook
创建集群参考:创建集群,需注意要设置RAM子账号及保存好knox密码,登录WebUI时候需要用到。
步骤二:创建Notebook、导入数据、进行数据分析
1. 读取OSS数据、打印schema,创建TempView
Load OSS data
%spark
val sparkDF = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("oss:/
/databricks-demo-hangzhou
/airline_statistic_usa.csv")
Print schema
%spark
sparkDF.printSchema()
sparkDF.show()
Create Temp View
%spark
sparkDF.createOrReplaceTempView("usa_flights")
2. 查询分析:Analysis,Top 10 Average Distance Traveled By Flight Carrier
%sql
SELECT OP_UNIQUE_CARRIER, CAST(AVG(DISTANCE) AS INT) AS AvgDistance
FROM usa_flights
GROUP BY OP_UNIQUE_CARRIER
ORDER BY AvgDistance
DESC
LIMIT 20
3. 定义UDF:Define UDF
%pyspark
from pyspark.sql.types import IntegerType
def isDelayed(x):
if x == None:
return 0
elif int(x) > 10:
return 1
else:
return 0
spark.udf.register("isDelayed", isDelayed, IntegerType())
4. 使用UDF进行查询:Analysis,Top 10 Total Delayed Flights By Carrier
%sql
SELECT OP_UNIQUE_CARRIER, SUM(isDelayed(DEP_DELAY)) AS NumOfDelayed
FROM usa_flights
GROUP BY OP_UNIQUE_CARRIER
ORDER BY NumOfDelayed
DESC
LIMIT 10
文档内容是否对您有帮助?