Spark Thrift Server是Apache Spark提供的一种服务,支持通过JDBC或ODBC连接并执行SQL查询,从而便捷地将Spark环境与现有的商业智能(BI)工具、数据可视化工具及其他数据分析工具集成。本文主要为您介绍如何创建并连接Spark Thrift Server。
前提条件
已创建工作空间,详情请参见管理工作空间。
创建Spark Thrift Server会话
Spark Thrift Server创建完成后,您可以在创建Spark SQL类型任务时选择此会话。
进入会话管理页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,单击左侧导航栏中的会话管理。
在会话管理页面,单击Spark Thrift Server页签。
单击创建Spark Thrift Server。
在创建Spark Thrift Server页面,配置以下信息,单击创建。
参数
说明
名称
新建Spark Thrift Server的名称。
长度限制为1~64个字符,仅支持字母、数字、短划线(-)、下划线(_)和空格。
部署队列
请选择合适的开发队列部署会话。仅支持选择开发或者开发和生产公用的队列。
队列更多信息,请参见管理资源队列。
引擎版本
当前会话使用的引擎版本。引擎版本号含义等详情请参见引擎版本介绍。
使用Fusion加速
Fusion可加速Spark负载的运行并降低任务的总成本。有关计费信息,请参见产品计费。有关Fusion引擎介绍,请参见Fusion引擎。
自动停止
默认开启。45分钟不活动后自动停止Spark Thrift Server会话。
Spark Thrift Server端口
默认443端口。
认证方式
仅支持Token方式。
spark.driver.cores
用于指定Spark应用程序中Driver进程所使用的CPU核心数量。默认值为1 CPU。
spark.driver.memory
用于指定Spark应用程序中Driver进程可以使用的内存量。默认值为3.5 GB。
spark.executor.cores
用于指定每个Executor进程可以使用的CPU核心数量。默认值为1 CPU。
spark.executor.memory
用于指定每个Executor进程可以使用的内存量。默认值为3.5 GB。
spark.executor.instances
Spark分配的执行器(Executor)数量。默认值为2。
动态资源分配
默认关闭。开启后,需要配置以下参数:
executors数量下限:默认为2。
executors数量上限:如果未设置spark.executor.instances,则默认值为10。
更多内存配置
spark.driver.memoryOverhead:每个Driver可利用的非堆内存。如果未设置该参数,Spark会根据默认值自动分配,默认值为
max(384MB, 10% × spark.driver.memory)
。spark.executor.memoryOverhead:每个Executor可利用的非堆内存。如果未设置该参数,Spark会根据默认值自动分配,默认值为
max(384MB, 10% × spark.executor.memory)
。spark.memory.offHeap.size:Spark可用的堆外内存大小。默认值为1 GB。
仅在
spark.memory.offHeap.enabled
设置为true
时生效。默认情况下,当采用Fusion Engine时,该功能将处于启用状态,其非堆内存默认设置为1 GB。
Spark配置
填写Spark配置信息,默认以空格符分隔,例如,
spark.sql.catalog.paimon.metastore dlf
。获取Endpoint信息。
在Spark Thrift Server页签,单击新增的Spark Thrift Server的名称。
在总览页签,复制Endpoint信息。
创建Token
Token使用时,请在请求的header中添加--header `x-acs-spark-livy-token: token`
。
在Spark Thrift Server页签,单击新增的Spark Thrift Server的名称。
单击Token管理页签。
单击创建Token。
在创建Token对话框中,配置以下信息,单击确定。
参数
说明
名称
新建Token的名称。
过期时间
设置该Token的过期时间。设置的天数应大于或等于1。默认情况下为开启状态,365天后过期。
复制Token信息。
重要Token创建完成后,请务必立即复制新Token的信息,后续不支持查看。如果您的Token过期或遗失,请选择新建Token或重置Token。
连接Spark Thrift Server
在连接Spark Thrift Server时,请根据您的实际情况替换以下信息:
<endpoint>
:您在总览页签获取的Endpoint信息。<username>
:您在Token管理页签新建的Token的名称。<token>
:您在Token管理页签复制的Token信息。
使用Python连接Spark Thrift Server
执行以下命令,安装PyHive和Thrift包。
pip install pyhive thrift
编写Python脚本,连接Spark Thrift Server。
以下是一个Python脚本示例,展示如何连接到Hive并显示数据库列表。
from pyhive import hive if __name__ == '__main__': # 替换<endpoint>, <username>, 和 <token> 为您的实际信息。 cursor = hive.connect('<endpoint>', port=443, scheme='https', username='<username>', password='<token>').cursor() cursor.execute('show databases') print(cursor.fetchall()) cursor.close()
使用Java连接Spark Thrift Server
请在您的
pom.xml
中引入以下Maven依赖。<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.1.0</version> </dependency> </dependencies>
说明当前Serverless Spark内置的Hive版本为2.x,因此仅支持hive-jdbc 2.x版本。
编写Java代码,连接Spark Thrift Server。
以下是一个Sample Java代码,用于连接到Spark Thrift Server,并查询数据库列表。
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; public class Main { public static void main(String[] args) throws Exception { String url = "jdbc:hive2://<endpoint>:443/;transportMode=http;httpPath=cliservice;user=<username>;password=<token>"; Class.forName("org.apache.hive.jdbc.HiveDriver"); Connection conn = DriverManager.getConnection(url); HiveStatement stmt = (HiveStatement) conn.createStatement(); String sql = "show databases"; System.out.println("Running " + sql); ResultSet res = stmt.executeQuery(sql); ResultSetMetaData md = res.getMetaData(); String[] columns = new String[md.getColumnCount()]; for (int i = 0; i < columns.length; i++) { columns[i] = md.getColumnName(i + 1); } while (res.next()) { System.out.print("Row " + res.getRow() + "=["); for (int i = 0; i < columns.length; i++) { if (i != 0) { System.out.print(", "); } System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'"); } System.out.println(")]"); } conn.close(); } }
通过Spark Beeline连接Spark Thrift Server
(可选)如果您使用的是EMR on ECS的集群,建议您先进入Spark的
bin
目录。cd /opt/apps/SPARK3/spark-3.4.2-hadoop3.2-1.0.3/bin/
使用Beeline客户端连接到Spark Thrift Server。
beeline -u "jdbc:hive2://<endpoint>:443/;transportMode=http;httpPath=cliservice;user=<username>;password=<token>"
如果连接Serverless Spark Thrift Server时出现以下报错,通常是由于Hive Beeline版本不兼容导致的。请确保您使用的是Spark Beeline。
24/08/22 15:09:11 [main]: ERROR jdbc.HiveConnection: Error opening session org.apache.thrift.transport.TTransportException: HTTP Response code: 404
配置Apache Superset以连接Spark Thrift Server
Apache Superset是一个现代数据探索和可视化平台,具有丰富的从简单的折线图到高度详细的地理空间图表的图表形态。更多Superset信息,请参见Superset。
安装依赖。
请确保您已经安装了高版本的
thrift
包(建议高于16.0.0)。如未安装,您可以使用以下命令安装。pip install thrift==20.0.0
启动Superset,进入Superset界面。
更多启动操作信息,请参见Superset文档。
在页面右上角单击DATABASE,进入Connect a database页面。
在Connect a database页面,选择Apache Spark SQL。
填写连接字符串,然后配置相关数据源参数。
hive+https://<username>:<token>@<endpoint>:443/<db_name>
单击FINISH,以确认成功连接和验证。
配置Hue以连接Spark Thrift Server
Hue是一个流行的开源Web界面,可用于与Hadoop生态系统进行交互。关于Hue的更多介绍,请参见Hue官方文档。
安装依赖。
请确保您已经安装了高版本的
thrift
包(建议高于16.0.0)。如未安装,您可以使用以下命令安装。pip install thrift==20.0.0
在Hue的配置文件中添加Spark SQL连接串。
请找到Hue的配置文件(通常位于
/etc/hue/hue.conf
),并在文件中添加以下内容。[[[sparksql]]] name = Spark Sql interface=sqlalchemy options='{"url": "hive+https://<username>:<token>@<endpoint>:443/"}'
重启Hue。
修改配置后,您需要执行以下命令重启Hue服务以使更改生效。
sudo service hue restart
验证连接。
成功重启后,访问Hue界面,找到Spark SQL选项。如果配置正确,您应能够成功连接到Spark Thrift Server并执行SQL查询。
相关文档
Fusion引擎更多介绍,请参见Fusion引擎。