使用Spark SQL进行作业开发

更新时间:

本文介绍如何使用Spark SQL作业开发,将ODS层表数据通过DWD层过滤后,写入ADS层。

前提条件

  • 集群的产品系列为企业版、基础版或湖仓版

    说明
    • 企业版集群预留资源不能为0 ACU。

    • 湖仓版集群至少有16 ACU的计算预留资源和24 ACU的存储预留资源。

  • 已创建Job型资源组。详情请参见创建资源组

步骤一:创建ODS层数据表

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

  2. 在左侧导航栏,单击作业开发 > SQL开发

  3. SQLConsole窗口,选择Job型资源组和Spark引擎。

  4. 输入以下语句,并单击执行SQL(F8),创建一个名为test_spark_db的库。

    CREATE DATABASE test_spark_db;
  5. 在弹窗中,选择继续执行

    重要
    • 选择继续执行时,SQL语句为批量执行,SQL语句运行在单独的Spark App中,可以保证资源隔离和稳定性。本次入门教程推荐使用继续执行

    • 选择启动ThriftServer时,SQL语句为交互式执行,资源隔离为线程级的,且需要配置Spark ThriftServer后才执行SQL语句。详情请参见启停ThriftServer

  6. 使用继续执行的方式,在test_spark_db库中创建ODS层的无索引、有分区的表adb_spark_ods,并插入数据。

    CREATE TABLE test_spark_db.adb_spark_ods
      (id int,
       name string,
       age int)
    USING adb
    tblproperties (
    'distributeType' = 'HASH',
    'distributeColumns' = 'id',
    'partitionType' = 'VALUE',
    'partitionColumn' = 'age', 
    'partitionCount' = '200', 
    'indexAll' = 'false')
    INSERT OVERWRITE test_spark_db.adb_spark_ods PARTITION(age) VALUES
    (001,'Anna',18),
    (002,'Bob',22),
    (003,'Cindy',12),
    (004,'Dan',25);

步骤二:创建DWD层数据表

本教程以从ODS层表中读取并过滤数据后写入DWD层表中为例。

  1. 使用继续执行的方式,在test_spark_db库中创建一张无索引、有分区的表adb_spark_dwd

    CREATE TABLE test_spark_db.adb_spark_dwd (
      id int,
      name string,
      age int
    )
    USING adb
    TBLPROPERTIES(
      'distributeType'='HASH', 
      'distributeColumns'='id', 
      'partitionType'='value',
      'partitionColumn'='age',
      'partitionCount'='200',
      'indexAll'='false')
  2. 使用继续执行的方式,从ODS层表adb_spark_ods中读取id列不为002的数据并写入DWD层数据表。

    INSERT OVERWRITE test_spark_db.adb_spark_dwd partition(age) 
    SELECT 
      id,
      name,
      age
    FROM test_spark_db.adb_spark_ods WHERE id != 002;
  3. 使用继续执行的方式,查询adb_spark_dwd表数据。

    SELECT * FROM test_spark_db.adb_spark_dwd;
    说明

    使用Spark SQL执行查询语句,返回结果不展示表数据。如果您需要查看表数据,请按照步骤4操作。

  4. 可选:Spark开发页面的应用列表中,单击目标SQL查询语句操作列的日志,在日志中可以查看表数据。

步骤三:创建ADS层数据表

ADS层表数据是对DWD层数据做了更精细地过滤,可直接用于业务分析,对查询速率有一定的要求,因此创建ADS层数据表时需添加索引。本教程从DWD层数据表adb_spark_dwd中读取age列大于15的数据并写入ADS层数据表adb_spark_ads

  1. 使用继续执行的方式,在test_spark_db库中创建一张有索引,有分区的表adb_spark_ads

    CREATE TABLE test_spark_db.adb_spark_ads (
      id int,
      name string,
      age int
    )
    USING adb
    TBLPROPERTIES(
      'distributeType'='HASH', 
      'distributeColumns'='id', 
      'partitionType'='value',
      'partitionColumn'='age',
      'partitionCount'='200',
      'indexAll'='true')
  2. 使用继续执行的方式,从DWD层数据表adb_spark_dwd中读取age列大于15的数据并写入ADS层数据表adb_spark_ads

    INSERT OVERWRITE test_spark_db.adb_spark_ads partition(age) 
    SELECT 
      id,
      name,
      age 
    FROM test_spark_db.adb_spark_dwd WHERE age > 15;

步骤四:查询ADS层表数据

AnalyticDB for MySQL企业版、基础版及湖仓版集群既支持通过Spark SQLXIHE BSP SQL离线方式查询数据,也支持通过XIHE MPP SQL在线实时查询数据。为保证数据的实时性,本教程以XIHE MPP SQL在线方式查询ADS层表数据为例。

  1. SQLConsole窗口,选择XIHE引擎和(Interactive)user_default资源组。

  2. 执行以下语句,查询ADS层表数据。

    SELECT * FROM test_spark_db.adb_spark_ads;

    返回结果如下:

    +------+-------+------+
    | id   | name  | age  |
    +------+-------+------+
    |    4 | Ban   |   25 |
    |    1 | Anna  |   18 |
    +------+-------+------+