管理Kyuubi Gateway

Kyuubi Gateway通过提供JDBC/ODBC接口,支持SQL查询和BI工具(如Tableau、Power BI)无缝连接Serverless Spark,实现高效的数据访问与分析。同时,其多租户资源隔离能力能够满足企业级应用的需求。

创建Kyuubi Gateway

  1. 进入Gateway页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,单击左侧导航栏中的运维中心 > Gateway

  2. Kyuubi Gateway页面,单击创建Kyuubi Gateway

  3. 在创建Kyuubi Gateway页面,配置以下信息,单击创建

    参数

    说明

    名称

    新建Gateway的名称。仅支持小写字母、数字、短划线(-),并且开头和结尾必须是字母或者数字。

    Kyuubi Gateway 资源

    默认2 CPU, 8 GB

    支持的规格及其推荐并发上限如下:

    • 1 CPU, 4 GB:10

    • 2 CPU, 8 GB:20

    • 4 CPU, 16 GB:30

    • 8 CPU, 32 GB:45

    • 16 CPU, 64 GB:85

    • 32 CPU, 128 GB:135

    说明

    如果Spark配置项数量过多,将导致Spark任务的瞬时提交并发度下降。

    Kyuubi 版本

    当前Gateway使用的Kyuubi版本。

    说明

    如果数据目录中使用DLF(之前称为DLF 2.5),则Kyuubi 版本必须选择为1.9.2-0.0.1及之后的版本。

    引擎版本

    当前Gateway使用的引擎版本。引擎版本号含义等详情请参见引擎版本介绍

    关联队列

    创建的Gateway将部署在所选队列。通过Gateway提交Spark任务时,将使用Gateway创建者的身份提交任务。

    认证方式

    仅支持Token方式。

    在您创建Gateway之后,需要为其生成一个唯一的鉴权Token,以便在之后的请求中使用该Token进行身份验证和访问控制。创建Token的具体操作,请参见Gateway管理

    服务高可用

    开启服务高可用后,将部署3台或以上Kyuubi Server以达到高可用。

    打开该开关后,还需配置以下参数:

    • Kyuubi Server 部署数量:Kyuubi服务器数量。

    • Zookeeper 集群地址:高可用Kyuubi Gateway依赖于Zookeeper集群,请填写Zookeeper集群地址,多个节点请以英文逗号(,)分隔,并确保网络互通。例如,zk1:2181,zk2:2181,zk3:2181

    网络连接

    选择已创建的网络连接,以便直接访问VPC内的数据源或外部服务。有关创建网络连接的具体操作,请参见EMR Serverless Spark与其他VPC间网络互通

    公网 Endpoint

    默认关闭。开启该功能后,则系统将通过公网Endpoint访问Kyuubi。否则默认通过内网Endpoint访问Kyuubi。

    Kyuubi 配置

    填写Kyuubi配置信息,默认以空格符分隔,例如:kyuubi.engine.pool.size 1

    仅支持以下Kyuubi配置。

    kyuubi.engine.pool.size
    kyuubi.engine.pool.size.threshold
    kyuubi.engine.share.level
    kyuubi.engine.single.spark.session
    kyuubi.session.engine.idle.timeout
    kyuubi.session.engine.initialize.timeout
    kyuubi.engine.security.token.max.lifetime
    kyuubi.session.engine.check.interval
    kyuubi.session.idle.timeout
    kyuubi.session.engine.request.timeout
    kyuubi.session.engine.login.timeout
    kyuubi.backend.engine.exec.pool.shutdown.timeout
    kyuubi.backend.server.exec.pool.shutdown.timeout
    kyuubi.backend.server.exec.pool.keepalive.time
    kyuubi.frontend.thrift.login.timeout
    kyuubi.operation.status.polling.timeout
    kyuubi.engine.pool.selectPolicy
    kyuubi.authentication
    kyuubi.kinit.principal
    kyuubi.kinit.keytab
    kyuubi.authentication.ldap.*
    kyuubi.hadoop.proxyuser.hive.hosts
    kyuubi.hadoop.proxyuser.hive.groups
    kyuubi.hadoop.proxyuser.kyuubi.hosts
    kyuubi.hadoop.proxyuser.kyuubi.groups
    kyuubi.ha.*

    Spark 配置

    填写Spark配置信息,默认以空格符分隔。除spark.kubernetes.*类型的参数外,其他参数均支持。例如:spark.sql.catalog.paimon.metastore dlf

  4. Kyuubi Gateway页面,单击已创建Kyuubi Gateway操作列的启动

管理Token

  1. Kyuubi Gateway页面,单击目标Gateway操作列的Token管理

  2. 单击创建Token

  3. 创建Token对话框中,配置以下信息,单击确定

    参数

    说明

    名称

    新建Token的名称。

    过期时间

    设置该Token的过期时间。设置的天数应大于或等于1。默认情况下为开启状态,365天后过期。

    分配对象

    说明
    • 如果您在数据目录页签,默认使用的是DLF(之前称为DLF 2.5),则需要配置该参数。

    • 需确保配置的RAM用户或RAM角拥有访问DLF的权限,授权操作请参见新增授权

    在下拉框中,选择您在访问控制中添加的RAM用户或者RAM角色。

    指定分配TokenRAM用户或RAM角色,用于在连接Kyuubi Gateway提交Spark任务时访问DLF。

  4. 复制Token信息。

    重要

    Token创建完成后,请务必立即复制新Token的信息,后续不支持查看。如果您的Token过期或遗失,请选择新建Token或重置Token。

连接Kyuubi Gateway

在连接Kyuubi Gateway时,请根据您的实际情况替换JDBC URL中的信息:

  • <endpoint>:您在总览页签获取的Endpoint信息。

  • <port>:端口号。使用外网域名访问时端口号为443,使用内网域名访问时端口号为80。

  • <token>:您在Token管理页签复制的Token信息。

  • <tokenname>:Token名称。您可以在Token管理页签获取。

  • <UserName/RoleName>:您在访问控制中添加的RAM用户或RAM角色。

使用Beeline连接

在连接Kyuubi Gateway时,请确保您使用的Beeline版本与Kyuubi服务端版本兼容。如果您未安装Beeline,详情请参见Getting Started - Apache Kyuubi

请根据数据目录页面Catalog的默认使用情况,选择以下对应的操作方式。

使用DLF(之前称为DLF 2.5)

beeline -u "jdbc:hive2://<endpoint>:<port>/;transportMode=http;user=<UserName/RoleName>;httpPath=cliservice/token/<token>"

使用其他Catalog

beeline -u "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>"

使用Beeline连接时支持修改会话参数,例如beeline -u "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>;#spark.sql.shuffle.partitions=100;spark.executor.instances=2;"

使用Java连接

  • 更新pom.xml。

    使用适当版本的依赖项以替换hadoop-commonhive-jdbc

    <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>3.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-jdbc</artifactId>
                <version>2.3.9</version>
            </dependency>
        </dependencies>
  • 编写Java代码,连接Kyuubi Gateway。

    请根据数据目录页面Catalog的默认使用情况,选择以下对应的操作方式。

    使用DLF(之前称为DLF 2.5)

    import org.apache.hive.jdbc.HiveStatement;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>;user=<UserName/RoleName>";
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            Connection conn = DriverManager.getConnection(url);
            HiveStatement stmt = (HiveStatement) conn.createStatement();
    
    
            String sql = "select * from students;";
            System.out.println("Running " + sql);
            ResultSet res = stmt.executeQuery(sql);
    
            ResultSetMetaData md = res.getMetaData();
            String[] columns = new String[md.getColumnCount()];
            for (int i = 0; i < columns.length; i++) {
                columns[i] = md.getColumnName(i + 1);
            }
            while (res.next()) {
                System.out.print("Row " + res.getRow() + "=[");
                for (int i = 0; i < columns.length; i++) {
                    if (i != 0) {
                        System.out.print(", ");
                    }
                    System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'");
                }
                System.out.println(")]");
            }
    
            conn.close();
        }
    }

    使用其他Catalog

    import org.apache.hive.jdbc.HiveStatement;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>";
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            Connection conn = DriverManager.getConnection(url);
            HiveStatement stmt = (HiveStatement) conn.createStatement();
    
    
            String sql = "select * from students;";
            System.out.println("Running " + sql);
            ResultSet res = stmt.executeQuery(sql);
    
            ResultSetMetaData md = res.getMetaData();
            String[] columns = new String[md.getColumnCount()];
            for (int i = 0; i < columns.length; i++) {
                columns[i] = md.getColumnName(i + 1);
            }
            while (res.next()) {
                System.out.print("Row " + res.getRow() + "=[");
                for (int i = 0; i < columns.length; i++) {
                    if (i != 0) {
                        System.out.print(", ");
                    }
                    System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'");
                }
                System.out.println(")]");
            }
    
            conn.close();
        }
    }

使用Python连接

  1. 执行以下命令,安装PyHiveThrift包。

    pip3 install pyhive thrift
  2. 编写Python脚本,连接Kyuubi Gateway。

    以下是一个Python脚本示例,展示如何连接到Kyuubi Gateway并显示数据库列表。

    请根据数据目录页面Catalog的默认使用情况,选择以下对应的操作方式。

    使用DLF(之前称为DLF 2.5)

    from pyhive import hive
    
    if __name__ == '__main__':
        cursor = hive.connect('<endpoint>',
                              port="<port>",
                              scheme='http',
                              username='<UserName/RoleName>',
                              password='<token>').cursor()
        cursor.execute('show databases')
        print(cursor.fetchall())
        cursor.close()

    使用其他Catalog

    from pyhive import hive
    
    if __name__ == '__main__':
        cursor = hive.connect('<endpoint>',
                              port="<port>",
                              scheme='http',
                              username='<tokenname>',
                              password='<token>').cursor()
        cursor.execute('show databases')
        print(cursor.fetchall())
        cursor.close()

使用REST API连接

Kyuubi Gateway提供了开源兼容的REST API,支持通过HTTP协议与Kyuubi服务进行交互。目前仅支持以下API路径:

  • /api/v1/sessions/*

  • /api/v1/operations/*

  • /api/v1/batches/*

本文通过以下示例为您介绍如何使用REST API连接Kyuubi Gateway

  • 示例1:启动Session并进行SQL查询。

    1. 创建Session并指定Spark配置。

      请根据数据目录页面Catalog的默认使用情况,选择以下对应的操作方式。

      说明
      • spark.emr.serverless.kyuubi.engine.queue用于指定Spark任务运行时所使用的队列。请根据实际情况替换<dev_queue>为具体的队列名。

      • <UserName/Rolename>:替换为实际的用户名或角色名。

      • <password>:仅作为占位符,可填写任意值。

      使用DLF(之前称为DLF 2.5)

      curl -X 'POST' \
        'http://<endpoint>:<port>/api/v1/sessions/token/<token>' \
        -H 'accept: application/json' \
        -H 'Content-Type: application/json' \
        -u '<UserName/Rolename>:<password>' \
        -d '{
        "configs": {
          "set:hivevar:spark.emr.serverless.kyuubi.engine.queue": "<dev_queue>"
        }
      }'

      使用其他Catalog

      curl -X 'POST' \
        'http://<endpoint>:<port>/api/v1/sessions/token/<token>' \
        -H 'accept: application/json' \
        -H 'Content-Type: application/json' \
        -d '{
        "configs": {
          "set:hivevar:spark.emr.serverless.kyuubi.engine.queue": "<dev_queue>"
        }
      }'

      返回如下类似信息。其中,identifier表示KyuubiSession Handle,用于唯一标识一个会话。本文中将以<sessionHandle>指代该值。

      {"identifier":"619e6ded-xxxx-xxxx-xxxx-c2a43f6fac46","kyuubiInstance":"0.0.0.0:10099"}
    2. 创建Statement。

      使用DLF(之前称为DLF 2.5)

      curl -X 'POST' \
        'http://<endpoint>:<port>/api/v1/sessions/<sessionHandle>/operations/statement/token/<token>' \
        -H 'accept: application/json' \
        -H 'Content-Type: application/json' \
        -u '<UserName/RoleName>:<password>' \
        -d '{
        "statement": "select * from test;",
        "runAsync": true,
        "queryTimeout": 0,
        "confOverlay": {
          "additionalProp1": "string",
          "additionalProp2": "string"
        }
      }'

      使用其他Catalog

      curl -X 'POST' \
        'http://<endpoint>:<port>/api/v1/sessions/<sessionHandle>/operations/statement/token/<token>' \
        -H 'accept: application/json' \
        -H 'Content-Type: application/json' \
        -d '{
        "statement": "select * from test;",
        "runAsync": true,
        "queryTimeout": 0,
        "confOverlay": {
          "additionalProp1": "string",
          "additionalProp2": "string"
        }
      }'

      返回如下类似信息。这里的identifier表示KyuubiOperation Handle,用于唯一标识一个具体的操作。本文中将以<operationHandle>指代该值。

      {"identifier":"a743e8ff-xxxx-xxxx-xxxx-a66fec66cfa4"}
    3. 获取Statement状态。

      使用DLF(之前称为DLF 2.5)

      curl --location -X 'GET' \
        'http://<endpoint>:<port>/api/v1/operations/<operationHandle>/event/token/<token>' \
        -H 'accept: application/json' \
        -u '<UserName/RoleName>:<password>'

      使用其他Catalog

      curl --location -X 'GET' \
        'http://<endpoint>:<port>/api/v1/operations/<operationHandle>/event/token/<token>' \
        -H 'accept: application/json'
    4. 获取Statement结果。

      使用DLF(之前称为DLF 2.5)

      curl --location -X 'GET' \
        'http://<endpoint>:<port>/api/v1/operations/<operationHandle>/rowset/token/<token>/?maxrows=100&fetchorientation=FETCH_NEXT' \
        -H 'accept: application/json'  \
        -u '<UserName/RoleName>:<password>'

      使用其他Catalog

      curl --location -X 'GET' \
        'http://<endpoint>:<port>/api/v1/operations/<operationHandle>/rowset/token/<token>/?maxrows=100&fetchorientation=FETCH_NEXT' \
        -H 'accept: application/json'
  • 示例2:使用batches接口提交批作业。

    通过REST API提交一个Spark批处理任务到Kyuubi Gateway。Kyuubi Gateway会根据请求中的参数启动一个Spark应用程序,并执行指定的任务。

    在本示例中,除过替换<endpoint><port><token>等信息外,还需单击spark-examples_2.12-3.3.1.jar,直接下载测试JAR包。

    说明

    JAR包是Spark自带的一个简单示例,用于计算圆周率π的值。

    使用DLF(之前称为DLF 2.5)

    curl --location \
      --request POST 'http://<endpoint>:<port>/api/v1/batches/token/<token>' \
      --user '<UserName/RoleName>:<password>' \
      --form 'batchRequest="{
        \"batchType\": \"SPARK\",
        \"className\": \"org.apache.spark.examples.SparkPi\",
        \"name\": \"kyuubi-spark-pi\",
        \"resource\": \"oss://bucket/path/to/spark-examples_2.12-3.3.1.jar\"
      }";type=application/json'

    使用其他Catalog

    curl --location \
      --request POST 'http://<endpoint>:<port>/api/v1/batches/token/<token>' \
      --form 'batchRequest="{
        \"batchType\": \"SPARK\",
        \"className\": \"org.apache.spark.examples.SparkPi\",
        \"name\": \"kyuubi-spark-pi\",
        \"resource\": \"oss://bucket/path/to/spark-examples_2.12-3.3.1.jar\"
      }";type=application/json'

配置并连接高可用Kyuubi Gateway

  1. 打通网络连接。

    参见EMR Serverless Spark与其他VPC间网络互通打通网络连接,确保您的客户端能够访问目标VPC内的Zookeeper集群。例如,阿里云MSEEMR on ECSZookeeper组件。

  2. 启用Kyuubi Gateway的高可用。

    在创建或者编辑Kyuubi Gateway时,开启了服务高可用并配置了相关参数,并且网络连接选择了已打通的网络连接。

  3. 连接高可用Kyuubi Gateway。

    完成上述配置后,Kyuubi Gateway将通过Zookeeper实现高可用。您可以通过REST APIJDBC连接来验证其可用性。

    在连接Kyuubi Gateway时,请根据您的实际情况替换JDBC URL中的信息:

    • <endpoint>:您在总览页签获取的Endpoint信息。

    • <port>:端口号。使用外网域名访问时端口号为443,使用内网域名访问时端口号为80。

    • <token>:您在Token管理页签复制的Token信息。

    • <tokenname>:Token名称。您可以在Token管理页签获取。

    • <UserName/RoleName>:您在访问控制中添加的RAM用户或RAM角色。

    通过以下示例为您介绍如何连接高可用Kyuubi Gateway

    使用Beeline连接

    1. 单击kyuubi-hive-jdbc-1.9.2.jar,下载JDBC Driver JAR。

    2. 替换JDBC Driver JAR。

      1. 备份并移动原有的JDBC Driver JAR包。

        mv /your_path/apache-kyuubi-1.9.2-bin/beeline-jars /bak_path
        说明

        如果您使用的是EMR on ECS,则Kyuubi的默认路径为/opt/apps/KYUUBI/kyuubi-1.9.2-1.0.0/beeline-jars。如果不确定Kyuubi的安装路径,可以通过env | grep KYUUBI_HOME命令查找。

      2. 替换为新的JDBC Driver JAR包。

        cp /download/serverless-spark-kyuubi-hive-jdbc-1.9.2.jar /your_path/apache-kyuubi-1.9.2-bin/beeline-jars
    3. 使用Beeline连接。

      /your_path/apache-kyuubi-1.9.2-bin/bin/beeline -u 'jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>'

    使用Java连接

    1. 单击serverless-spark-kyuubi-hive-jdbc-shaded-1.9.2.jar,下载shaded包。

    2. 安装JDBC DriverMaven仓库。

      执行以下命令,将Serverless Spark提供的JDBC Driver安装到本地Maven仓库。

      mvn install:install-file \
        -Dfile=/download/serverless-spark-kyuubi-hive-jdbc-shaded-1.9.2.jar \
        -DgroupId=org.apache.kyuubi \
        -DartifactId=kyuubi-hive-jdbc-shaded \
        -Dversion=1.9.2-ss \
        -Dpackaging=jar
    3. 修改pom.xml文件。

      在项目的pom.xml中添加以下依赖。

      <dependencies>
          <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.0.0</version>
          </dependency>
          <dependency>
            <groupId>org.apache.kyuubi</groupId>
            <artifactId>kyuubi-hive-jdbc-shaded</artifactId>
            <version>1.9.2-ss</version>
          </dependency>
      </dependencies>
    4. 编写Java示例代码。

      import org.apache.kyuubi.jdbc.hive.KyuubiStatement;
      
      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.ResultSet;
      import java.sql.ResultSetMetaData;
      
      
      public class Main {
          public static void main(String[] args) throws Exception {
              String url = "jdbc:hive2://<endpoint>:<port>/;transportMode=http;httpPath=cliservice/token/<token>";
              Class.forName("org.apache.kyuubi.jdbc.KyuubiHiveDriver");
              Connection conn = DriverManager.getConnection(url);
              KyuubiStatement stmt = (KyuubiStatement) conn.createStatement();
      
      
              String sql = "select * from test;";
              ResultSet res = stmt.executeQuery(sql);
      
              ResultSetMetaData md = res.getMetaData();
              String[] columns = new String[md.getColumnCount()];
              for (int i = 0; i < columns.length; i++) {
                  columns[i] = md.getColumnName(i + 1);
              }
              while (res.next()) {
                  System.out.print("Row " + res.getRow() + "=[");
                  for (int i = 0; i < columns.length; i++) {
                      if (i != 0) {
                          System.out.print(", ");
                      }
                      System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'");
                  }
                  System.out.println(")]");
              }
      
              conn.close();
          }
      }

查看Kyuubi提交的Spark任务列表

通过Kyuubi提交的Spark任务,您可以在Application 列表页签中查看详细的任务信息,包括应用ID应用名称状态以及启动时间等。这些信息可以帮助您快速了解和管理Kyuubi提交的Spark任务。

  1. Kyuubi Gateway页面,单击目标Kyuubi Gateway的名称。

  2. 单击Application 列表页签。

    image

    在该页面,您可以查看通过该Kyuubi提交的所有Spark任务的详细信息。其中,应用ID(spark-xxxx)由Spark引擎生成,与Kyuubi客户端连接时输出的Application ID完全一致,用于唯一标识任务实例。

    image