全部产品

使用DDI进行Airline Flight数据分析

本文针对Notebook的使用,做一个具体的场景示例——航空公司数据导入及分析。

前提条件

  1. 通过主账号登录阿里云 Databricks控制台

  2. 已创建集群,具体请参见创建集群

  3. 已使用OSS管理控制台创建非系统目录存储空间,详情请参见创建存储空间

    警告

    首次使用DDI产品创建的Bucket为系统目录Bucket,不建议存放数据,您需要再创建一个Bucket来读写数据。

    说明

    DDI支持免密访问OSS路径,结构为:oss://BucketName/Object

    • BucketName为您的存储空间名称;

    • Object为上传到OSS上的文件的访问路径。

    例:读取在存储空间名称为databricks-demo-hangzhou文件路径为demo/The_Sorrows_of_Young_Werther.txt的文件

    // 从oss地址读取文本文档
    val text = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt")

步骤一:创建集群并通过knox账号访问Notebook

创建集群参考: https://help.aliyun.com/document_detail/167621.html,需注意要设置ram子账号及保存好knox密码,登录WebUI时候需要用到。

步骤二:创建Notebook、导入数据、进行数据分析

示例Note下载:CASE2-Airline_Flight_Data.zpln

示例数据下载:airline_statistic_usa.csv

1. 读取OSS数据、打印schema,创建TempView

Load OSS data

%spark
val sparkDF = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("oss:/ 
/databricks-demo-hangzhou 
/airline_statistic_usa.csv")

Print schema

%spark
sparkDF.printSchema()
sparkDF.show()

Create Temp View

%spark
sparkDF.createOrReplaceTempView("usa_flights")
airline读取数据导入等

2. 查询分析:Analysis,Top 10 Average Distance Traveled By Flight Carrier

%sql
SELECT OP_UNIQUE_CARRIER, CAST(AVG(DISTANCE) AS INT) AS AvgDistance
FROM usa_flights
GROUP BY OP_UNIQUE_CARRIER
ORDER BY AvgDistance
DESC
LIMIT 20
分析前10数据

3. 定义UDF:Define UDF

%pyspark
from pyspark.sql.types import IntegerType
def isDelayed(x):
    if x == None:
        return 0
    elif int(x) > 10:
        return 1
    else:
        return 0
spark.udf.register("isDelayed", isDelayed, IntegerType())
Case2-03

4. 使用UDF进行查询:Analysis,Top 10 Total Delayed Flights By Carrier

%sql
SELECT OP_UNIQUE_CARRIER, SUM(isDelayed(DEP_DELAY)) AS NumOfDelayed
FROM usa_flights
GROUP BY OP_UNIQUE_CARRIER
ORDER BY NumOfDelayed
DESC
LIMIT 10
Case3-04