使用Spark SQL进行作业开发
本文介绍如何使用Spark SQL作业开发,将ODS层表数据通过DWD层过滤后,写入ADS层。
前提条件
集群的产品系列为企业版、基础版或湖仓版。
说明企业版集群预留资源不能为0 ACU。
湖仓版集群至少有16 ACU的计算预留资源和24 ACU的存储预留资源。
已创建Job型资源组。详情请参见创建资源组。
步骤一:创建ODS层数据表
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击
。在SQLConsole窗口,选择Job型资源组和Spark引擎。
输入以下语句,并单击执行SQL(F8),创建一个名为
test_spark_db
的库。CREATE DATABASE test_spark_db;
在弹窗中,选择继续执行。
重要选择继续执行时,SQL语句为批量执行,SQL语句运行在单独的Spark App中,可以保证资源隔离和稳定性。本次入门教程推荐使用继续执行。
选择启动ThriftServer时,SQL语句为交互式执行,资源隔离为线程级的,且需要配置Spark ThriftServer后才执行SQL语句。详情请参见启停ThriftServer。
使用继续执行的方式,在
test_spark_db
库中创建ODS层的无索引、有分区的表adb_spark_ods
,并插入数据。CREATE TABLE test_spark_db.adb_spark_ods (id int, name string, age int) USING adb tblproperties ( 'distributeType' = 'HASH', 'distributeColumns' = 'id', 'partitionType' = 'VALUE', 'partitionColumn' = 'age', 'partitionCount' = '200', 'indexAll' = 'false')
INSERT OVERWRITE test_spark_db.adb_spark_ods PARTITION(age) VALUES (001,'Anna',18), (002,'Bob',22), (003,'Cindy',12), (004,'Dan',25);
步骤二:创建DWD层数据表
本教程以从ODS层表中读取并过滤数据后写入DWD层表中为例。
使用继续执行的方式,在
test_spark_db
库中创建一张无索引、有分区的表adb_spark_dwd
。CREATE TABLE test_spark_db.adb_spark_dwd ( id int, name string, age int ) USING adb TBLPROPERTIES( 'distributeType'='HASH', 'distributeColumns'='id', 'partitionType'='value', 'partitionColumn'='age', 'partitionCount'='200', 'indexAll'='false')
使用继续执行的方式,从ODS层表
adb_spark_ods
中读取id
列不为002的数据并写入DWD层数据表。INSERT OVERWRITE test_spark_db.adb_spark_dwd partition(age) SELECT id, name, age FROM test_spark_db.adb_spark_ods WHERE id != 002;
使用继续执行的方式,查询
adb_spark_dwd
表数据。SELECT * FROM test_spark_db.adb_spark_dwd;
说明使用Spark SQL执行查询语句,返回结果不展示表数据。如果您需要查看表数据,请按照步骤4操作。
可选:在Spark开发页面的应用列表中,单击目标SQL查询语句操作列的日志,在日志中可以查看表数据。
步骤三:创建ADS层数据表
ADS层表数据是对DWD层数据做了更精细地过滤,可直接用于业务分析,对查询速率有一定的要求,因此创建ADS层数据表时需添加索引。本教程从DWD层数据表adb_spark_dwd
中读取age列大于15的数据并写入ADS层数据表adb_spark_ads
。
使用继续执行的方式,在
test_spark_db
库中创建一张有索引,有分区的表adb_spark_ads
。CREATE TABLE test_spark_db.adb_spark_ads ( id int, name string, age int ) USING adb TBLPROPERTIES( 'distributeType'='HASH', 'distributeColumns'='id', 'partitionType'='value', 'partitionColumn'='age', 'partitionCount'='200', 'indexAll'='true')
使用继续执行的方式,从DWD层数据表
adb_spark_dwd
中读取age列大于15的数据并写入ADS层数据表adb_spark_ads
。INSERT OVERWRITE test_spark_db.adb_spark_ads partition(age) SELECT id, name, age FROM test_spark_db.adb_spark_dwd WHERE age > 15;
步骤四:查询ADS层表数据
AnalyticDB for MySQL企业版、基础版及湖仓版集群既支持通过Spark SQL或XIHE BSP SQL离线方式查询数据,也支持通过XIHE MPP SQL在线实时查询数据。为保证数据的实时性,本教程以XIHE MPP SQL在线方式查询ADS层表数据为例。
在SQLConsole窗口,选择XIHE引擎和(Interactive)user_default资源组。
执行以下语句,查询ADS层表数据。
SELECT * FROM test_spark_db.adb_spark_ads;
返回结果如下:
+------+-------+------+ | id | name | age | +------+-------+------+ | 4 | Ban | 25 | | 1 | Anna | 18 | +------+-------+------+