读写Hologres

基于Hologres提供的Spark Connector,EMR Serverless Spark可以在开发时添加对应的配置来连接Hologres。本文为您介绍在EMR Serverless Spark环境中实现Hologres的数据读取和写入操作。

使用限制

V1.3及以上版本的Hologres实例支持Spark Connector。您可以在Hologres管理控制台实例详情页查看当前实例版本。若您的实例是V1.3以下版本,请使用实例升级或通过搜索(钉钉群号:32314975)加入实时数仓Hologres交流群申请升级实例。

操作流程

步骤一:获取 hologres-connector-spark JAR并上传至OSS

  1. Spark读写Hologres需要引用的连接器JAR包,您可以通过Maven中央仓库进行下载。本文提供1.5.6版本,您可以通过单击附件hologres-connector-spark-3.x-1.5.6-jar-with-dependencies.jar下载。

  2. 将下载的 hologres-connector-spark JAR上传至阿里云OSS中,上传操作可以参见简单上传

步骤二:添加网络连接

  1. 获取网络信息。

    您可以在实时数仓Hologres页面,进入目标Hologres实例的实例详情页面,以获取该实例的专有网络和交换机信息。

  2. 新增网络连接。

    Serverless Spark需要能够打通与Hologres集群之间的网络才可以正常访问Hologres服务。有关更多网络连接信息,请参见EMR Serverless Spark与其他VPC间网络互通

步骤三:在Hologres中创建库表

  1. 连接Hologres实例,详情请参见连接实例

  2. SQL编辑器页签,新增的临时Query查询中输入以下SQL语句,并执行。

    -- 创建数据库
    CREATE DATABASE testdb;
    -- 创建表
    CREATE TABLE "public"."test" (
        "id" text  NULL,
        "name" text  NULL);
    -- 插入数据
    INSERT INTO public.test VALUES ('1001','jack'),('1002','tony'),('1003','mike');
    -- 查询数据
    SELECT * FROM public.test 

    image

步骤四:通过Serverless Spark读写Hologres

示例1:SQL会话

SQL会话为例,读写Hologres。

  1. 创建SQL会话,详情请参见管理SQL会话

    创建会话时,在网络连接中选择上一步创建好的网络连接,并在Spark配置中添加以下参数来加载hologres-connector-spark。

    # 添加hologres-connector jar
    spark.emr.serverless.user.defined.jars oss://<bucket>/hologres-connector-spark-3.x-<version>.jar
    
    # 配置holo catalog
    spark.sql.catalog.hologres_external_test_db com.alibaba.hologres.spark3.HoloTableCatalog
    spark.sql.catalog.hologres_external_test_db.username ***
    spark.sql.catalog.hologres_external_test_db.password ***
    spark.sql.catalog.hologres_external_test_db.jdbcurl jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb

    参数详细说明如下:

    参数

    示例

    说明

    spark.emr.serverless.user.defined.jars

    oss://<bucket>/hologres-connector-spark-3.x-<version>.jar

    指定用户自定义的 JAR 包路径。

    spark.sql.catalog.hologres_external_test_db

    com.alibaba.hologres.spark3.HoloTableCatalog

    Spark 3.x 中用于配置 Hologres 数据源作为外部 Catalog,固定值。

    spark.sql.catalog.hologres_external_test_db.username

    LTAI******

    阿里云账号的AccessKey ID。

    spark.sql.catalog.hologres_external_test_db.password

    mXYV******

    阿里云账号的AccessKey Secret。

    spark.sql.catalog.hologres_external_test_db.jdbcurl

    jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db

    Hologres 实例 JDBC 连接 URL 。

    其中参数名中hologres_external_test_db可自定义。

  2. 在数据开发页面,创建一个SparkSQL类型的任务,然后在右上角选择创建好的SQL会话。

    更多操作,请参见SparkSQL开发

  3. 拷贝如下代码到新增的SparkSQL页签中,然后单击运行

    -- 进入testdb database
    USE hologres_external_test_db;
    -- 写入数据
    INSERT INTO `public`.test VALUES ('1004','tom');
    -- 查询数据
    SELECT * FROM `public`.test;

    image

示例2:流任务

以流任务PySpark为例,从Kafka读数据,写入Hologres。

说明

确保KafkaHologres之间的网络连接畅通,建议将KafkaHologres部署在同一个VPC及同一交换机中。

  1. 代码示例。根据实际情况替换Kafka信息和Hologres表。

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    
    # 配置你的 Kafka 信息
    servers = "alikafka-serverless-cn-xxxxx-vpc.alikafka.aliyuncs.com:9092"  # 替换为你的 Kafka bootstrap servers
    topic = "topic-name"  # 替换为你的 Kafka topic
    
    # 创建 SparkSession
    spark = SparkSession.builder \
        .appName("test read kafka") \
        .getOrCreate()
    
    # 读取 Kafka 流
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", servers) \
        .option("subscribe", topic) \
        .load()
    
    # 定义写入 Hologres 的函数(每个 micro-batch 调用一次)
    def write_to_hologres(batch_df, batch_id):
        print(f"Writing batch {batch_id} to Hologres...")
        batch_df.write \
            .format("hologres") \
            .mode("append") \
            .insertInto("hologres_external_test_db.public.test")  # 替换为你的 Hologres table
    
    # 转换 key 和 value 为字符串并输出到控制台
    query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream \
        .foreachBatch(write_to_hologres) \
        .outputMode("append") \
        .trigger(processingTime='30 seconds') \
        .start()
    
    # 等待流式查询结束(在 notebook 中会阻塞 cell 执行)
    query.awaitTermination()
  2. 上传文件。

    1. 文件管理页面,单击上传文件

    2. 上传文件对话框中,单击待上传文件区域选择上一步的Python代码文件,或直接拖拽Python代码文件到待上传文件区域。

  3. 创建流任务并运行。

    1. 数据开发页面,单击image(新建)图标。

    2. 在弹出的对话框中,输入名称,根据实际需求在流任务中选择PySpark类型,然后单击确定

    3. 在新建的开发页签中,配置以下信息,其余参数无需配置,然后单击发布

      参数

      说明

      主 Python 资源

      选择前一个步骤中上传的Python文件。

      引擎版本

      选择合适的Spark版本。本文示例是esr-4.6.0

      网络连接

      选择步骤二中创建的网络。

      Spark配置

      # 添加hologres-connector jar
      spark.emr.serverless.user.defined.jars              oss://shulang-emr/test_script/hologres-connector-spark-3.x-1.5.6-jar-with-dependencies.jar
      # 配置holo catalog
      spark.sql.catalog.hologres_external_test_db com.alibaba.hologres.spark3.HoloTableCatalog
      spark.sql.catalog.hologres_external_test_db.username ***
      spark.sql.catalog.hologres_external_test_db.password ***
      spark.sql.catalog.hologres_external_test_db.jdbcurl jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb

      参数的详细说明请参见示例1:SQL会话

    4. 发布后,单击前往运维,在跳转页面,单击启动

  4. 验证结果。

    1. Kafka发送消息。

      image

    2. SparkSQL查询。image

示例3:Notebook会话

  1. 创建Notebook会话,详情请参见管理SQL会话

    创建会话时,在网络连接中选择上一步创建好的网络连接,并在Spark配置中添加以下参数来加载hologres-connector-spark。

    # 添加hologres-connector jar
    spark.emr.serverless.user.defined.jars oss://<bucket>/hologres-connector-spark-3.x-<version>.jar
  2. 在数据开发页面,创建一个Notebook类型的任务,然后在右上角选择创建好的Notebook会话。

  3. 拷贝如下代码到新增的Notebook页签中,然后单击image

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
    
    # 1. 准备 Pandas DataFrame(修复 hsap 为字符串)
    pdf = pd.DataFrame({
        "id": ["1006"],                 # 改为整数,匹配 Hologres 的 BIGINT/INT
        "name": ["sl"]        # 字符串
    })
    
    # 2. 转换为 PySpark DataFrame(可选:显式定义 schema 以确保类型正确)
    schema = StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True)
    ])
    
    df = spark.createDataFrame(pdf, schema=schema)
    
    # 写入hologres
    df.write \
      .format("hologres") \
      .option("username", "LTAI******") \
      .option("password", "mXYV******") \
      .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test") \
      .option("table", "test") \
      .mode("append") \
      .save()
    
    # 读取数据
    readDf = spark.read\
      .format("hologres") \
      .option("username", "LTAI******") \
      .option("password", "mXYV******") \
      .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test") \
      .option("table", "test") \
      .load()
    
    readDf.select("id", "name").show(10)

    其中,涉及参数说明如下:

    参数

    示例

    说明

    spark.sql.catalog.hologres_external_test_db.username

    LTAI******

    阿里云账号的AccessKey ID。

    spark.sql.catalog.hologres_external_test_db.password

    mXYV******

    阿里云账号的AccessKey Secret。

    spark.sql.catalog.hologres_external_test_db.jdbcurl

    jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db

    Hologres 实例 JDBC 连接 URL 。

  4. 验证结果。image

Hologres Catalog常用命令

在 Serverless Spark 中,通过 Hologres Catalog 将 Hologres 数据库(Database)以 外部 Catalog 形式接入 Spark SQL。每个 Catalog 严格绑定一个 Hologres Database,且不可跨库访问(即无法通过同一 Catalog 访问多个 Hologres DB)。Catalog 内部的逻辑组织与 Hologres 保持一致:

Spark 概念

对应 Hologres 概念

说明

Catalog

Database

如 hologres_external_test_db → 映射 Hologres 中的 test_db 数据库。

Namespace

Schema

如 publictest_schema;默认为 public,可通过 USE 切换当前默认 Namespace。

Table

Table

必须显式指定 namespace.table_name(如 public.test),或先 USE namespace 后直接引用表名。

加载Hologres Catalog

Spark中的Hologres Catalog完全对应一个HologresDatabase,使用过程中无法更改。

USE hologres_external_test_db;

查询所有Namespace

Spark中的Namespace,对应Hologres中的Schema,默认为public,使用过程中可以使用USE指令调整默认的Schema。

-- 查看Hologres Catalog中的所有Namespace, 即Hologres中所有的Schema。
SHOW NAMESPACES;

查询Namespace下的表

  • 查询所有表。

    SHOW TABLES;
  • 查询指定Namespace下的表。

    USE test_schema;
    SHOW TABLES;
    
    -- 或者使用 
    SHOW TABLES IN test_schema;

相关文档

有关Spark读写Hologres的更多信息,请参见Spark读写Hologres