基于Hologres提供的Spark Connector,EMR Serverless Spark可以在开发时添加对应的配置来连接Hologres。本文为您介绍在EMR Serverless Spark环境中实现Hologres的数据读取和写入操作。
使用限制
仅V1.3及以上版本的Hologres实例支持Spark Connector。您可以在Hologres管理控制台的实例详情页查看当前实例版本。若您的实例是V1.3以下版本,请使用实例升级或通过搜索(钉钉群号:32314975)加入实时数仓Hologres交流群申请升级实例。
操作流程
步骤一:获取 hologres-connector-spark JAR并上传至OSS
Spark读写Hologres需要引用的连接器JAR包,您可以通过Maven中央仓库进行下载。本文提供1.5.6版本,您可以通过单击附件hologres-connector-spark-3.x-1.5.6-jar-with-dependencies.jar下载。
将下载的 hologres-connector-spark JAR上传至阿里云OSS中,上传操作可以参见简单上传。
步骤二:添加网络连接
获取网络信息。
您可以在实时数仓Hologres页面,进入目标Hologres实例的实例详情页面,以获取该实例的专有网络和交换机信息。
新增网络连接。
Serverless Spark需要能够打通与Hologres集群之间的网络才可以正常访问Hologres服务。有关更多网络连接信息,请参见EMR Serverless Spark与其他VPC间网络互通。
步骤三:在Hologres中创建库表
连接Hologres实例,详情请参见连接实例。
在SQL编辑器页签,新增的临时Query查询中输入以下SQL语句,并执行。
-- 创建数据库 CREATE DATABASE testdb; -- 创建表 CREATE TABLE "public"."test" ( "id" text NULL, "name" text NULL); -- 插入数据 INSERT INTO public.test VALUES ('1001','jack'),('1002','tony'),('1003','mike'); -- 查询数据 SELECT * FROM public.test
步骤四:通过Serverless Spark读写Hologres
示例1:SQL会话
以SQL会话为例,读写Hologres。
创建SQL会话,详情请参见管理SQL会话。
创建会话时,在网络连接中选择上一步创建好的网络连接,并在Spark配置中添加以下参数来加载hologres-connector-spark。
# 添加hologres-connector jar spark.emr.serverless.user.defined.jars oss://<bucket>/hologres-connector-spark-3.x-<version>.jar # 配置holo catalog spark.sql.catalog.hologres_external_test_db com.alibaba.hologres.spark3.HoloTableCatalog spark.sql.catalog.hologres_external_test_db.username *** spark.sql.catalog.hologres_external_test_db.password *** spark.sql.catalog.hologres_external_test_db.jdbcurl jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb参数详细说明如下:
参数
示例
说明
spark.emr.serverless.user.defined.jarsoss://<bucket>/hologres-connector-spark-3.x-<version>.jar指定用户自定义的 JAR 包路径。
spark.sql.catalog.hologres_external_test_dbcom.alibaba.hologres.spark3.HoloTableCatalogSpark 3.x 中用于配置 Hologres 数据源作为外部 Catalog,固定值。
spark.sql.catalog.hologres_external_test_db.usernameLTAI******阿里云账号的AccessKey ID。
spark.sql.catalog.hologres_external_test_db.passwordmXYV******阿里云账号的AccessKey Secret。
spark.sql.catalog.hologres_external_test_db.jdbcurljdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_dbHologres 实例 JDBC 连接 URL 。
其中参数名中
hologres_external_test_db可自定义。在数据开发页面,创建一个SparkSQL类型的任务,然后在右上角选择创建好的SQL会话。
更多操作,请参见SparkSQL开发。
拷贝如下代码到新增的SparkSQL页签中,然后单击运行。
-- 进入testdb database USE hologres_external_test_db; -- 写入数据 INSERT INTO `public`.test VALUES ('1004','tom'); -- 查询数据 SELECT * FROM `public`.test;
示例2:流任务
以流任务PySpark为例,从Kafka读数据,写入Hologres。
确保Kafka与Hologres之间的网络连接畅通,建议将Kafka与Hologres部署在同一个VPC及同一交换机中。
代码示例。根据实际情况替换Kafka信息和Hologres表。
from pyspark.sql import SparkSession from pyspark.sql.functions import col # 配置你的 Kafka 信息 servers = "alikafka-serverless-cn-xxxxx-vpc.alikafka.aliyuncs.com:9092" # 替换为你的 Kafka bootstrap servers topic = "topic-name" # 替换为你的 Kafka topic # 创建 SparkSession spark = SparkSession.builder \ .appName("test read kafka") \ .getOrCreate() # 读取 Kafka 流 df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", servers) \ .option("subscribe", topic) \ .load() # 定义写入 Hologres 的函数(每个 micro-batch 调用一次) def write_to_hologres(batch_df, batch_id): print(f"Writing batch {batch_id} to Hologres...") batch_df.write \ .format("hologres") \ .mode("append") \ .insertInto("hologres_external_test_db.public.test") # 替换为你的 Hologres table # 转换 key 和 value 为字符串并输出到控制台 query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ .writeStream \ .foreachBatch(write_to_hologres) \ .outputMode("append") \ .trigger(processingTime='30 seconds') \ .start() # 等待流式查询结束(在 notebook 中会阻塞 cell 执行) query.awaitTermination()上传文件。
在文件管理页面,单击上传文件。
在上传文件对话框中,单击待上传文件区域选择上一步的Python代码文件,或直接拖拽Python代码文件到待上传文件区域。
创建流任务并运行。
在数据开发页面,单击
(新建)图标。在弹出的对话框中,输入名称,根据实际需求在流任务中选择PySpark类型,然后单击确定。
在新建的开发页签中,配置以下信息,其余参数无需配置,然后单击发布。
参数
说明
主 Python 资源
选择前一个步骤中上传的Python文件。
引擎版本
选择合适的Spark版本。本文示例是
esr-4.6.0。网络连接
选择步骤二中创建的网络。
Spark配置
# 添加hologres-connector jar spark.emr.serverless.user.defined.jars oss://shulang-emr/test_script/hologres-connector-spark-3.x-1.5.6-jar-with-dependencies.jar # 配置holo catalog spark.sql.catalog.hologres_external_test_db com.alibaba.hologres.spark3.HoloTableCatalog spark.sql.catalog.hologres_external_test_db.username *** spark.sql.catalog.hologres_external_test_db.password *** spark.sql.catalog.hologres_external_test_db.jdbcurl jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb参数的详细说明请参见示例1:SQL会话。
发布后,单击前往运维,在跳转页面,单击启动。
验证结果。
Kafka发送消息。

SparkSQL查询。

示例3:Notebook会话
创建Notebook会话,详情请参见管理SQL会话。
创建会话时,在网络连接中选择上一步创建好的网络连接,并在Spark配置中添加以下参数来加载hologres-connector-spark。
# 添加hologres-connector jar spark.emr.serverless.user.defined.jars oss://<bucket>/hologres-connector-spark-3.x-<version>.jar在数据开发页面,创建一个Notebook类型的任务,然后在右上角选择创建好的Notebook会话。
拷贝如下代码到新增的Notebook页签中,然后单击
。import pandas as pd from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType # 1. 准备 Pandas DataFrame(修复 hsap 为字符串) pdf = pd.DataFrame({ "id": ["1006"], # 改为整数,匹配 Hologres 的 BIGINT/INT "name": ["sl"] # 字符串 }) # 2. 转换为 PySpark DataFrame(可选:显式定义 schema 以确保类型正确) schema = StructType([ StructField("id", StringType(), True), StructField("name", StringType(), True) ]) df = spark.createDataFrame(pdf, schema=schema) # 写入hologres df.write \ .format("hologres") \ .option("username", "LTAI******") \ .option("password", "mXYV******") \ .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test") \ .option("table", "test") \ .mode("append") \ .save() # 读取数据 readDf = spark.read\ .format("hologres") \ .option("username", "LTAI******") \ .option("password", "mXYV******") \ .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test") \ .option("table", "test") \ .load() readDf.select("id", "name").show(10)其中,涉及参数说明如下:
参数
示例
说明
spark.sql.catalog.hologres_external_test_db.usernameLTAI******阿里云账号的AccessKey ID。
spark.sql.catalog.hologres_external_test_db.passwordmXYV******阿里云账号的AccessKey Secret。
spark.sql.catalog.hologres_external_test_db.jdbcurljdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_dbHologres 实例 JDBC 连接 URL 。
验证结果。

Hologres Catalog常用命令
在 Serverless Spark 中,通过 Hologres Catalog 将 Hologres 数据库(Database)以 外部 Catalog 形式接入 Spark SQL。每个 Catalog 严格绑定一个 Hologres Database,且不可跨库访问(即无法通过同一 Catalog 访问多个 Hologres DB)。Catalog 内部的逻辑组织与 Hologres 保持一致:
Spark 概念 | 对应 Hologres 概念 | 说明 |
Catalog | Database | 如 |
Namespace | Schema | 如 |
Table | Table | 必须显式指定 |
加载Hologres Catalog
Spark中的Hologres Catalog完全对应一个Hologres的Database,使用过程中无法更改。
USE hologres_external_test_db;查询所有Namespace
Spark中的Namespace,对应Hologres中的Schema,默认为public,使用过程中可以使用USE指令调整默认的Schema。
-- 查看Hologres Catalog中的所有Namespace, 即Hologres中所有的Schema。
SHOW NAMESPACES;查询Namespace下的表
查询所有表。
SHOW TABLES;查询指定Namespace下的表。
USE test_schema; SHOW TABLES; -- 或者使用 SHOW TABLES IN test_schema;
相关文档
有关Spark读写Hologres的更多信息,请参见Spark读写Hologres。