Kyuubi Gateway通过提供JDBC/ODBC接口,支持SQL查询和BI工具(如Tableau、Power BI)无缝连接Serverless Spark,实现高效的数据访问与分析。同时,其多租户资源隔离能力能够满足企业级应用的需求。
创建Kyuubi Gateway
进入Gateway页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,单击左侧导航栏中的
。
在Kyuubi Gateway页面,单击创建Kyuubi Gateway。
在创建Kyuubi Gateway页面,配置以下信息,单击创建。
参数
说明
名称
新建Gateway的名称。仅支持小写字母、数字、短划线(-),并且开头和结尾必须是字母或者数字。
Kyuubi Gateway 资源
默认
2 CPU, 8 GB
。支持的规格及其推荐并发上限如下:
1 CPU, 4 GB
:102 CPU, 8 GB
:204 CPU, 16 GB
:308 CPU, 32 GB
:4516 CPU, 64 GB
:8532 CPU, 128 GB
:135
说明如果Spark配置项数量过多,将导致Spark任务的瞬时提交并发度下降。
Kyuubi 版本
当前Gateway使用的Kyuubi版本。
说明如果数据目录中使用DLF(之前称为DLF 2.5),则Kyuubi 版本必须选择为1.9.2-0.0.1及之后的版本。
引擎版本
当前Gateway使用的引擎版本。引擎版本号含义等详情请参见引擎版本介绍。
关联队列
创建的Gateway将部署在所选队列。通过Gateway提交Spark任务时,将使用Gateway创建者的身份提交任务。
认证方式
仅支持Token方式。
在您创建Gateway之后,需要为其生成一个唯一的鉴权Token,以便在之后的请求中使用该Token进行身份验证和访问控制。创建Token的具体操作,请参见Gateway管理。
服务高可用
开启服务高可用后,将部署3台或以上Kyuubi Server以达到高可用。
打开该开关后,还需配置以下参数:
Kyuubi Server 部署数量:Kyuubi服务器数量。
Zookeeper 集群地址:高可用Kyuubi Gateway依赖于Zookeeper集群,请填写Zookeeper集群地址,多个节点请以英文逗号(,)分隔,并确保网络互通。例如,
zk1:2181,zk2:2181,zk3:2181
。
网络连接
选择已创建的网络连接,以便直接访问VPC内的数据源或外部服务。有关创建网络连接的具体操作,请参见EMR Serverless Spark与其他VPC间网络互通。
公网 Endpoint
默认关闭。开启该功能后,则系统将通过公网Endpoint访问Kyuubi。否则默认通过内网Endpoint访问Kyuubi。
Kyuubi 配置
填写Kyuubi配置信息,默认以空格符分隔,例如:
kyuubi.engine.pool.size 1
。仅支持以下Kyuubi配置。
kyuubi.engine.pool.size kyuubi.engine.pool.size.threshold kyuubi.engine.share.level kyuubi.engine.single.spark.session kyuubi.session.engine.idle.timeout kyuubi.session.engine.initialize.timeout kyuubi.engine.security.token.max.lifetime kyuubi.session.engine.check.interval kyuubi.session.idle.timeout kyuubi.session.engine.request.timeout kyuubi.session.engine.login.timeout kyuubi.backend.engine.exec.pool.shutdown.timeout kyuubi.backend.server.exec.pool.shutdown.timeout kyuubi.backend.server.exec.pool.keepalive.time kyuubi.frontend.thrift.login.timeout kyuubi.operation.status.polling.timeout kyuubi.engine.pool.selectPolicy kyuubi.authentication kyuubi.kinit.principal kyuubi.kinit.keytab kyuubi.authentication.ldap.* kyuubi.hadoop.proxyuser.hive.hosts kyuubi.hadoop.proxyuser.hive.groups kyuubi.hadoop.proxyuser.kyuubi.hosts kyuubi.hadoop.proxyuser.kyuubi.groups kyuubi.ha.*
Spark 配置
填写Spark配置信息,默认以空格符分隔。除
spark.kubernetes.*
类型的参数外,其他参数均支持。例如:spark.sql.catalog.paimon.metastore dlf
。在Kyuubi Gateway页面,单击已创建Kyuubi Gateway操作列的启动。
管理Token
在Kyuubi Gateway页面,单击目标Gateway操作列的Token管理。
单击创建Token。
在创建Token对话框中,配置以下信息,单击确定。
参数
说明
名称
新建Token的名称。
过期时间
设置该Token的过期时间。设置的天数应大于或等于1。默认情况下为开启状态,365天后过期。
分配对象
在下拉框中,选择您在访问控制中添加的RAM用户或者RAM角色。
指定分配Token的RAM用户或RAM角色,用于在连接Kyuubi Gateway提交Spark任务时访问DLF。
复制Token信息。
重要Token创建完成后,请务必立即复制新Token的信息,后续不支持查看。如果您的Token过期或遗失,请选择新建Token或重置Token。
连接Kyuubi Gateway
在连接Kyuubi Gateway时,请根据您的实际情况替换JDBC URL中的信息:
<endpoint>
:您在总览页签获取的Endpoint信息。<port>
:端口号。使用外网域名访问时端口号为443,使用内网域名访问时端口号为80。<token>
:您在Token管理页签复制的Token信息。<tokenname>
:Token名称。您可以在Token管理页签获取。<UserName/RoleName>
:您在访问控制中添加的RAM用户或RAM角色。
使用Beeline连接
在连接Kyuubi Gateway时,请确保您使用的Beeline版本与Kyuubi服务端版本兼容。如果您未安装Beeline,详情请参见Getting Started - Apache Kyuubi。
请根据数据目录页面Catalog的默认使用情况,选择以下对应的操作方式。
使用DLF(之前称为DLF 2.5)
beeline -u "jdbc:hive2://<endpoint>:<port>/;transportMode=http;user=<UserName/RoleName>;httpPath=cliservice/token/<token>"
使用其他Catalog
beeline -u "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>"
使用Beeline连接时支持修改会话参数,例如beeline -u "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>;#spark.sql.shuffle.partitions=100;spark.executor.instances=2;"
。
使用Java连接
更新pom.xml。
使用适当版本的依赖项以替换
hadoop-common
和hive-jdbc
。<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.3.9</version> </dependency> </dependencies>
编写Java代码,连接Kyuubi Gateway。
请根据数据目录页面Catalog的默认使用情况,选择以下对应的操作方式。
使用DLF(之前称为DLF 2.5)
import org.apache.hive.jdbc.HiveStatement; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; public class Main { public static void main(String[] args) throws Exception { String url = "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>;user=<UserName/RoleName>"; Class.forName("org.apache.hive.jdbc.HiveDriver"); Connection conn = DriverManager.getConnection(url); HiveStatement stmt = (HiveStatement) conn.createStatement(); String sql = "select * from students;"; System.out.println("Running " + sql); ResultSet res = stmt.executeQuery(sql); ResultSetMetaData md = res.getMetaData(); String[] columns = new String[md.getColumnCount()]; for (int i = 0; i < columns.length; i++) { columns[i] = md.getColumnName(i + 1); } while (res.next()) { System.out.print("Row " + res.getRow() + "=["); for (int i = 0; i < columns.length; i++) { if (i != 0) { System.out.print(", "); } System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'"); } System.out.println(")]"); } conn.close(); } }
使用其他Catalog
import org.apache.hive.jdbc.HiveStatement; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; public class Main { public static void main(String[] args) throws Exception { String url = "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>"; Class.forName("org.apache.hive.jdbc.HiveDriver"); Connection conn = DriverManager.getConnection(url); HiveStatement stmt = (HiveStatement) conn.createStatement(); String sql = "select * from students;"; System.out.println("Running " + sql); ResultSet res = stmt.executeQuery(sql); ResultSetMetaData md = res.getMetaData(); String[] columns = new String[md.getColumnCount()]; for (int i = 0; i < columns.length; i++) { columns[i] = md.getColumnName(i + 1); } while (res.next()) { System.out.print("Row " + res.getRow() + "=["); for (int i = 0; i < columns.length; i++) { if (i != 0) { System.out.print(", "); } System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'"); } System.out.println(")]"); } conn.close(); } }
使用Python连接
执行以下命令,安装PyHive和Thrift包。
pip3 install pyhive thrift
编写Python脚本,连接Kyuubi Gateway。
以下是一个Python脚本示例,展示如何连接到Kyuubi Gateway并显示数据库列表。
请根据数据目录页面Catalog的默认使用情况,选择以下对应的操作方式。
使用DLF(之前称为DLF 2.5)
from pyhive import hive if __name__ == '__main__': cursor = hive.connect('<endpoint>', port="<port>", scheme='http', username='<UserName/RoleName>', password='<token>').cursor() cursor.execute('show databases') print(cursor.fetchall()) cursor.close()
使用其他Catalog
from pyhive import hive if __name__ == '__main__': cursor = hive.connect('<endpoint>', port="<port>", scheme='http', username='<tokenname>', password='<token>').cursor() cursor.execute('show databases') print(cursor.fetchall()) cursor.close()
使用REST API连接
Kyuubi Gateway提供了开源兼容的REST API,支持通过HTTP协议与Kyuubi服务进行交互。目前仅支持以下API路径:
/api/v1/sessions/*
/api/v1/operations/*
/api/v1/batches/*
本文通过以下示例为您介绍如何使用REST API连接Kyuubi Gateway。
示例1:启动Session并进行SQL查询。
创建Session并指定Spark配置。
请根据数据目录页面Catalog的默认使用情况,选择以下对应的操作方式。
说明spark.emr.serverless.kyuubi.engine.queue
用于指定Spark任务运行时所使用的队列。请根据实际情况替换<dev_queue>
为具体的队列名。<UserName/Rolename>
:替换为实际的用户名或角色名。<password>
:仅作为占位符,可填写任意值。
使用DLF(之前称为DLF 2.5)
curl -X 'POST' \ 'http://<endpoint>:<port>/api/v1/sessions/token/<token>' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -u '<UserName/Rolename>:<password>' \ -d '{ "configs": { "set:hivevar:spark.emr.serverless.kyuubi.engine.queue": "<dev_queue>" } }'
使用其他Catalog
curl -X 'POST' \ 'http://<endpoint>:<port>/api/v1/sessions/token/<token>' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "configs": { "set:hivevar:spark.emr.serverless.kyuubi.engine.queue": "<dev_queue>" } }'
返回如下类似信息。其中,
identifier
表示Kyuubi的Session Handle,用于唯一标识一个会话。本文中将以<sessionHandle>
指代该值。{"identifier":"619e6ded-xxxx-xxxx-xxxx-c2a43f6fac46","kyuubiInstance":"0.0.0.0:10099"}
创建Statement。
使用DLF(之前称为DLF 2.5)
curl -X 'POST' \ 'http://<endpoint>:<port>/api/v1/sessions/<sessionHandle>/operations/statement/token/<token>' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -u '<UserName/RoleName>:<password>' \ -d '{ "statement": "select * from test;", "runAsync": true, "queryTimeout": 0, "confOverlay": { "additionalProp1": "string", "additionalProp2": "string" } }'
使用其他Catalog
curl -X 'POST' \ 'http://<endpoint>:<port>/api/v1/sessions/<sessionHandle>/operations/statement/token/<token>' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "statement": "select * from test;", "runAsync": true, "queryTimeout": 0, "confOverlay": { "additionalProp1": "string", "additionalProp2": "string" } }'
返回如下类似信息。这里的
identifier
表示Kyuubi的Operation Handle,用于唯一标识一个具体的操作。本文中将以<operationHandle>
指代该值。{"identifier":"a743e8ff-xxxx-xxxx-xxxx-a66fec66cfa4"}
获取Statement状态。
使用DLF(之前称为DLF 2.5)
curl --location -X 'GET' \ 'http://<endpoint>:<port>/api/v1/operations/<operationHandle>/event/token/<token>' \ -H 'accept: application/json' \ -u '<UserName/RoleName>:<password>'
使用其他Catalog
curl --location -X 'GET' \ 'http://<endpoint>:<port>/api/v1/operations/<operationHandle>/event/token/<token>' \ -H 'accept: application/json'
获取Statement结果。
使用DLF(之前称为DLF 2.5)
curl --location -X 'GET' \ 'http://<endpoint>:<port>/api/v1/operations/<operationHandle>/rowset/token/<token>/?maxrows=100&fetchorientation=FETCH_NEXT' \ -H 'accept: application/json' \ -u '<UserName/RoleName>:<password>'
使用其他Catalog
curl --location -X 'GET' \ 'http://<endpoint>:<port>/api/v1/operations/<operationHandle>/rowset/token/<token>/?maxrows=100&fetchorientation=FETCH_NEXT' \ -H 'accept: application/json'
示例2:使用batches接口提交批作业。
通过REST API提交一个Spark批处理任务到Kyuubi Gateway。Kyuubi Gateway会根据请求中的参数启动一个Spark应用程序,并执行指定的任务。
在本示例中,除过替换
<endpoint>
、<port>
、<token>
等信息外,还需单击spark-examples_2.12-3.3.1.jar,直接下载测试JAR包。说明该JAR包是Spark自带的一个简单示例,用于计算圆周率π的值。
使用DLF(之前称为DLF 2.5)
curl --location \ --request POST 'http://<endpoint>:<port>/api/v1/batches/token/<token>' \ --user '<UserName/RoleName>:<password>' \ --form 'batchRequest="{ \"batchType\": \"SPARK\", \"className\": \"org.apache.spark.examples.SparkPi\", \"name\": \"kyuubi-spark-pi\", \"resource\": \"oss://bucket/path/to/spark-examples_2.12-3.3.1.jar\" }";type=application/json'
使用其他Catalog
curl --location \ --request POST 'http://<endpoint>:<port>/api/v1/batches/token/<token>' \ --form 'batchRequest="{ \"batchType\": \"SPARK\", \"className\": \"org.apache.spark.examples.SparkPi\", \"name\": \"kyuubi-spark-pi\", \"resource\": \"oss://bucket/path/to/spark-examples_2.12-3.3.1.jar\" }";type=application/json'
配置并连接高可用Kyuubi Gateway
打通网络连接。
参见EMR Serverless Spark与其他VPC间网络互通打通网络连接,确保您的客户端能够访问目标VPC内的Zookeeper集群。例如,阿里云MSE或EMR on ECS的Zookeeper组件。
启用Kyuubi Gateway的高可用。
在创建或者编辑Kyuubi Gateway时,开启了服务高可用并配置了相关参数,并且网络连接选择了已打通的网络连接。
连接高可用Kyuubi Gateway。
完成上述配置后,Kyuubi Gateway将通过Zookeeper实现高可用。您可以通过REST API或JDBC连接来验证其可用性。
在连接Kyuubi Gateway时,请根据您的实际情况替换JDBC URL中的信息:
<endpoint>
:您在总览页签获取的Endpoint信息。<port>
:端口号。使用外网域名访问时端口号为443,使用内网域名访问时端口号为80。<token>
:您在Token管理页签复制的Token信息。<tokenname>
:Token名称。您可以在Token管理页签获取。<UserName/RoleName>
:您在访问控制中添加的RAM用户或RAM角色。
通过以下示例为您介绍如何连接高可用Kyuubi Gateway。
使用Beeline连接
单击kyuubi-hive-jdbc-1.9.2.jar,下载JDBC Driver JAR。
替换JDBC Driver JAR。
备份并移动原有的JDBC Driver JAR包。
mv /your_path/apache-kyuubi-1.9.2-bin/beeline-jars /bak_path
说明如果您使用的是EMR on ECS,则Kyuubi的默认路径为
/opt/apps/KYUUBI/kyuubi-1.9.2-1.0.0/beeline-jars
。如果不确定Kyuubi的安装路径,可以通过env | grep KYUUBI_HOME
命令查找。替换为新的JDBC Driver JAR包。
cp /download/serverless-spark-kyuubi-hive-jdbc-1.9.2.jar /your_path/apache-kyuubi-1.9.2-bin/beeline-jars
使用Beeline连接。
/your_path/apache-kyuubi-1.9.2-bin/bin/beeline -u 'jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>'
使用Java连接
单击serverless-spark-kyuubi-hive-jdbc-shaded-1.9.2.jar,下载shaded包。
安装JDBC Driver到Maven仓库。
执行以下命令,将Serverless Spark提供的JDBC Driver安装到本地Maven仓库。
mvn install:install-file \ -Dfile=/download/serverless-spark-kyuubi-hive-jdbc-shaded-1.9.2.jar \ -DgroupId=org.apache.kyuubi \ -DartifactId=kyuubi-hive-jdbc-shaded \ -Dversion=1.9.2-ss \ -Dpackaging=jar
修改
pom.xml
文件。在项目的
pom.xml
中添加以下依赖。<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.kyuubi</groupId> <artifactId>kyuubi-hive-jdbc-shaded</artifactId> <version>1.9.2-ss</version> </dependency> </dependencies>
编写Java示例代码。
import org.apache.kyuubi.jdbc.hive.KyuubiStatement; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; public class Main { public static void main(String[] args) throws Exception { String url = "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>"; Class.forName("org.apache.kyuubi.jdbc.KyuubiHiveDriver"); Connection conn = DriverManager.getConnection(url); KyuubiStatement stmt = (KyuubiStatement) conn.createStatement(); String sql = "select * from test;"; ResultSet res = stmt.executeQuery(sql); ResultSetMetaData md = res.getMetaData(); String[] columns = new String[md.getColumnCount()]; for (int i = 0; i < columns.length; i++) { columns[i] = md.getColumnName(i + 1); } while (res.next()) { System.out.print("Row " + res.getRow() + "=["); for (int i = 0; i < columns.length; i++) { if (i != 0) { System.out.print(", "); } System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'"); } System.out.println(")]"); } conn.close(); } }
查看Kyuubi提交的Spark任务列表
通过Kyuubi提交的Spark任务,您可以在Application 列表页签中查看详细的任务信息,包括应用ID、应用名称、状态以及启动时间等。这些信息可以帮助您快速了解和管理Kyuubi提交的Spark任务。
在Kyuubi Gateway页面,单击目标Kyuubi Gateway的名称。
单击Application 列表页签。
在该页面,您可以查看通过该Kyuubi提交的所有Spark任务的详细信息。其中,应用ID(spark-xxxx)由Spark引擎生成,与Kyuubi客户端连接时输出的Application ID完全一致,用于唯一标识任务实例。