管理Spark Thrift Server会话

更新时间:
复制为 MD 格式

Spark Thrift ServerApache Spark提供的一种服务,支持通过JDBCODBC连接并执行SQL查询,从而便捷地将Spark环境与现有的商业智能(BI)工具、数据可视化工具及其他数据分析工具集成。本文主要为您介绍如何创建并连接Spark Thrift Server

前提条件

已创建工作空间,详情请参见管理工作空间

创建Spark Thrift Server会话

Spark Thrift Server创建完成后,您可以在创建Spark SQL类型任务时选择此会话。

  1. 进入会话管理页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,单击左侧导航栏中的会话管理

  2. 会话管理页面,单击Spark Thrift Server 会话页签。

  3. 单击创建 Spark Thrift Server 会话

  4. 创建 Spark Thrift Server 会话页面,配置以下信息,单击创建

    参数

    说明

    名称

    新建Spark Thrift Server的名称。

    长度限制为1~64个字符,仅支持字母、数字、短划线(-)、下划线(_)和空格。

    部署队列

    请选择合适的开发队列部署会话。仅支持选择开发或者开发和生产共用的队列。

    队列更多信息,请参见管理资源队列

    引擎版本

    当前会话使用的引擎版本。引擎版本号含义等详情请参见引擎版本介绍

    使用 Fusion 加速

    Fusion可加速Spark负载的运行并降低任务的总成本。有关计费信息,请参见产品计费。有关Fusion引擎介绍,请参见Fusion引擎

    自动停止

    默认开启。45分钟不活动后自动停止Spark Thrift Server会话。

    网络连接

    选择已创建的网络连接,以便直接访问VPC内的数据源或外部服务。有关创建网络连接的具体操作,请参见EMR Serverless Spark与其他VPC间网络互通

    Spark Thrift Server 端口

    使用外网域名访问时端口号为443,使用内网域名访问时端口号为80。

    访问凭证

    仅支持Token方式。

    spark.driver.cores

    用于指定Spark应用程序中Driver进程所使用的CPU核心数量。默认值为1 CPU。

    spark.driver.memory

    用于指定Spark应用程序中Driver进程可以使用的内存量。默认值为3.5 GB。

    spark.executor.cores

    用于指定每个Executor进程可以使用的CPU核心数量。默认值为1 CPU。

    spark.executor.memory

    用于指定每个Executor进程可以使用的内存量。默认值为3.5 GB。

    spark.executor.instances

    Spark分配的执行器(Executor)数量。默认值为2。

    动态资源分配

    默认关闭。开启后,需要配置以下参数:

    • executors 数量下限:默认为2。

    • executors 数量上限:如果未设置spark.executor.instances,则默认值为10。

    更多内存配置

    • spark.driver.memoryOverhead:每个Driver可利用的非堆内存。如果未设置该参数,Spark会根据默认值自动分配,默认值为 max(384MB, 10% × spark.driver.memory)

    • spark.executor.memoryOverhead:每个Executor可利用的非堆内存。如果未设置该参数,Spark会根据默认值自动分配,默认值为 max(384MB, 10% × spark.executor.memory)

    • spark.memory.offHeap.size:Spark可用的堆外内存大小。默认值为1 GB。

      仅在spark.memory.offHeap.enabled设置为true时生效。默认情况下,当采用Fusion Engine时,该功能将处于启用状态,其非堆内存默认设置为1 GB。

    Spark 配置

    填写Spark配置信息,默认以空格符分隔,例如,spark.sql.catalog.paimon.metastore dlf

  5. 获取Endpoint信息。

    1. Spark Thrift Server 会话页签,单击新增的Spark Thrift Server的名称。

    2. 总览页签,复制Endpoint信息。

      根据网络环境的不同,可以选择以下两种Endpoint:

      • 外网Endpoint:适用于通过公网访问EMR Serverless Spark的场景,例如本地开发机、外部网络或跨云环境的访问。此方式可能会产生流量费用,请确保采取必要的安全措施。

      • 内网Endpoint:适用于同地域的阿里云ECS实例通过内网访问EMR Serverless Spark的场景。内网访问免费且更加安全,但仅限同一地域的阿里云内网环境使用。

创建Token

  1. Spark Thrift Server 会话页签,单击新增的Spark Thrift Server会话的名称。

  2. 单击Token 管理页签。

  3. 单击创建 Token

  4. 创建 Token对话框中,配置以下信息,单击确定

    参数

    说明

    名称

    新建Token的名称。

    过期时间

    设置该Token的过期时间。设置的天数应大于或等于1。默认情况下为开启状态,365天后过期。

  5. 复制Token信息。

    重要

    Token创建完成后,请务必立即复制新Token的信息,后续不支持查看。如果您的Token过期或遗失,请选择新建Token或重置Token。

连接Spark Thrift Server

在连接Spark Thrift Server时,请根据您的实际情况替换以下信息:

  • <endpoint>:您在总览页签获取的Endpoint(外网)Endpoint(内网)信息。

    如果使用内网Endpoint,访问Spark Thrift Server限于同一VPC内的资源。

  • <port>:端口号。使用外网域名访问时端口号为443,使用内网域名访问时端口号为80。

  • <username>:您在Token 管理页签新建的Token的名称。

  • <token>:您在Token 管理页签复制的Token信息。

使用Python连接Spark Thrift Server

  1. 执行以下命令,安装PyHiveThrift包。

    pip install pyhive thrift
  2. 编写Python脚本,连接Spark Thrift Server。

    以下是一个Python脚本示例,展示如何连接到Hive并显示数据库列表。根据网络环境的不同(外网或内网),您可以选择适合的连接方式。

    使用外网域名连接

    from pyhive import hive
    
    if __name__ == '__main__':
        # 替换<endpoint>、<username>和<token>为您的实际信息。
        cursor = hive.connect('<endpoint>', port=443, scheme='https', username='<username>', password='<token>').cursor()
        cursor.execute('show databases')
        print(cursor.fetchall())
        cursor.close()

    使用内网域名连接

    from pyhive import hive
    
    if __name__ == '__main__':
        # 替换<endpoint>、<username>和<token>为您的实际信息。
        cursor = hive.connect('<endpoint>', port=80, scheme='http', username='<username>', password='<token>').cursor()
        cursor.execute('show databases')
        print(cursor.fetchall())
        cursor.close()

使用Java连接Spark Thrift Server

  1. 请在您的pom.xml中引入以下Maven依赖。

    <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>3.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-jdbc</artifactId>
                <version>2.1.0</version>
            </dependency>
        </dependencies>
    
    说明

    当前Serverless Spark内置的Hive版本为2.x,因此仅支持hive-jdbc 2.x版本。

  2. 编写Java代码,连接Spark Thrift Server。

    以下是一个Sample Java代码,用于连接到Spark Thrift Server,并查询数据库列表。

    使用外网域名连接

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import org.apache.hive.jdbc.HiveStatement;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:hive2://<endpoint>:443/;transportMode=http;httpPath=cliservice/token/<token>";
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            Connection conn = DriverManager.getConnection(url);
            HiveStatement stmt = (HiveStatement) conn.createStatement();
    
            String sql = "show databases";
            System.out.println("Running " + sql);
            ResultSet res = stmt.executeQuery(sql);
    
            ResultSetMetaData md = res.getMetaData();
            String[] columns = new String[md.getColumnCount()];
            for (int i = 0; i < columns.length; i++) {
                columns[i] = md.getColumnName(i + 1);
            }
            while (res.next()) {
                System.out.print("Row " + res.getRow() + "=[");
                for (int i = 0; i < columns.length; i++) {
                    if (i != 0) {
                        System.out.print(", ");
                    }
                    System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'");
                }
                System.out.println(")]");
            }
            
            conn.close();
        }
    }

    使用内网域名连接

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import org.apache.hive.jdbc.HiveStatement;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:hive2://<endpoint>:80/;transportMode=http;httpPath=cliservice/token/<token>";
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            Connection conn = DriverManager.getConnection(url);
            HiveStatement stmt = (HiveStatement) conn.createStatement();
    
            String sql = "show databases";
            System.out.println("Running " + sql);
            ResultSet res = stmt.executeQuery(sql);
    
            ResultSetMetaData md = res.getMetaData();
            String[] columns = new String[md.getColumnCount()];
            for (int i = 0; i < columns.length; i++) {
                columns[i] = md.getColumnName(i + 1);
            }
            while (res.next()) {
                System.out.print("Row " + res.getRow() + "=[");
                for (int i = 0; i < columns.length; i++) {
                    if (i != 0) {
                        System.out.print(", ");
                    }
                    System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'");
                }
                System.out.println(")]");
            }
            
            conn.close();
        }
    }

通过Spark Beeline连接Spark Thrift Server

  • 如果您使用的是自建集群,需先进入Sparkbin目录,然后使用beeline连接Spark Thrift Server。

    使用外网域名连接

    cd /opt/apps/SPARK3/spark-3.4.2-hadoop3.2-1.0.3/bin/
    
    ./beeline -u "jdbc:hive2://<endpoint>:443/;transportMode=http;httpPath=cliservice/token/<token>"

    使用内网域名连接

    cd /opt/apps/SPARK3/spark-3.4.2-hadoop3.2-1.0.3/bin/
    
    ./beeline -u "jdbc:hive2://<endpoint>:80/;transportMode=http;httpPath=cliservice/token/<token>"
    说明

    代码中的/opt/apps/SPARK3/spark-3.4.2-hadoop3.2-1.0.3是以EMR on ECS 集群的Spark安装路径为例,实际应根据客户端的Spark安装路径进行相应调整。如果您不确定Spark的安装路径,可以通过env | grep SPARK_HOME命令查找。

  • 如果您使用的是EMR on ECS 集群,可以直接使用Spark Beeline客户端连接到Spark Thrift Server。

    使用外网域名连接

    spark-beeline -u "jdbc:hive2://<endpoint>:443/;transportMode=http;httpPath=cliservice/token/<token>"

    使用内网域名连接

    spark-beeline -u "jdbc:hive2://<endpoint>:80/;transportMode=http;httpPath=cliservice/token/<token>"

在使用Hive Beeline连接Serverless Spark Thrift Server时,如果出现以下报错,通常是由于Hive Beeline的版本与Spark Thrift Server不兼容导致。因此,建议使用Hive 2.x版本的Beeline。

24/08/22 15:09:11 [main]: ERROR jdbc.HiveConnection: Error opening session
org.apache.thrift.transport.TTransportException: HTTP Response code: 404

配置Apache Superset连接Spark Thrift Server

Apache Superset是一个现代数据探索和可视化平台,具有丰富的从简单的折线图到高度详细的地理空间图表的图表形态。更多Superset信息,请参见Superset

  1. 安装依赖。

    请确保您已经安装了0.20.0版本的 thrift 包。如未安装,您可以使用以下命令安装。

    pip install thrift==0.20.0
  2. 启动Superset,进入Superset界面。

    更多启动操作信息,请参见Superset文档

  3. 在页面右上角单击DATABASE,进入Connect a database页面。

  4. Connect a database页面,选择Apache Spark SQL

    image

  5. 填写连接字符串,然后配置相关数据源参数。

    使用外网域名连接

    hive+https://<username>:<token>@<endpoint>:443/<db_name>

    使用内网域名连接

    hive+http://<username>:<token>@<endpoint>:80/<db_name>
  6. 单击FINISH,以确认成功连接和验证。

配置Hue连接Spark Thrift Server

Hue是一个流行的开源Web界面,可用于与Hadoop生态系统进行交互。关于Hue的更多介绍,请参见Hue官方文档

  1. 安装依赖。

    请确保您已经安装了0.20.0版本的 thrift 包。如未安装,您可以使用以下命令安装。

    pip install thrift==0.20.0
  2. Hue的配置文件中添加Spark SQL连接串。

    请找到Hue的配置文件(通常位于/etc/hue/hue.conf),并在文件中添加以下内容。

    使用外网域名连接

    [[[sparksql]]]
         name = Spark Sql
         interface=sqlalchemy
         options='{"url": "hive+https://<username>:<token>@<endpoint>:443/"}'

    使用内网域名连接

    [[[sparksql]]]
         name = Spark Sql
         interface=sqlalchemy
         options='{"url": "hive+http://<username>:<token>@<endpoint>:80/"}'
  3. 重启Hue。

    修改配置后,您需要执行以下命令重启Hue服务以使更改生效。

    sudo service hue restart
  4. 验证连接。

    成功重启后,访问Hue界面,找到Spark SQL选项。如果配置正确,您应能够成功连接到Spark Thrift Server并执行SQL查询。

    image

使用DataGrip连接Spark Thrift Server

DataGrip是面向开发人员的数据库管理环境,旨在便捷地进行数据库的查询、创建和管理。数据库可运行于本地、服务器或云端。如需了解更多关于DataGrip的信息,请参见DataGrip

  1. 安装DataGrip,详情请参见Install DataGrip

    本文示例中的DataGrip版本为2025.1.2。

  2. 打开DataGrip客户端,进入DataGrip界面。

  3. 创建项目。

    1. 单击image,选择New

      image

    2. New Project对话框中,输入项目名,例如Spark,单击OK

  4. 单击Database Explorer菜单栏的 创建连接图标。选择Data Source > Other > Apache Spark

    image

  5. Data Sources and Drivers对话框,配置如下参数。

    image

    页签

    参数

    说明

    General

    Name

    自定义的连接名称。例如,spark_thrift_server。

    Authentication

    选择鉴权方式。本文选择的No auth

    在生产环境中,请选择User & Password,确保只有授权用户能够提交SQL任务,提高系统的安全性。

    Driver

    单击Apache Spark,然后单击Go to Driver ,确认Driver版本为ver. 1.2.2

    说明

    由于当前Serverless Spark引擎的版本为3.x,为确保系统的稳定性和功能的兼容性,Driver版本必须选择1.2.2。

    image

    URL

    连接Spark Thrift ServerURL。根据网络环境的不同(外网或内网),您可以选择适合的连接方式。

    • 使用外网域名连接

      jdbc:hive2://<endpoint>:443/;transportMode=http;httpPath=cliservice/token/<token>
    • 使用内网域名连接

      jdbc:hive2://<endpoint>:80/;transportMode=http;httpPath=cliservice/token/<token>

    Options

    Run keep-alive query

    该参数为可选配置。勾选该参数可防止超时自动断开。

  6. 单击Test Connection,确认数据源配置成功。

    image

  7. 单击OK,完成配置。

  8. 使用DataGrip管理Spark Thrift Server。

    DataGrip成功连接Spark Thrift Server后,您可以进行数据开发,更多信息请参见DataGrip帮助文档

    例如,您可以在创建的连接下,在目标表上右键,选择New,并在右侧SQL编辑器中编写并运行SQL脚本,即可查看表数据信息。

    image

使用Redash连接Spark Thrift Server

Redash是一款开源的BI工具,提供了基于Web的数据库查询和数据可视化功能。如需了解更多关于Redash的信息,请参见Redash官方文档

  1. 安装Redash,详情请参见Redash官方文档

  2. 安装依赖。

    请确保您已经安装了0.20.0版本的thrift包。如未安装,您可以使用以下命令安装。

    pip install thrift==0.20.0
  3. 登录Redash。

  4. 在左侧导航栏单击Settings,并在Data Sources页签下单击+New Data Source

  5. 在弹出的对话框中配置以下参数,然后单击Create

    image

    参数

    说明

    Type Selection

    数据源类型。在搜索框中查找并选择Hive(HTTP)

    Configuration

    Name

    数据源名称。您可以自定义。

    Host

    Spark Thrift ServerEndpoint地址。

    您可以在总览页签获取的Endpoint(外网)Endpoint(内网)信息。

    Port

    • 如果使用外网域名访问,端口号为443。

    • 如果使用内网域名访问,端口号为80。

    HTTP Path

    固定填写为/cliservice

    Username

    用户名。可以任意填写,例如 root

    Password

    填写您创建的Token信息。

    HTTP Scheme

    • 如果使用外网域名访问,填写为https

    • 如果使用内网域名访问,填写为http

  6. 在页面上方选择,您可以在页面的编辑框中编写SQL语句。

    image

使用dbt连接Spark Thrift Server

dbt(data build tool)是一个数据转换工具,它允许数据分析师和工程师使用SQL编写数据转换逻辑,并以软件工程的最佳实践(如版本控制、测试、文档)来管理和部署这些转换。如需了解更多关于dbt的信息,请参见dbt官方文档

  1. 安装dbt。

    pip install dbt-spark

    如果您需要使用Hive连接器,也可以安装:

    pip install dbt-spark[HIVE]
  2. 创建dbt项目。

    dbt init my_spark_project
    cd my_spark_project
  3. 配置dbt profile。

    ~/.dbt/profiles.yml文件中配置Spark Thrift Server连接信息:

    使用外网域名连接

    my_spark_project:
      target: dev
      outputs:
        dev:
          type: spark
          method: thrift
          host: <endpoint>
          port: 443
          user: <username>
          password: <token>
          schema: default
          connect_retries: 5
          connect_timeout: 60
          retry_all: true
          use_ssl: true
          server_side_parameters:
            "hive.exec.dynamic.partition": "true"
            "hive.exec.dynamic.partition.mode": "nonstrict"

    使用内网域名连接

    my_spark_project:
      target: dev
      outputs:
        dev:
          type: spark
          method: thrift
          host: <endpoint>
          port: 80
          user: <username>
          password: <token>
          schema: default
          connect_retries: 5
          connect_timeout: 60
          retry_all: true
          use_ssl: false
          server_side_parameters:
            "hive.exec.dynamic.partition": "true"
            "hive.exec.dynamic.partition.mode": "nonstrict"
  4. 测试连接。

    dbt debug

    如果连接成功,您将看到类似以下的输出:

    Connection test: [OK connection ok]
  5. 创建dbt模型。

    models/目录下创建SQL文件,例如models/example_model.sql

    {{ config(materialized='table') }} select col1, col2, current_timestamp() as created_at from {{ ref('source_table') }} where col1 is not null
  6. 运行dbt项目。

    # 运行所有模型
    dbt run
    
    # 运行特定模型
    dbt run --models example_model
    
    # 运行测试
    dbt test
    
    # 生成文档
    dbt docs generate
    dbt docs serve
  7. 配置源表和测试。

    models/schema.yml文件中定义源表和测试:

    version: 2
    
    sources:
      - name: raw_data
        description: "Raw data from source systems"
        tables:
          - name: source_table
            description: "Source table containing raw data"
            columns:
              - name: col1
                description: "Primary identifier"
                tests:
                  - not_null
                  - unique
              - name: col2
                description: "Data column"
    
    models:
      - name: example_model
        description: "Transformed data model"
        columns:
          - name: col1
            description: "Primary identifier"
            tests:
              - not_null
              - unique
          - name: col2
            description: "Processed data column"
          - name: created_at
            description: "Record creation timestamp"

注意事项:

  • 确保您的dbt版本与Spark Thrift Server兼容。建议使用dbt-spark 1.3.0或更高版本。

  • 在生产环境中,建议将敏感信息(如Token)通过环境变量进行配置,而不是直接写在配置文件中。

  • 如果遇到连接超时问题,可以适当调整connect_timeoutconnect_retries参数。

  • 对于大型数据集,建议使用增量模型以提高性能:

{{ config(
    materialized='incremental',
    unique_key='id',
    incremental_strategy='merge'
) }} select * from source_table {% if is_incremental() %} where updated_at > (select max(updated_at) from {{ this }}) {% endif %}

通过以上配置,您就可以使用dbt连接到Spark Thrift Server,并利用dbt的强大功能进行数据转换和管理。

查看运行记录

在数据开发任务执行完成后,您可以通过会话管理页面查看任务的运行记录。具体操作步骤如下:

  1. 在会话列表页面,单击会话名称。

  2. 单击运行记录页签。

    在该页面中,您可以查看任务的详细运行信息,包括运行ID,启动时间,Spark UI等信息。

    image