Spark与DLA Presto引擎以及数据湖构建服务共享元数据。
Spark访问数据湖元数据服务

SQL执行,
您可以看到所有数据湖中的数据库和表结构,对表进行在线分析查询,以及管理子用户对库表的权限,如下图所示。
提交Spark SQL作业
spark.sql.hive.metastore.version": "dla
。以下示例是将数据湖元数据中1k_tables
数据库下的table0
表的内容提取出来:{
"sqls": [
"select * from `1k_tables`.`table0` limit 100",
"insert into `1k_tables`.`table0` values(1, 'test')"
],
"name": "sql test",
"jars": [
"oss://test/hive-serde-3.1.2.jar"
],
"conf": {
"spark.dla.connectors": "oss",
"spark.driver.resourceSpec": "small",
"spark.sql.hive.metastore.version": "dla",
"spark.sql.catalogImplementation": "hive",
"spark.executor.instances": 10,
"spark.dla.job.log.oss.uri": "oss://test/spark-logs",
"spark.executor.resourceSpec": "small"
}
}
sqls关键字允许用户不提交jar包,直接提交SQL作业,使用方式更简单,适合于熟悉SQL语言的开发者。参数值是一个数组,可以在一个作业中执行多条SQL语句,不同的语句用逗号隔开。
您也可以通过OpenAPI提交Spark SQL作业,提交方式跟其他类型Spark作业相同。
DLA也提供了spark-sql工具包来提交sql作业,具体请参考Spark-SQL命令行工具。
sqls关键字语法规则说明
您可以通过以下两种方式来调用SparkSession.sql执行用户填写的sql语句。
在sqls字段中直接填写sql语句。在作业提交之后sqls字段中的语句会被提取出来,交给内部的sql语句模板类,模板类通过调用SparkSession.sql来执行用户填写的sql语句。
在sqls字段中直接填写sql语句时,由于sql语句包含在json字符串中,sql语句中出现的字符需要符合json的语法规范,,如下符号需要被转义才能写入sqls字段:
需要转义的字符
转义后的写法
双引号
\"
换行符
\n
回车符
\r
制表符
\t
反斜杠(后接非转义字符如d,w等)
\\\\
说明如果在sql中需要写正则表达式,在sql语句中使用
REGEX '\d+'
,则在sqls字段中应该写为REGEX '\\\\d+'
。这是因为json中需要转义一次反斜杠,提交给SparkSession.sql执行的时候实际上是REGEX '\\d+'
, SparkSession.sql执行的时候也需要对传入的正则表达式中所带的反斜杠再进行一次转义,最后Spark引擎获取的是REGEX '\d+'
。示例:
//源sql文件。 select * from t where col regex "\d+" //转换后的sql文件。 { "sqls":["select * from t where \n col regex \"\\\\d+\""], "conf":{} }
(推荐)在sqls字段中填写oss路径。该oss存放着用户的sql语句,sql文件中的每条sql使用分号(;) 隔开。sql语句模板类会下载用户路径中的sql语句,并交给SparkSession.sql来执行用户填写的sql语句。
由于sql语句直接存放在oss上,不需要经过json传输,直接由sql模板类下载使用。因此对于双引号、换行符、回车符、制表符都不需要做特殊处理。需要注意的是对于正则表达式,还是需要符合spark.sql语句的规范。
示例:
//源sql文件。 select * from t where col regex "\d+" //转换后的sql文件。 select * from t where col regex "\\d+" { "sqls":["oss://path/to/your.sql"], "conf":{} }
代码中使用Spark SQL
from pyspark.sql import SparkSession
if __name__ == "__main__":
# init pyspark context
spark = SparkSession \
.builder \
.appName("Python SQL Test") \
.getOrCreate()
# create a database
spark.sql(
"create database if not exists dlatest comment 'c' location 'oss://test/warehouse/' WITH DBPROPERTIES(k1='v1', k2='v2')")
# create table
spark.sql(
"create table dlatest.tp(col1 INT) PARTITIONED BY (p1 STRING, p2 STRING) location 'oss://test/warehouse/tp' STORED AS parquet TBLPROPERTIES ('parquet.compress'='SNAPPY')")
# show structure
print(spark.sql("show create table tp").collect()[0])
# insert data
spark.sql("INSERT into tp partition(p1='a',p2='a') values(1)")
# show data
spark.sql("select * from tp").show()
{
"name": "DLA SQL Test",
"file": "oss://path/to/example.py"
"conf": {
"spark.driver.resourceSpec": "small",
"spark.sql.hive.metastore.version": "dla",
"spark.sql.catalogImplementation": "hive",
"spark.dla.connectors": "oss",
"spark.executor.instances": 1,
"spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
"spark.executor.resourceSpec": "small"
}
}
SQL执行
页面中找到名为dlatest
的数据库,以及其下的tp
表。DLA元数据服务对命名大小写不敏感,在引用库名和表名时忽略大小写。
读写Hive存储格式的表时的注意事项
推荐您在Maven官方仓库下载Hive Serde的Jar包,如hive-serde。
{
"name": "DLA Meta Test",
"sqls": ["SELECT * FROM HiveDB.JsonTable"],
"jars": [
"oss://test/hive-serde-3.1.2.jar"
],
"conf": {
"spark.driver.resourceSpec": "small",
"spark.sql.hive.metastore.version": "dla",
"spark.sql.catalogImplementation": "hive",
"spark.dla.connectors": "oss",
"spark.executor.instances": 1,
"spark.dla.job.log.oss.uri": "oss://test/spark-logs",
"spark.executor.resourceSpec": "small"
}
}
使用限制和注意事项
在Spark中仅支持External类型数据库和表的创建和读写操作。
LOCATION
信息,示例如下:CREATE DATABASE db1 LOCATION 'oss://test/db1/'
LOCATION
信息,示例如下:CREATE TABLE table1(col1 INT) LOCATION 'oss://test/db1/table1/'
当用户在Spark中DROP一个表或者表的某个
PARTITION
时,并不会删除OSS上的文件。当用户创建一个表时,指定的表的
LOCATION
必须是库的LOCATION
的子文件夹。当用户为添加表的
PARTITION
时,指定的PARTITION LOCATION
必须是表的LOCATION
的子文件夹。当用户
RENAME PARTITION
` 时,并不会改变其在OSS上的路径结构。
2. Spark中仅支持以OSS为存储的外表。
SQL执行
支持多种不同的存储,包括RDS、表格存储等等。当前在Spark中使用元数据服务支持读写以OSS为存储的外表。使用Spark直接创建数据库和数据表,LOCATION
必须指定一个合法的OSS地址。对于其他存储的支持,后续会陆续更新。DEFAULT
为名字的数据库。DEFAULT
为名字的数据库,需要注意以下两个事项:禁止在Spark中创建和操作名为
DEFAULT
的数据库。在Spark中执行SQL时,操作表之前需要使用
USE DatabaseName
SQL语句来切换到目标数据。或者显式指定某个表属于哪个数据库,例如SELELCT * FROM db1.table1
。
ALTER
语句有部分限制。ALTER DATABASE ...
等语句修改库和表的元数据信息,在Spark中使用这一类语句有如下的限制:当前对数据库,仅支持修改
COMMENT
信息,其它如LOCATION
、PROPERTIES
禁止修改。当前仅支持修改表的
COLUMN
和PROPERTIES
信息,如添加列、修改注解等等。请注意这里的COLUMN
必须是非PARTITION
列。
GRANT
类赋权语句。
在文档使用中是否遇到以下问题
更多建议
匿名提交