现有湖仓一体架构是以MaxCompute为中心读写Hadoop集群数据,有些线下IDC场景,客户不愿意对公网暴露集群内部信息,需要从Hadoop集群发起访问云上的数据。本文以开源大数据开发平台E-MapReduce(云上Hadoop)方式模拟本地Hadoop集群,为您介绍如何读写MaxCompute数据。
背景信息
实践架构图如下所示。
准备开发环境
- 准备E-MapReduce(EMR)环境。
- 购买EMR集群。
详情请参见E-MapReduce快速入门。
- 登录EMR集群。
说明 登录E-MapReduce集群详情请参见
登录集群。
本实践登录ECS实例进行操作,连接ECS实例请参见连接ECS实例。
- 准备本地IDEA。
- 安装IntelliJ IDEA。
本实践在IntelliJ IDEA运行,需要安装IntelliJ IDEA,详情请参见Install IntelliJ IDEA。
- 安装Maven。
详情请参见安装Maven。
- 创建Scala项目。
- 下载Scala插件。
打开IDEA,选择
File>
Settings。在
Settings对话框左侧导航栏单击
Plugins,单击Scala后的
Install。
- 安装Scala JDK
详情请参见Install Scala on your computer。
- 创建Scala项目
在IDEA里新建项目,选择
Scala>
IDEA,即可创建Scala项目。
- 准备MaxCompute数据
- 创建项目
MaxCompute创建Project请参见创建MaxCompute项目。
- 获取AccessKey
您可以进入AccessKey管理页面获取AccessKey ID和AccessKey Secret。
- 获取Endpoint
MaxCompute服务的连接地址。您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见Endpoint。
- 创建Table
本实践需准备分区表和非分区表供测试使用,创建表详情请参见创建表。
读写MaxCompute数据
- 代码开发。
本实践提供如下读非分区表代码开发示例。
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.spark.sql.SparkSession
/**
* @author renxiang
* @date 2021-12-20
*/
object DataReaderTest {
val ODPS_DATA_SOURCE = "org.apache.spark.sql.odps.datasource.DefaultSource"
val ODPS_ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api"
def main(args: Array[String]): Unit = {
val odpsProject = args(0)
val odpsAkId = args(1)
val odpsAkKey = args(2)
val odpsTable = args(3)
val spark = SparkSession
.builder()
.appName("odps-datasource-reader")
.getOrCreate()
import spark._
val df = spark.read.format(ODPS_DATA_SOURCE)
.option("spark.hadoop.odps.project.name", odpsProject)
.option("spark.hadoop.odps.access.id", odpsAkId)
.option("spark.hadoop.odps.access.key", odpsAkKey)
.option("spark.hadoop.odps.end.point", ODPS_ENDPOINT)
.option("spark.hadoop.odps.table.name", odpsTable)
.load()
df.createOrReplaceTempView("odps_table")
println("select * from odps_table")
val dfFullScan = sql("select * from odps_table")
println(dfFullScan.count)
dfFullScan.show(10)
Thread.sleep(72*3600*1000)
}
}
- 代码打包和上传。
- Maven打包代码。
- 在IDEA的代码开发页面右边栏,单击Maven。
- 在Maven对话框,双击Lifecycle目录下的package进行打包。
- 本地编译jar包。
- 进入Project目录。
在系统的命令行执行窗口(例如Windows的cmd窗口)执行如下命令。
cd ${project.dir}/spark-datasource-v3.1
- 使用
mvn
命令构建spark-datasource。mvn clean package jar:test-jar
- 查看
target
目录下是否有dependencies.jar和tests.jar。
- 上传jar包至服务器。
- 在本地使用
scp
命令上传已经打包好的jar包和依赖的jar包至服务器,命令语法如下。scp <本地jar包路径> root@<ECS实例公网IP>:<服务器存放jar包路径>
示例如下。scp D:\Project\emr_mc_1\spark-datasource-v3.1\target\spark-datasource-1.0-SNAPSHOT-tests.jar root@8.xx.xx.xx:/root/emr_mc
- 查看jar包。
在服务器
emr_mc
目录下使用
ll
命令查看jar包。
- 使用如下命令在各节点之间上传jar包。
scp -r [本服务器存放jar包路径] root@ecs实例私网IP:[接收的服务器存放jar包地址]
- 运行代码。
- 运行模式。
- Local模式。
- 使用Local模式运行的命令语法如下。
./bin/spark-submit \
--master local \
--jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
--class DataReaderTest \
${jar-path} \
${maxcompute-project-name} \
${aliyun-access-key-id} \
${aliyun-access-key-secret} \
${maxcompute-table-name}
- 参数说明如下。
参数 |
说明 |
master |
运行模式,取值如下。
- Local:运行代码只调用当前ECS的计算资源。
- Yarn:运行代码使用EMR集群所有ECS的计算资源,运行效率比Local模式高。
|
jars |
依赖的jar包路径。 |
class |
需要执行的类名称。 |
jar-path |
需要执行的jar包路径。 |
maxcompute-project-name |
MaxCompute的项目(Project)名称。 |
aliyun-access-key-id |
阿里云账号或RAM用户的AccessKey ID。
您可以进入AccessKey管理页面获取AccessKey ID。
|
aliyun-access-key-secret |
AccessKey ID对应的AccessKey Secret。
您可以进入AccessKey管理页面获取AccessKey Secret。
|
maxcompute-table-name |
进行读或写的MaxCompute表名称。 |
- Yarn模式。
- 使用yarn模式运行的命令语法如下。
val ODPS_ENDPOINT = "http://service.cn-beijing.maxcompute.aliyun-inc.com/api"
./bin/spark-submit \
--master yarn \
--jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
--class DataReaderTest \
${jar-path} \
${maxcompute-project-name} \
${aliyun-access-key-id} \
${aliyun-access-key-secret} \
${maxcompute-table-name}
- 参数说明如下。
参数 |
说明 |
master |
运行模式,取值如下。
- Local:运行代码只调用当前ECS的计算资源。
- Yarn:运行代码使用EMR集群所有ECS的计算资源,运行效率比Local模式高。
|
jars |
依赖的jar包路径。 |
class |
需要执行的类名称。 |
jar-path |
需要执行的jar包路径。 |
maxcompute-project-name |
MaxCompute的项目(Project)名称。 |
aliyun-access-key-id |
阿里云账号或RAM用户的AccessKey ID。
您可以进入AccessKey管理页面获取AccessKey ID。
|
aliyun-access-key-secret |
AccessKey ID对应的AccessKey Secret。
您可以进入AccessKey管理页面获取AccessKey Secret。
|
maxcompute-table-name |
进行读或写的MaxCompute表名称。 |
- 读非分区表示例。
- 命令语法如下。
-- 进入spark执行环境
cd /usr/lib/spark-current
-- 提交任务
./bin/spark-submit \
--master local \
--jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
--class DataReaderTest \
${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
${maxcompute-project-name} \
${aliyun-access-key-id} \
${aliyun-access-key-secret} \
${maxcompute-table-name}
- 执行界面如下。
- 执行结果如下。
- 读分区表示例。
- 命令语法如下。
-- 进入spark执行环境
cd /usr/lib/spark-current
-- 提交任务
./bin/spark-submit \
--master local \
--jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
--class PartitionDataReaderTest \
${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
${maxcompute-project-name} \
${aliyun-access-key-id} \
${aliyun-access-key-secret} \
${maxcompute-table-name} \
${partition-descripion}
- 执行界面如下。
- 执行结果如下。
- 写非分区表测试。
- 命令语法如下。
./bin/spark-submit \
--master local \
--jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
--class DataWriterTest \
${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
${maxcompute-project-name} \
${aliyun-access-key-id} \
${aliyun-access-key-secret} \
${maxcompute-table-name}
- 执行界面如下。
- 执行结果如下。
- 写分区表测试。
- 命令语法如下。
./bin/spark-submit \
--master local \
--jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
--class PartitionDataWriterTest \
${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
${maxcompute-project-name} \
${aliyun-access-key-id} \
${aliyun-access-key-secret} \
${maxcompute-table-name} \
${partition-descripion}
- 执行界面如下。
- 执行结果如下。
性能测试
本实践性能测试环境是E-MapReduce和MaxCompute,属于云上互联。如果IDC网络与云上相连性能取决于tunnel资源或者专线带宽。
- 实例规格。
实例 |
规格 |
E-MapReduce集群 |
- Master节点数量:2个。
- ECS规格:计算型(ecs.c6.2xlarge)8 vCPU,16 GiB,2.5 Gbps。
- 系统盘:ESSD云盘 120GiB。
- 数据盘:ESSD云盘 80GiB。
- Core节点数量:2个。
- ECS规格:计算型(ecs.c6.2xlarge)8 vCPU,16 GiB,2.5 Gbps。
- 系统盘:ESSD云盘 120GiB。
- 数据盘:ESSD云盘 80GiB * 4。
|
MaxCompute |
按量计费标准版。 |
- 大表读测试。
数据表规格如下。
参数 |
规格 |
表名称 |
dwd_product_movie_basic_info
说明 此表为MaxCompute公开数据集 MAXCOMPUTE_PUBLIC_DATA项目下的表,详情请参见 公开数据集。
|
表大小 |
4829258484 Byte。 |
分区数 |
593 个。 |
读取的分区名称 |
20170422。 |
结果如下。
耗时 0.850871秒。
- 大表写测试。
- 分区写入万条数据。耗时2.533892秒。
- 分区写入十万条数据。耗时8.441193秒。
- 分区写入百万条数据。耗时73.28秒。