基于Apache Doris官方提供的Spark Connector,EMR Serverless Spark可以在开发时添加对应的配置来连接Doris。本文为您介绍在EMR Serverless Spark环境中实现Doris的数据读取和写入操作。
背景信息
Apache Doris是一个高性能、实时的分析型数据库,能够较好地满足报表分析、即席查询、数据湖联邦查询加速等使用场景。更多信息,请参见Apache Doris 简介。
EMR Serverless Spark是一款兼容开源Spark的高性能Lakehouse产品,提供了企业级全托管的数据平台服务。通过结合Apache Doris与EMR Serverless Spark,您可以高效地进行数据读取、写入和分析操作,从而实现端到端的数据处理流程。
前提条件
使用限制
Serverless Spark引擎的版本要求为esr-2.5.0、esr-3.1.0、esr-4.1.0及以上版本。
操作流程
步骤一:获取Doris Spark Connector JAR并上传至OSS
您需要查阅Doris的官方文档Spark Doris Connector。该文档通常会列出不同版本的连接器与不同版本的 Spark 引擎的兼容情况。您需要确认您正在使用的 Spark 版本与 Doris Spark Connector 版本之间的兼容性。
步骤二:创建网络连接
Serverless Spark需要能够打通与EMR Doris集群之间的网络才可以正常访问Doris服务。更多网络连接信息,请参见EMR Serverless Spark与其他VPC间网络互通。
配置安全组规则时,端口范围请根据实际需求选择性开放必要的端口。端口范围的取值为1~65535。本文示例需开启HTTP 端口(8031)、RPC 端口(9061)以及Webserver端口(8041)。
步骤三:在EMR Doris集群中创建库表
使用SSH方式登录集群,详情请参见登录集群。
执行以下命令,连接EMR Doris集群。
mysql -h127.0.0.1 -P 9031 -uroot
创建数据库和表。
CREATE DATABASE IF NOT EXISTS testdb; USE testdb; CREATE TABLE test ( id INT, name STRING ) PROPERTIES("replication_num" = "1");
插入测试数据。
INSERT INTO test VALUES (1, 'a'), (2, 'b'), (3, 'c');
查询数据。
SELECT * FROM test;
返回信息如下图所示。
步骤四:Serverless Spark读取Doris表
创建SQL会话,详情请参见管理SQL会话。
创建会话时,在引擎版本下拉列表中选择与Doris Spark Connector版本对应的引擎版本,在网络连接中选择步骤二中创建好的网络连接,并在Spark配置中添加以下参数来加载Doris Spark Connector。
spark.user.defined.jars oss://<bucketname>/path/connector.jar
其中,
oss://<bucketname>/path/connector.jar
为您步骤一中上传至OSS的Doris Spark Connector的路径。例如,oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar
。在数据开发页面,选择创建一个
类型的任务,然后在右上角选择创建好的SQL会话。更多操作,请参见SparkSQL开发。
拷贝如下代码到新增的SparkSQL页签中,并根据需要修改相应的参数信息,然后单击运行。
CREATE TEMPORARY VIEW test USING doris OPTIONS( "table.identifier" = "testdb.test", "fenodes" = "<doris_address>:<http_port>", "user" = "<user>", "password" = "<password>" ); SELECT * FROM test;
其中,涉及参数信息说明如下。
参数
描述
示例
testdb.test
Doris服务中实际的数据库和表名。
如果您使用的是其他Doris集群,请根据实际情况填写相应的配置。
如果您使用的是EMR on ECS中创建的集群,则填写如下参数值:
testdb.test
:本文以testdb.test
为例。<doris_address>
:您可以在EMR on ECS控制台Doris集群的节点管理页面,单击emr-master前的图标,查看内网IP地址。<http_port>
:默认为8031。<user>
:默认用户名为root
。<password>
:默认密码为空。
<doris_address>
Doris服务所在的节点内网IP地址。
<http_port>
Doris服务监听HTTP请求的端口号。
<user>
用于连接Doris服务的用户名。
<password>
用于连接Doris服务的用户密码。
如果能够正常返回数据,则表明配置正确。
创建Notebook会话,详情请参见管理Notebook会话。
创建会话时,在引擎版本下拉列表中选择与Doris Spark Connector版本对应的引擎版本,在网络连接中选择步骤二中创建好的网络连接,并在Spark配置中添加以下参数来加载Doris Spark Connector。
spark.user.defined.jars oss://<bucketname>/path/connector.jar
其中,
oss://<bucketname>/path/connector.jar
为您步骤一中上传至OSS的Doris Spark Connector的路径。例如,oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar
。在数据开发页面,选择创建一个
类型的任务,然后在右上角选择创建的Notebook会话。更多操作,请参见管理Notebook会话。
拷贝如下代码到新增的Notebook页签中,并根据需要修改相应的参数信息,然后单击运行。
dorisSparkDF = spark.read.format("doris") \ .option("doris.table.identifier", "testdb.test") \ .option("doris.fenodes", "<doris_address>:<http_port>") \ .option("user", "<user>") \ .option("password", "<password>") \ .load() dorisSparkDF.show(3)
其中,涉及参数信息说明如下。
参数
描述
示例
testdb.test
Doris服务中实际的数据库和表名。
如果您使用的是其他Doris集群,请根据实际情况填写相应的配置。
如果您使用的是EMR on ECS中创建的集群,则填写如下参数值:
testdb.test
:本文以testdb.test
为例。<doris_address>
:您可以在EMR on ECS控制台Doris集群的节点管理页面,单击emr-master前的图标,查看内网IP地址。<http_port>
:默认为8031。<user>
:默认用户名为root
。<password>
:默认密码为空。
<doris_address>
Doris服务所在的节点内网IP地址。
<http_port>
Doris服务监听HTTP请求的端口号。
<user>
用于连接Doris服务的用户名。
<password>
用于连接Doris服务的用户密码。
如果能够正常返回数据,则表明配置正确。
步骤五:Serverless Spark写入Doris表
拷贝如下代码到前一个步骤中新增的SparkSQL页签中,并根据需要修改相应的参数信息,然后单击运行。
CREATE TEMPORARY VIEW test_write
USING doris
OPTIONS(
"table.identifier" = "testdb.test",
"fenodes" = "<doris_address>:<http_port>",
"user" = "<user>",
"password" = "<password>"
);
INSERT INTO test_write VALUES (4, 'd'), (5, 'e');
SELECT * FROM test_write;
如果能够返回以下数据,则表明数据写入成功。
拷贝如下代码到前一个步骤中新增的Notebook页签中,并根据需要修改相应的参数信息,然后单击运行。
data = [(7, 'f'), (8, 'g')]
mockDataDF = spark.createDataFrame(data, ["id", "name"])
mockDataDF.write.mode("append").format("doris") \
.option("doris.table.identifier", "testdb.test") \
.option("doris.fenodes", "<doris_address>:<http_port>") \
.option("user", "<user>") \
.option("password", "<password>") \
.save()
dorisSparkDF = spark.read.format("doris") \
.option("doris.table.identifier", "testdb.test") \
.option("doris.fenodes", "<doris_address>:<http_port>") \
.option("user", "<user>") \
.option("password", "<password>") \
.load()
dorisSparkDF.show(10)
如果能够返回以下数据,则表明数据写入成功。
- 本页导读 (1)
- 背景信息
- 前提条件
- 使用限制
- 操作流程
- 步骤一:获取Doris Spark Connector JAR并上传至OSS
- 步骤二:创建网络连接
- 步骤三:在EMR Doris集群中创建库表
- 步骤四:Serverless Spark读取Doris表
- 步骤五:Serverless Spark写入Doris表