Spark SQL交互式分析
如果您需要以交互式方式执行Spark SQL,可以指定Spark Interactive型资源组作为执行查询的资源组。资源组的资源量会在指定范围内自动扩缩容,在满足您交互式分析需求的同时还可以降低使用成本。本文为您详细介绍如何通过控制台、Hive JDBC、PyHive、Beeline、DBeaver等客户端工具实现Spark SQL交互式分析。
前提条件
- 集群的产品系列为企业版、基础版或湖仓版。 
- 集群与OSS存储空间位于相同地域。 
- 已创建数据库账号。 - 如果是通过阿里云账号访问,只需创建高权限账号。 
- 如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。 
 
- 已安装Java 8开发环境和Python 3.9开发环境,以便后续运行Java应用、Python应用、Beeline等客户端。 
- 已将客户端IP地址添加至AnalyticDB for MySQL集群白名单中。 
注意事项
- 如果Spark Interactive型资源组处于停止状态,在执行第一个Spark SQL时集群会重新启动Spark Interactive型资源组,第一个Spark SQL可能会处于较长时间的排队等待状态。 
- Spark无法读写INFORMATION_SCHEMA和MYSQL数据库,因此请不要将这些数据库作为初始连接的数据库。 
- 请确保提交Spark SQL作业的数据库账号已具有访问目标数据库的权限,否则会导致查询失败。 
准备工作
- 获取Spark Interactive型资源组的连接地址。 - 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。 
- 在左侧导航栏,单击,单击资源组管理页签。 
- 单击对应资源组操作列的详情,查看内网连接地址和公网连接地址。您可单击连接地址后  按钮复制连接地址,或端口号括号内的 按钮复制连接地址,或端口号括号内的 按钮,复制JDBC连接串。 按钮,复制JDBC连接串。- 以下两种情况,您需要单击公网地址后的申请网络,手动申请公网连接地址。 - 提交Spark SQL作业的客户端工具部署在本地或外部服务器。 
- 提交Spark SQL作业的客户端工具部署在ECS上,且ECS与AnalyticDB for MySQL不属于同一VPC。 
  
 
交互式分析
控制台
若您是自建HiveMetastore,使用控制台开发Spark SQL作业时,请在AnalyticDB for MySQL中创建一个名为default的数据库,并选择它作为执行Spark SQL的数据库。
- 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。 
- 在左侧导航栏单击。 
- 选择Spark引擎和创建的Spark Interactive型资源组,执行如下Spark SQL: - SHOW DATABASES;
SDK调用
通过调用SDK的方法执行Spark SQL时,查询结果会以文件的形式写入指定的OSS中,后续您可以在OSS控制台上查询数据,或将结果文件下载到本地查看。下文以Python语言调用SDK为例。
- 执行以下语句,安装SDK。 - pip install alibabacloud-adb20211201
- 依次执行以下语句,安装环境依赖。 - pip install oss2 pip install loguru
- 连接并执行Spark SQL。 - # coding: utf-8 import csv import json import time from io import StringIO import oss2 from alibabacloud_adb20211201.client import Client from alibabacloud_adb20211201.models import ExecuteSparkWarehouseBatchSQLRequest, ExecuteSparkWarehouseBatchSQLResponse, \ GetSparkWarehouseBatchSQLRequest, GetSparkWarehouseBatchSQLResponse, \ ListSparkWarehouseBatchSQLRequest, CancelSparkWarehouseBatchSQLRequest, ListSparkWarehouseBatchSQLResponse from alibabacloud_tea_openapi.models import Config from loguru import logger def build_sql_config(oss_location, spark_sql_runtime_config: dict = None, file_format = "CSV", output_partitions = 1, sep = "|"): """ 构建ADB SQL执行的配置 :param oss_location: OSS存放SQL执行结果的路径 :param spark_sql_runtime_config: Spark SQL社区的原生配置 :param file_format: SQL执行结果的文件格式, 默认为CSV :param output_partitions: SQL执行结果的分区数, 如果需要输出大量结果, 必须增加输出时的分区数避免单个文件过大 :param sep: CSV文件的分隔符, 非CSV文件忽略 :return: SQL执行的配置 """ if oss_location is None: raise ValueError("oss_location is required") if not oss_location.startswith("oss://"): raise ValueError("oss_location must start with oss://") if file_format != "CSV" and file_format != "PARQUET" and file_format != "ORC" and file_format != "JSON": raise ValueError("file_format must be CSV, PARQUET, ORC or JSON") runtime_config = { # sql output config "spark.adb.sqlOutputFormat": file_format, "spark.adb.sqlOutputPartitions": output_partitions, "spark.adb.sqlOutputLocation": oss_location, # csv config "sep": sep } if spark_sql_runtime_config: runtime_config.update(spark_sql_runtime_config) return runtime_config def execute_sql(client: Client, dbcluster_id: str, resource_group_name: str, query: str, limit = 10000, runtime_config: dict = None, schema="default" ): """ 在Spark Interactive型资源组中执行SQL :param client: 阿里云客户端 :param dbcluster_id: 集群的ID :param resource_group_name: 集群的资源组,要求必须是Spark Interactive资源组 :param schema: SQL执行的默认的数据库名称, 不填写为default :param limit: SQL执行的结果的行数限制 :param query: 执行的SQL语句, 使用分号分隔多个SQL语句 :return: """ # 组装请求体 req = ExecuteSparkWarehouseBatchSQLRequest() # 集群ID req.dbcluster_id = dbcluster_id # 资源组名称 req.resource_group_name = resource_group_name # SQL执行超时时间 req.execute_time_limit_in_seconds = 3600 # 执行SQL的数据库名称 req.schema = schema # SQL业务代码 req.query = query # 返回结果行数 req.execute_result_limit = limit if runtime_config: # SQL执行的配置 req.runtime_config = json.dumps(runtime_config) # 执行SQL并获取query_id resp: ExecuteSparkWarehouseBatchSQLResponse = client.execute_spark_warehouse_batch_sql(req) logger.info("Query execute submitted: {}", resp.body.data.query_id) return resp.body.data.query_id def get_query_state(client, query_id): """ 查询SQL执行的状态 :param client: 阿里云客户端 :param query_id: SQL执行的ID :return: SQL执行的状态和结果 """ req = GetSparkWarehouseBatchSQLRequest(query_id=query_id) resp: GetSparkWarehouseBatchSQLResponse = client.get_spark_warehouse_batch_sql(req) logger.info("Query state: {}", resp.body.data.query_state) return resp.body.data.query_state, resp def list_history_query(client, db_cluster, resource_group_name, page_num): """ 查询Spark Interactive资源组中执行的SQL的历史 :param client: 阿里云客户端 :param db_cluster: 集群的ID :param resource_group_name: 资源组名称 :param page_num: 分页查询的页数 :return: 是否有SQL, 有的话代表可以进入下一页查询 """ req = ListSparkWarehouseBatchSQLRequest(dbcluster_id=db_cluster, resource_group_name=resource_group_name, page_number = page_num) resp: ListSparkWarehouseBatchSQLResponse = client.list_spark_warehouse_batch_sql(req) # 如果没有查询到SQL, 返回false. 否则返回true. 默认每页10条 if resp.body.data.queries is None: return True # 打印查询到的SQL for query in resp.body.data.queries: logger.info("Query ID: {}, State: {}", query.query_id, query.query_state) logger.info("Total queries: {}", len(resp.body.data.queries)) return len(resp.body.data.queries) < 10 def list_csv_files(oss_client, dir): for obj in oss_client.list_objects_v2(dir).object_list: if obj.key.endswith(".csv"): logger.info(f"reading {obj.key}") # read oss file content csv_content = oss_client.get_object(obj.key).read().decode('utf-8') csv_reader = csv.DictReader(StringIO(csv_content)) # Print the CSV content for row in csv_reader: print(row) if __name__ == '__main__': logger.info("ADB Spark Batch SQL Demo") # AccessKey ID,请填写实际值 _ak = "LTAI****************" # AccessKey Serect,请填写实际值 _sk = "yourAccessKeySecret" # 地域ID,请填写实际值 _region= "cn-shanghai" # 集群ID,请填写实际值 _db = "amv-uf6485635f****" # 资源组名称,请填写实际值 _rg_name = "testjob" # client config client_config = Config( # Alibaba Cloud AccessKey ID access_key_id=_ak, # Alibaba Cloud AccessKey Secret access_key_secret=_sk, # The endpoint of the ADB service # adb.ap-southeast-1.aliyuncs.com is the endpoint of the ADB service in the Singapore region # adb-vpc.ap-southeast-1.aliyuncs.com used in the VPC scenario endpoint=f"adb.{_region}.aliyuncs.com" ) # 创建阿里云客户端 _client = Client(client_config) # SQL执行的配置 _spark_sql_runtime_config = { "spark.sql.shuffle.partitions": 1000, "spark.sql.autoBroadcastJoinThreshold": 104857600, "spark.sql.sources.partitionOverwriteMode": "dynamic", "spark.sql.sources.partitionOverwriteMode.dynamic": "dynamic" } _config = build_sql_config(oss_location="oss://testBucketName/sql_result", spark_sql_runtime_config = _spark_sql_runtime_config) # 需要执行的SQL _query = """ SHOW DATABASES; SELECT 100; """ _query_id = execute_sql(client = _client, dbcluster_id=_db, resource_group_name=_rg_name, query=_query, runtime_config=_config) logger.info(f"Run query_id: {_query_id} for SQL {_query}.\n Waiting for result...") # 等待SQL执行完成 current_ts = time.time() while True: query_state, resp = get_query_state(_client, _query_id) """ query_state 有如下几种状态: - PENDING: 在服务队列排队中, 此时Spark Interactive型资源组正在启动 - SUBMITTED: 提交到了Spark Interactive型资源组 - RUNNING: SQL正在执行 - FINISHED: SQL执行完成, 没有报错 - FAILED: SQL执行失败 - CANCELED: SQL执行被取消 """ if query_state == "FINISHED": logger.info("query finished success") break elif query_state == "FAILED": # 打印失败信息 logger.error("Error Info: {}", resp.body.data) exit(1) elif query_state == "CANCELED": # 打印取消信息 logger.error("query canceled") exit(1) else: time.sleep(2) if time.time() - current_ts > 600: logger.error("query timeout") # 执行时间超过10分钟, 取消SQL执行 _client.cancel_spark_warehouse_batch_sql(CancelSparkWarehouseBatchSQLRequest(query_id=_query_id)) exit(1) # 一个Query可以包含多个语句, 列举所有的语句 for stmt in resp.body.data.statements: logger.info( f"statement_id: {stmt.statement_id}, result location: {stmt.result_uri}") # 查看结果的示例代码 _bucket = stmt.result_uri.split("oss://")[1].split("/")[0] _dir = stmt.result_uri.replace(f"oss://{_bucket}/", "").replace("//", "/") oss_client = oss2.Bucket(oss2.Auth(client_config.access_key_id, client_config.access_key_secret), f"oss-{_region}.aliyuncs.com", _bucket) list_csv_files(oss_client, _dir) # 查询在Spark Interactive资源组中执行的所有的SQL,可以分页查询 logger.info("List all history query") page_num = 1 no_more_page = list_history_query(_client, _db, _rg_name, page_num) while no_more_page: logger.info(f"List page {page_num}") page_num += 1 no_more_page = list_history_query(_client, _db, _rg_name, page_num)- 参数说明: - _ak:阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey ID。如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。 
- _sk:阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey Secret。如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。 
- region:AnalyticDB for MySQL集群所属地域ID。 
- _db:AnalyticDB for MySQL集群ID。 
- _rg_name:Spark Interactive型资源组的名称。 
- oss_location(可选):查询结果文件存储的OSS路径。 - 若不填写该参数,您仅可以在页面,对应SQL查询语句的日志中,查看到5行数据。 
 
应用程序
Hive JDBC
- 在pom.xml中配置Maven依赖。 - <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.3.9</version> </dependency>
- 建立连接并执行Spark SQL。 - public class java { public static void main(String[] args) throws Exception { Class.forName("org.apache.hive.jdbc.HiveDriver"); String url = "<连接地址>"; Connection con = DriverManager.getConnection(url, "<用户名>", "<密码>"); Statement stmt = con.createStatement(); ResultSet tables = stmt.executeQuery("show tables"); List<String> tbls = new ArrayList<>(); while (tables.next()) { System.out.println(tables.getString("tableName")); tbls.add(tables.getString("tableName")); } } }- 参数说明: - 连接地址:准备工作获取的Spark Interactive型资源组JDBC连接串。其中 - default需替换成实际连接的数据库名称。
- 用户名:AnalyticDB for MySQL的数据库账号。 
- 密码:AnalyticDB for MySQL数据库账号的密码。 
 
PyHive
- 安装Python Hive客户端。 - pip install pyhive
- 建立连接并执行Spark SQL。 - from pyhive import hive from TCLIService.ttypes import TOperationState cursor = hive.connect( host='<连接地址>', port=<端口号>, username='<资源组名称>/<用户名>', password='<密码>', auth='CUSTOM' ).cursor() cursor.execute('show tables') status = cursor.poll().operationState while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE): logs = cursor.fetch_logs() for message in logs: print(message) # If needed, an asynchronous query can be cancelled at any time with: # cursor.cancel() status = cursor.poll().operationState print(cursor.fetchall())- 参数说明: - 连接地址:准备工作获取的Spark Interactive型资源组连接地址。 
- 端口号:Spark Interactive型资源组的端口号,固定为10000。 
- 资源组名称:Spark Interactive型资源组的名称。 
- 用户名:AnalyticDB for MySQL的数据库账号。 
- 密码:AnalyticDB for MySQL数据库账号的密码。 
 
客户端
除了在本文中详细介绍的Beeline、DBeaver、DBVisualizer、Datagrip客户端外,您还可以在Airflow、Azkaban、DolphinScheduler等工作流调度工具中执行交互式分析。
Beeline
- 连接Spark Interactive型资源组。 - 命令格式如下: - !connect <连接地址> <用户名> <密码>- 连接地址:准备工作获取的Spark Interactive型资源组JDBC连接串。其中 - default需替换成实际连接的数据库名称。
- 用户名:AnalyticDB for MySQL的数据库账号。 
- 密码:AnalyticDB for MySQL数据库账号的密码。 
 - 示例: - !connect jdbc:hive2://amv-bp1c3em7b2e****-spark.ads.aliyuncs.com:10000/adb_test spark_resourcegroup/AdbSpark14**** Spark23****- 返回结果: - Connected to: Spark SQL (version 3.2.0) Driver: Hive JDBC (version 2.3.9) Transaction isolation: TRANSACTION_REPEATABLE_READ
- 执行Spark SQL。 - SHOW TABLES;
DBeaver
- 打开DBeaver客户端,单击。 
- 在连接到数据库页面,选择Apache Spark,单击下一步。 
- 配置Hadoop/Apache Spark 连接设置,参数说明如下: - 参数 - 说明 - 连接方式 - 连接方式选择为URL。 - JDBC URL - 请填写准备工作中获取的JDBC连接串。 重要- 连接串中的 - default需替换为实际的数据库名。- 用户名 - AnalyticDB for MySQL的数据库账号。 - 密码 - AnalyticDB for MySQL数据库账号的密码。 
- 上述参数配置完成后,单击测试连接。 重要- 首次测试连接时,DBeaver会自动获取需要下载的驱动信息,获取完成后,请单击下载,下载相关驱动。 
- 测试连接成功后,单击完成。 
- 在数据库导航页签下,展开对应数据源的子目录,单击对应数据库。 
- 在右侧代码框中输入SQL语句,并单击  按钮运行。 按钮运行。- SHOW TABLES;- 返回结果如下: - +-----------+-----------+-------------+ | namespace | tableName | isTemporary | +-----------+-----------+-------------+ | db | test | [] | +-----------+-----------+-------------+
DBVisualizer
- 在Driver Manage页面,选择Hive,单击  按钮。 按钮。
- 在Driver Settings页签下,配置如下参数: - 参数 - 说明 - Name - Hive数据源名称,您可以自定义名称。 - URL Format - 请填写准备工作中获取的JDBC连接串。 重要- 连接串中的 - default需替换为实际的数据库名。- Driver Class - Hive驱动,固定选择为org.apache.hive.jdbc.HiveDriver。 重要- 参数配置完成后,请单击Start Download,下载对应驱动。 
- 驱动下载完成后,单击。 
- 在Create Database Connection from Database URL对话框中填写以下参数: - 参数 - 说明 - Database URL - 请填写准备工作中获取的JDBC连接串。 重要- 连接串中的 - default需替换为实际的数据库名。- Driver Class - 选择步骤3创建的Hive数据源。 
- 在Connection页面配置以下连接参数,并单击Connect。 - 参数 - 说明 - Name - 默认与步骤3创建的Hive数据源同名,您可以自定义名称。 - Notes - 备注信息。 - Driver Type - 选择Hive。 - Database URL - 请填写准备工作中获取的JDBC连接串。 重要- 连接串中的 - default需替换为实际的数据库名。- Database Userid - AnalyticDB for MySQL的数据库账号。 - Database Password - AnalyticDB for MySQL数据库账号的密码。 说明- 其他参数无需配置,使用默认值即可。 
- 连接成功后,在Database页签下,展开对应数据源的子目录,单击对应数据库。 
- 在右侧代码框中输入SQL语句,并单击  按钮运行。 按钮运行。- SHOW TABLES;- 返回结果如下: - +-----------+-----------+-------------+ | namespace | tableName | isTemporary | +-----------+-----------+-------------+ | db | test | false | +-----------+-----------+-------------+
Datagrip
- 打开Datagrip客户端,单击,创建项目。 
- 添加数据源。 - 单击  按钮,选择。 按钮,选择。
- 在弹出的Data Sources and Drivers对话框中配置如下参数后,单击OK。  - 参数 - 说明 - Name - 数据源名称,您可以自定义。本文示例为 - adbtest。- Host - 请填写准备工作中获取的JDBC连接串。 重要- 连接串中的 - default需替换为实际的数据库名。- Port - Spark Interactive型资源组的端口号,固定为10000。 - User - AnalyticDB for MySQL的数据库账号。 - Password - AnalyticDB for MySQL数据库账号的密码。 - Schema - AnalyticDB for MySQL集群的数据库名称。 
 
- 执行Spark SQL。 - 在数据源列表中,右击步骤2创建的数据源,选择。 
- 在右侧Console面板中执行Spark SQL。 - SHOW TABLES;