Spark SQL交互式查询
如果您需要以交互式方式执行Spark SQL,可以指定Spark Interactive型资源组作为执行查询的资源组。资源组的资源量会在指定范围内自动扩缩容,在满足您交互式查询需求的同时还可以降低使用成本。本文为您详细介绍如何通过控制台、Hive JDBC、PyHive、Beeline、DBeaver等客户端工具实现Spark SQL交互式查询。
前提条件
集群的产品系列为企业版、基础版或湖仓版。
集群与OSS存储空间位于相同地域。
已创建数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。
已安装Java 8开发环境和Python 3.9开发环境,以便后续运行Java应用、Python应用、Beeline等客户端。
已将客户端IP地址添加至AnalyticDB for MySQL集群白名单中。
注意事项
如果Spark Interactive型资源组处于停止状态,在执行第一个Spark SQL时集群会重新启动Spark Interactive型资源组,第一个Spark SQL可能会处于较长时间的排队等待状态。
Spark无法读写ADB_External_TPCH_10GB、INFORMATION_SCHEMA和MYSQL数据库,因此请不要将这些数据库作为初始连接的数据库。
请确保提交Spark SQL作业的数据库账号已具有访问目标数据库的权限,否则会导致查询失败。
准备工作
获取Spark Interactive型资源组的连接地址。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击,单击资源组管理页签。
单击对应资源组操作列的详情,查看内网连接地址和公网连接地址。您可单击端口号括号内的
按钮,复制连接地址。
以下两种情况,您需要单击公网地址后的申请网络,手动申请公网连接地址。
提交Spark SQL作业的客户端工具部署在本地或外部服务器。
提交Spark SQL作业的客户端工具部署在ECS上,且ECS与AnalyticDB for MySQL不属于同一VPC。
交互式查询
控制台
若您是自建HiveMetastore
,使用控制台开发Spark SQL作业时,请在AnalyticDB for MySQL中创建一个名为default
的数据库,并选择它作为执行Spark SQL的数据库。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏单击
。选择Spark引擎和创建的Spark Interactive型资源组,执行如下Spark SQL:
SHOW DATABASES;
SDK调用
通过调用SDK的方法执行Spark SQL时,查询结果会以文件的形式写入指定的OSS中,后续您可以在OSS控制台上查询数据,或将结果文件下载到本地查看。下文以Python语言调用SDK为例。
执行以下语句,安装SDK。
pip install alibabacloud-adb20211201
依次执行以下语句,安装环境依赖。
pip install oss2 pip install loguru
连接并执行Spark SQL。
# coding: utf-8 import csv import json import time from io import StringIO import oss2 from alibabacloud_adb20211201.client import Client from alibabacloud_adb20211201.models import ExecuteSparkWarehouseBatchSQLRequest, ExecuteSparkWarehouseBatchSQLResponse, \ GetSparkWarehouseBatchSQLRequest, GetSparkWarehouseBatchSQLResponse, \ ListSparkWarehouseBatchSQLRequest, CancelSparkWarehouseBatchSQLRequest, ListSparkWarehouseBatchSQLResponse from alibabacloud_tea_openapi.models import Config from loguru import logger def build_sql_config(oss_location, spark_sql_runtime_config: dict = None, file_format = "CSV", output_partitions = 1, sep = "|"): """ 构建ADB SQL执行的配置 :param oss_location: OSS存放SQL执行结果的路径 :param spark_sql_runtime_config: Spark SQL社区的原生配置 :param file_format: SQL执行结果的文件格式, 默认为CSV :param output_partitions: SQL执行结果的分区数, 如果需要输出大量结果, 必须增加输出时的分区数避免单个文件过大 :param sep: CSV文件的分隔符, 非CSV文件忽略 :return: SQL执行的配置 """ if oss_location is None: raise ValueError("oss_location is required") if not oss_location.startswith("oss://"): raise ValueError("oss_location must start with oss://") if file_format != "CSV" and file_format != "PARQUET" and file_format != "ORC" and file_format != "JSON": raise ValueError("file_format must be CSV, PARQUET, ORC or JSON") runtime_config = { # sql output config "spark.adb.sqlOutputFormat": file_format, "spark.adb.sqlOutputPartitions": output_partitions, "spark.adb.sqlOutputLocation": oss_location, # csv config "sep": sep } if spark_sql_runtime_config: runtime_config.update(spark_sql_runtime_config) return runtime_config def execute_sql(client: Client, dbcluster_id: str, resource_group_name: str, query: str, limit = 10000, runtime_config: dict = None, schema="default" ): """ 在Spark Interactive型资源组中执行SQL :param client: 阿里云客户端 :param dbcluster_id: 集群的ID :param resource_group_name: 集群的资源组,要求必须是Spark Interactive资源组 :param schema: SQL执行的默认的数据库名称, 不填写为default :param limit: SQL执行的结果的行数限制 :param query: 执行的SQL语句, 使用分号分隔多个SQL语句 :return: """ # 组装请求体 req = ExecuteSparkWarehouseBatchSQLRequest() # 集群ID req.dbcluster_id = dbcluster_id # 资源组名称 req.resource_group_name = resource_group_name # SQL执行超时时间 req.execute_time_limit_in_seconds = 3600 # 执行SQL的数据库名称 req.schema = schema # SQL业务代码 req.query = query # 返回结果行数 req.execute_result_limit = limit if runtime_config: # SQL执行的配置 req.runtime_config = json.dumps(runtime_config) # 执行SQL并获取query_id resp: ExecuteSparkWarehouseBatchSQLResponse = client.execute_spark_warehouse_batch_sql(req) logger.info("Query execute submitted: {}", resp.body.data.query_id) return resp.body.data.query_id def get_query_state(client, query_id): """ 查询SQL执行的状态 :param client: 阿里云客户端 :param query_id: SQL执行的ID :return: SQL执行的状态和结果 """ req = GetSparkWarehouseBatchSQLRequest(query_id=query_id) resp: GetSparkWarehouseBatchSQLResponse = client.get_spark_warehouse_batch_sql(req) logger.info("Query state: {}", resp.body.data.query_state) return resp.body.data.query_state, resp def list_history_query(client, db_cluster, resource_group_name, page_num): """ 查询Spark Interactive资源组中执行的SQL的历史 :param client: 阿里云客户端 :param db_cluster: 集群的ID :param resource_group_name: 资源组名称 :param page_num: 分页查询的页数 :return: 是否有SQL, 有的话代表可以进入下一页查询 """ req = ListSparkWarehouseBatchSQLRequest(dbcluster_id=db_cluster, resource_group_name=resource_group_name, page_number = page_num) resp: ListSparkWarehouseBatchSQLResponse = client.list_spark_warehouse_batch_sql(req) # 如果没有查询到SQL, 返回false. 否则返回true. 默认每页10条 if resp.body.data.queries is None: return True # 打印查询到的SQL for query in resp.body.data.queries: logger.info("Query ID: {}, State: {}", query.query_id, query.query_state) logger.info("Total queries: {}", len(resp.body.data.queries)) return len(resp.body.data.queries) < 10 def list_csv_files(oss_client, dir): for obj in oss_client.list_objects_v2(dir).object_list: if obj.key.endswith(".csv"): logger.info(f"reading {obj.key}") # read oss file content csv_content = oss_client.get_object(obj.key).read().decode('utf-8') csv_reader = csv.DictReader(StringIO(csv_content)) # Print the CSV content for row in csv_reader: print(row) if __name__ == '__main__': logger.info("ADB Spark Batch SQL Demo") # AccessKey ID,请填写实际值 _ak = "LTAI****************" # AccessKey Serect,请填写实际值 _sk = "yourAccessKeySecret" # 地域ID,请填写实际值 _region= "cn-shanghai" # 集群ID,请填写实际值 _db = "amv-uf6485635f****" # 资源组名称,请填写实际值 _rg_name = "testjob" # client config client_config = Config( # Alibaba Cloud AccessKey ID access_key_id=_ak, # Alibaba Cloud AccessKey Secret access_key_secret=_sk, # The endpoint of the ADB service # adb.ap-southeast-1.aliyuncs.com is the endpoint of the ADB service in the Singapore region # adb-vpc.ap-southeast-1.aliyuncs.com used in the VPC scenario endpoint=f"adb.{_region}.aliyuncs.com" ) # 创建阿里云客户端 _client = Client(client_config) # SQL执行的配置 _spark_sql_runtime_config = { "spark.sql.shuffle.partitions": 1000, "spark.sql.autoBroadcastJoinThreshold": 104857600, "spark.sql.sources.partitionOverwriteMode": "dynamic", "spark.sql.sources.partitionOverwriteMode.dynamic": "dynamic" } _config = build_sql_config(oss_location="oss://testBucketName/sql_result", spark_sql_runtime_config = _spark_sql_runtime_config) # 需要执行的SQL _query = """ SHOW DATABASES; SELECT 100; """ _query_id = execute_sql(client = _client, dbcluster_id=_db, resource_group_name=_rg_name, query=_query, runtime_config=_config) logger.info(f"Run query_id: {_query_id} for SQL {_query}.\n Waiting for result...") # 等待SQL执行完成 current_ts = time.time() while True: query_state, resp = get_query_state(_client, _query_id) """ query_state 有如下几种状态: - PENDING: 在服务队列排队中, 此时Spark Interactive型资源组正在启动 - SUBMITTED: 提交到了Spark Interactive型资源组 - RUNNING: SQL正在执行 - FINISHED: SQL执行完成, 没有报错 - FAILED: SQL执行失败 - CANCELED: SQL执行被取消 """ if query_state == "FINISHED": logger.info("query finished success") break elif query_state == "FAILED": # 打印失败信息 logger.error("Error Info: {}", resp.body.data) exit(1) elif query_state == "CANCELED": # 打印取消信息 logger.error("query canceled") exit(1) else: time.sleep(2) if time.time() - current_ts > 600: logger.error("query timeout") # 执行时间超过10分钟, 取消SQL执行 _client.cancel_spark_warehouse_batch_sql(CancelSparkWarehouseBatchSQLRequest(query_id=_query_id)) exit(1) # 一个Query可以包含多个语句, 列举所有的语句 for stmt in resp.body.data.statements: logger.info( f"statement_id: {stmt.statement_id}, result location: {stmt.result_uri}") # 查看结果的示例代码 _bucket = stmt.result_uri.split("oss://")[1].split("/")[0] _dir = stmt.result_uri.replace(f"oss://{_bucket}/", "").replace("//", "/") oss_client = oss2.Bucket(oss2.Auth(client_config.access_key_id, client_config.access_key_secret), f"oss-{_region}.aliyuncs.com", _bucket) list_csv_files(oss_client, _dir) # 查询在Spark Interactive资源组中执行的所有的SQL,可以分页查询 logger.info("List all history query") page_num = 1 no_more_page = list_history_query(_client, _db, _rg_name, page_num) while no_more_page: logger.info(f"List page {page_num}") page_num += 1 no_more_page = list_history_query(_client, _db, _rg_name, page_num)
参数说明:
_ak:阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey ID。如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。
_sk:阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey Secret。如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。
region:AnalyticDB for MySQL集群所属地域ID。
_db:AnalyticDB for MySQL集群ID。
_rg_name:Spark Interactive型资源组的名称。
oss_location(可选):查询结果文件存储的OSS路径。
若不填写该参数,您仅可以在
页面,对应SQL查询语句的日志中,查看到5行数据。
应用程序
在pom.xml中配置Maven依赖。
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.3.9</version> </dependency>
建立连接并执行Spark SQL。
public class java { public static void main(String[] args) throws Exception { Class.forName("org.apache.hive.jdbc.HiveDriver"); String url = "<连接地址>"; Connection con = DriverManager.getConnection(url, "<用户名>", "<密码>"); Statement stmt = con.createStatement(); ResultSet tables = stmt.executeQuery("show tables"); List<String> tbls = new ArrayList<>(); while (tables.next()) { System.out.println(tables.getString("tableName")); tbls.add(tables.getString("tableName")); } } }
参数说明:
连接地址:准备工作获取的Spark Interactive型资源组连接地址。其中
default
需替换成实际连接的数据库名称。用户名:AnalyticDB for MySQL的数据库账号。
密码:AnalyticDB for MySQL数据库账号的密码。
安装Python Hive客户端。
pip install pyhive
建立连接并执行Spark SQL。
from pyhive import hive from TCLIService.ttypes import TOperationState cursor = hive.connect( host='<连接地址>', port=<端口号>, username='<资源组名称>/<用户名>', password='<密码>' ).cursor() cursor.execute('show tables') status = cursor.poll().operationState while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE): logs = cursor.fetch_logs() for message in logs: print(message) # If needed, an asynchronous query can be cancelled at any time with: # cursor.cancel() status = cursor.poll().operationState print(cursor.fetchall())
参数说明:
连接地址:准备工作获取的Spark Interactive型资源组连接地址。其中
default
需替换成实际连接的数据库名称。端口号:Spark Interactive型资源组的端口号,固定为10000。
资源组名称:Spark Interactive型资源组的名称。
用户名:AnalyticDB for MySQL的数据库账号。
密码:AnalyticDB for MySQL数据库账号的密码。
客户端
除了在本文中详细介绍的Beeline、DBeaver、DBVisualizer、Datagrip客户端外,您还可以在Airflow、Azkaban、DolphinScheduler等工作流调度工具中执行交互式查询。
连接Spark Interactive型资源组。
命令格式如下:
!connect <连接地址> <用户名> <密码>
连接地址:准备工作获取的Spark Interactive型资源组连接地址。其中
default
需替换成实际连接的数据库名称。用户名:AnalyticDB for MySQL的数据库账号。
密码:AnalyticDB for MySQL数据库账号的密码。
示例:
!connect jdbc:hive2://amv-bp1c3em7b2e****-spark.ads.aliyuncs.com:10000/adb_test spark_resourcegroup/AdbSpark14**** Spark23****
返回结果:
Connected to: Spark SQL (version 3.2.0) Driver: Hive JDBC (version 2.3.9) Transaction isolation: TRANSACTION_REPEATABLE_READ
执行Spark SQL。
SHOW TABLES;
打开DBeaver客户端,单击 。
在连接到数据库页面,选择Apache Spark,单击下一步。
配置Hadoop/Apache Spark 连接设置,参数说明如下:
参数
说明
连接方式
连接方式选择为URL。
JDBC URL
请填写准备工作中获取的连接地址。
连接地址中的
default
需替换为实际的数据库名。用户名
AnalyticDB for MySQL的数据库账号。
密码
AnalyticDB for MySQL数据库账号的密码。
上述参数配置完成后,单击测试连接。
首次测试连接时,DBeaver会自动获取需要下载的驱动信息,获取完成后,请单击下载,下载相关驱动。
测试连接成功后,单击完成。
在数据库导航页签下,展开对应数据源的子目录,单击对应数据库。
在右侧代码框中输入SQL语句,并单击
按钮运行。
SHOW TABLES;
返回结果如下:
+-----------+-----------+-------------+ | namespace | tableName | isTemporary | +-----------+-----------+-------------+ | db | test | [] | +-----------+-----------+-------------+
打开DBVisualizer客户端,单击。
在Driver Manage页面,选择Hive,单击
按钮。
在Driver Settings页签下,配置如下参数:
参数
说明
Name
Hive数据源名称,您可以自定义名称。
URL Format
请填写准备工作中获取的连接地址。
连接地址中的
default
需替换为实际的数据库名。Driver Class
Hive驱动,固定选择为org.apache.hive.jdbc.HiveDriver。
参数配置完成后,请单击Start Download,下载对应驱动。
驱动下载完成后,单击
。在Create Database Connection from Database URL对话框中填写以下参数:
参数
说明
Database URL
请填写准备工作中获取的连接地址。
连接地址中的
default
需替换为实际的数据库名。Driver Class
选择步骤3创建的Hive数据源。
在Connection页面配置以下连接参数,并单击Connect。
参数
说明
Name
默认与步骤3创建的Hive数据源同名,您可以自定义名称。
Notes
备注信息。
Driver Type
选择Hive。
Database URL
请填写准备工作中获取的连接地址。
连接地址中的
default
需替换为实际的数据库名。Database Userid
AnalyticDB for MySQL的数据库账号。
Database Password
AnalyticDB for MySQL数据库账号的密码。
其他参数无需配置,使用默认值即可。
连接成功后,在Database页签下,展开对应数据源的子目录,单击对应数据库。
在右侧代码框中输入SQL语句,并单击
按钮运行。
SHOW TABLES;
返回结果如下:
+-----------+-----------+-------------+ | namespace | tableName | isTemporary | +-----------+-----------+-------------+ | db | test | false | +-----------+-----------+-------------+
打开Datagrip客户端,单击 ,创建项目。
添加数据源。
单击
按钮,选择 。
在弹出的Data Sources and Drivers对话框中配置如下参数后,单击OK。
参数
说明
Name
数据源名称,您可以自定义。本文示例为
adbtest
。Host
请填写准备工作中获取的连接地址。
连接地址中的
default
需替换为实际的数据库名。Port
Spark Interactive型资源组的端口号,固定为10000。
User
AnalyticDB for MySQL的数据库账号。
Password
AnalyticDB for MySQL数据库账号的密码。
Schema
AnalyticDB for MySQL集群的数据库名称。
执行Spark SQL。
在数据源列表中,右击步骤2创建的数据源,选择
。在右侧Console面板中执行Spark SQL。
SHOW TABLES;
- 本页导读 (1)
- 前提条件
- 注意事项
- 准备工作
- 交互式查询
- 控制台
- SDK调用
- 应用程序
- 客户端