全部产品

Spark SQL

Spark与DLA Presto引擎以及数据湖构建服务共享元数据。

Spark访问数据湖元数据服务

Spark引擎可以支持多种元数据服务,既支持访问用户自建的Hive,也支持访问DLA统一管理的数据湖元数据。DLA统一管理的数据湖元数据管理服务,同时支持多种引擎访问,实现多种引擎的元数据信息共享。在数据湖执行DLA SQL、元信息发现T+1全量同步一键建仓中创建的库表结构, 可以被Spark读取并使用,Spark SQL创建或者修改的元数据也可以被其他引擎访问到。下图是Spark SQL和DLA SQL与元数据服务之间的关系。
数据湖多引擎连接元数据管理
登录DLA控制台,单击左侧导航栏的SQL执行,您可以看到所有数据湖中的数据库和表结构,对表进行在线分析查询,以及管理子用户对库表的权限,如下图所示。
SQL执行截图

提交Spark SQL作业

如果需要连接DLA的元数据服务,只需要在DLA控制台上提交作业时,加入配置项spark.sql.hive.metastore.version": "dla。以下示例是将数据湖元数据中1k_tables 数据库下的table0 表的内容提取出来:
{
    "sqls": [
        "select * from `1k_tables`.`table0` limit 100",
        "insert into `1k_tables`.`table0` values(1, 'test')"
    ],
    "name": "sql test",
    "jars": [
        "oss://test/hive-serde-3.1.2.jar"
    ],
    "conf": {
        "spark.dla.connectors": "oss",
        "spark.driver.resourceSpec": "small",
        "spark.sql.hive.metastore.version": "dla",
        "spark.sql.catalogImplementation": "hive",
        "spark.executor.instances": 10,
        "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
        "spark.executor.resourceSpec": "small"
    }
}
说明
  • sqls关键字允许用户不提交jar包,直接提交SQL作业,使用方式更简单,适合于熟悉SQL语言的开发者。参数值是一个数组,可以在一个作业中执行多条SQL语句,不同的语句用逗号隔开。

  • 您也可以通过OpenAPI提交Spark SQL作业,提交方式跟其他类型Spark作业相同。

  • DLA也提供了spark-sql工具包来提交sql作业,具体请参考Spark-SQL命令行工具

sqls关键字语法规则说明

您可以通过以下两种方式来调用SparkSession.sql执行用户填写的sql语句。

  • 在sqls字段中直接填写sql语句。在作业提交之后sqls字段中的语句会被提取出来,交给内部的sql语句模板类,模板类通过调用SparkSession.sql来执行用户填写的sql语句。

    在sqls字段中直接填写sql语句时,由于sql语句包含在json字符串中,sql语句中出现的字符需要符合json的语法规范,,如下符号需要被转义才能写入sqls字段:

    需要转义的字符

    转义后的写法

    双引号

    \"

    换行符

    \n

    回车符

    \r

    制表符

    \t

    反斜杠(后接非转义字符如d,w等)

    \\\\

    说明

    如果在sql中需要写正则表达式,在sql语句中使用REGEX '\d+' ,则在sqls字段中应该写为REGEX '\\\\d+'。这是因为json中需要转义一次反斜杠,提交给SparkSession.sql执行的时候实际上是REGEX '\\d+', SparkSession.sql执行的时候也需要对传入的正则表达式中所带的反斜杠再进行一次转义,最后Spark引擎获取的是REGEX '\d+'

    示例:

    //源sql文件。
    select * from t where 
    col  regex  "\d+"
    
    //转换后的sql文件。
    {
      "sqls":["select * from t where \n col regex \"\\\\d+\""],
      "conf":{}
    }
  • (推荐)在sqls字段中填写oss路径。该oss存放着用户的sql语句,sql文件中的每条sql使用分号(;) 隔开。sql语句模板类会下载用户路径中的sql语句,并交给SparkSession.sql来执行用户填写的sql语句。

    由于sql语句直接存放在oss上,不需要经过json传输,直接由sql模板类下载使用。因此对于双引号、换行符、回车符、制表符都不需要做特殊处理。需要注意的是对于正则表达式,还是需要符合spark.sql语句的规范。

  • 示例:

    //源sql文件。
    select * from t where 
    col  regex  "\d+"
    
    //转换后的sql文件。
    select * from t where 
    col  regex  "\\d+"
    
    {
      "sqls":["oss://path/to/your.sql"],
      "conf":{}
    }

代码中使用Spark SQL

您也可以在程序中执行SQL,操作元数据信息,以及读写表内容。下面以PySpark为例进行介绍,其他语言使用方式类似。首先,建立以下Python文件,保存为example.py,将文件上传至OSS。
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # init pyspark context
    spark = SparkSession \
        .builder \
        .appName("Python SQL Test") \
        .getOrCreate()
    
    # create a database
    spark.sql(
            "create database if not exists dlatest comment 'c' location 'oss://test/warehouse/' WITH DBPROPERTIES(k1='v1', k2='v2')")
    # create table
    spark.sql(
            "create table dlatest.tp(col1 INT)  PARTITIONED BY (p1 STRING, p2 STRING) location 'oss://test/warehouse/tp' STORED AS parquet TBLPROPERTIES ('parquet.compress'='SNAPPY')")
    # show structure
    print(spark.sql("show create table tp").collect()[0])
    
    # insert data
    spark.sql("INSERT into tp partition(p1='a',p2='a') values(1)")
    
    # show data
    spark.sql("select * from tp").show()
通过以下的JSON将作业通过DLA Spark控制台进行提交。
{
    "name": "DLA SQL Test",
    "file": "oss://path/to/example.py"
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.sql.hive.metastore.version": "dla",
        "spark.sql.catalogImplementation": "hive",
        "spark.dla.connectors": "oss",
        "spark.executor.instances": 1,
        "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
        "spark.executor.resourceSpec": "small"
    }
}
执行成功后, 可以在DLA控制台的SQL执行页面中找到名为dlatest的数据库,以及其下的tp表。
注意

DLA元数据服务对命名大小写不敏感,在引用库名和表名时忽略大小写。

读写Hive存储格式的表时的注意事项

当使用一键建仓服务或者在控制台手动执行SQL的方式创建表,并指定表的数据读写格式为Hive的某些格式,比如JSON、CSV时,需要在提交Spark作业时,引入相关的Jar包。
注意

推荐您在Maven官方仓库下载Hive Serde的Jar包,如hive-serde

当提交Spark作业时,如果表的数据存储格式是Hive的某种格式,需要将对应的Jar包作为第三方Jar提交,代码示例如下:
{
    "name": "DLA Meta Test",
    "sqls": ["SELECT * FROM HiveDB.JsonTable"],
    "jars": [
        "oss://test/hive-serde-3.1.2.jar"
    ],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.sql.hive.metastore.version": "dla",
        "spark.sql.catalogImplementation": "hive",
        "spark.dla.connectors": "oss",
        "spark.executor.instances": 1,
        "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
        "spark.executor.resourceSpec": "small"
    }
}

使用限制和注意事项

  1. 在Spark中仅支持External类型数据库和表的创建和读写操作。

当前Spark连接数据湖元数据服务,只支持外表(External Table)的读写和创建操作。在建立数据库时, 需要显式指定LOCATION信息,示例如下:
CREATE DATABASE db1 LOCATION 'oss://test/db1/'
建表语句必须显式指定表的存储LOCATION信息,示例如下:
CREATE TABLE table1(col1 INT) LOCATION 'oss://test/db1/table1/'
注意事项
说明

  • 当用户在Spark中DROP一个表或者表的某个PARTITION时,并不会删除OSS上的文件。

  • 当用户创建一个表时,指定的表的LOCATION必须是库的LOCATION的子文件夹。

  • 当用户为添加表的PARTITION 时,指定的PARTITION LOCATION必须是表的LOCATION的子文件夹。

  • 当用户RENAME PARTITION` 时,并不会改变其在OSS上的路径结构。

2. Spark中仅支持以OSS为存储的外表。

目前,数据湖分析SQL执行支持多种不同的存储,包括RDS、表格存储等等。当前在Spark中使用元数据服务支持读写以OSS为存储的外表。使用Spark直接创建数据库和数据表,LOCATION必须指定一个合法的OSS地址。对于其他存储的支持,后续会陆续更新。
3. 禁止创建DEFAULT为名字的数据库。
由于不允许使用DEFAULT为名字的数据库,需要注意以下两个事项:
  • 禁止在Spark中创建和操作名为DEFAULT的数据库。

  • 在Spark中执行SQL时,操作表之前需要使用USE DatabaseNameSQL语句来切换到目标数据。或者显式指定某个表属于哪个数据库,例如SELELCT * FROM db1.table1

4. 在Spark中执行 ALTER 语句有部分限制。
您可以通过类似ALTER DATABASE ... 等语句修改库和表的元数据信息,在Spark中使用这一类语句有如下的限制:
  • 当前对数据库,仅支持修改COMMENT信息,其它如LOCATIONPROPERTIES禁止修改。

  • 当前仅支持修改表的COLUMNPROPERTIES信息,如添加列、修改注解等等。请注意这里的COLUMN必须是非PARTITION列。

5. 在Spark SQL中不支持GRANT类赋权语句。
同开源社区Spark一致,当前用户无法通过Spark引擎执行一个GRANT语句来修改子账户赋权,需要修改子用户权限可以在数据湖分析的执行SQL页面上通过执行GRANT/REVOKE语句的方式修改用户的权限。