现有湖仓一体架构是以MaxCompute为中心读写Hadoop集群数据,有些线下IDC场景,客户不愿意对公网暴露集群内部信息,需要从Hadoop集群发起访问云上的数据。本文以开源大数据开发平台E-MapReduce(云上Hadoop)方式模拟本地Hadoop集群,为您介绍如何读写MaxCompute数据。

背景信息

实践架构图如下所示。模拟IDC架构

准备开发环境

  • 准备E-MapReduce(EMR)环境。
    1. 购买EMR集群。

      详情请参见E-MapReduce快速入门

    2. 登录EMR集群。
      说明 登录E-MapReduce集群详情请参见登录集群

      本实践登录ECS实例进行操作,连接ECS实例请参见连接ECS实例

  • 准备本地IDEA。
    1. 安装IntelliJ IDEA。

      本实践在IntelliJ IDEA运行,需要安装IntelliJ IDEA,详情请参见Install IntelliJ IDEA

    2. 安装Maven。

      详情请参见安装Maven

    3. 创建Scala项目。
      1. 下载Scala插件。
        打开IDEA,选择File>Settings。在Settings对话框左侧导航栏单击Plugins,单击Scala后的InstallScala
      2. 安装Scala JDK

        详情请参见Install Scala on your computer

      3. 创建Scala项目
        在IDEA里新建项目,选择Scala>IDEA,即可创建Scala项目。SCALA项目
  • 准备MaxCompute数据
    1. 创建项目

      MaxCompute创建Project请参见创建MaxCompute项目

    2. 获取AccessKey

      您可以进入AccessKey管理页面获取AccessKey ID和AccessKey Secret。

    3. 获取Endpoint

      MaxCompute服务的连接地址。您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见Endpoint

    4. 创建Table

      本实践需准备分区表和非分区表供测试使用,创建表详情请参见创建表

读写MaxCompute数据

  1. 代码开发。
    本实践提供如下读非分区表代码开发示例。
    说明 读分区表、写非分区表和写分区表代码示例请参见PartitionDataReaderTest.scalaDataWriterTest.scalaPartitionDataWriterTest.scala,可以根据实际业务情况进行代码开发。
    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * @author renxiang
      * @date 2021-12-20
      */
    object DataReaderTest {
    
      val ODPS_DATA_SOURCE = "org.apache.spark.sql.odps.datasource.DefaultSource"
      val ODPS_ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api"
    
    
      def main(args: Array[String]): Unit = {
        val odpsProject = args(0)
        val odpsAkId = args(1)
        val odpsAkKey = args(2)
        val odpsTable = args(3)
    
        val spark = SparkSession
          .builder()
          .appName("odps-datasource-reader")
          .getOrCreate()
    
        import spark._
    
        val df = spark.read.format(ODPS_DATA_SOURCE)
          .option("spark.hadoop.odps.project.name", odpsProject)
          .option("spark.hadoop.odps.access.id", odpsAkId)
          .option("spark.hadoop.odps.access.key", odpsAkKey)
          .option("spark.hadoop.odps.end.point", ODPS_ENDPOINT)
          .option("spark.hadoop.odps.table.name", odpsTable)
          .load()
    
        df.createOrReplaceTempView("odps_table")
    
        println("select * from odps_table")
        val dfFullScan = sql("select * from odps_table")
        println(dfFullScan.count)
        dfFullScan.show(10)
    
        Thread.sleep(72*3600*1000)
      }
    }
  2. 代码打包和上传。
    1. Maven打包代码。
      1. 在IDEA的代码开发页面右边栏,单击Maven
      2. Maven对话框,双击Lifecycle目录下的package进行打包。
    2. 本地编译jar包。
      1. 进入Project目录。
        在系统的命令行执行窗口(例如Windows的cmd窗口)执行如下命令。
        cd ${project.dir}/spark-datasource-v3.1
      2. 使用mvn命令构建spark-datasource。
        mvn clean package jar:test-jar
      3. 查看target目录下是否有dependencies.jartests.jartarget目录
    3. 上传jar包至服务器。
      1. 在本地使用scp命令上传已经打包好的jar包和依赖的jar包至服务器,命令语法如下。
        scp <本地jar包路径> root@<ECS实例公网IP>:<服务器存放jar包路径>
        示例如下。
        scp D:\Project\emr_mc_1\spark-datasource-v3.1\target\spark-datasource-1.0-SNAPSHOT-tests.jar root@8.xx.xx.xx:/root/emr_mc
        上传jar包
      2. 查看jar包。
        在服务器emr_mc目录下使用ll命令查看jar包。查看jar包
      3. 使用如下命令在各节点之间上传jar包。
        scp -r [本服务器存放jar包路径] root@ecs实例私网IP:[接收的服务器存放jar包地址]
  3. 运行代码。
    • 运行模式。
      • Local模式。
        • 使用Local模式运行的命令语法如下。
          ./bin/spark-submit \
              --master local \
              --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
              --class DataReaderTest \
              ${jar-path} \
              ${maxcompute-project-name} \
              ${aliyun-access-key-id} \
              ${aliyun-access-key-secret} \
              ${maxcompute-table-name}
        • 参数说明如下。
          参数 说明
          master 运行模式,取值如下。
          • Local:运行代码只调用当前ECS的计算资源。
          • Yarn:运行代码使用EMR集群所有ECS的计算资源,运行效率比Local模式高。
          jars 依赖的jar包路径。
          class 需要执行的类名称。
          jar-path 需要执行的jar包路径。
          maxcompute-project-name MaxCompute的项目(Project)名称。
          aliyun-access-key-id 阿里云账号或RAM用户的AccessKey ID。

          您可以进入AccessKey管理页面获取AccessKey ID。

          aliyun-access-key-secret AccessKey ID对应的AccessKey Secret。

          您可以进入AccessKey管理页面获取AccessKey Secret。

          maxcompute-table-name 进行读或写的MaxCompute表名称。
      • Yarn模式。
        • 使用yarn模式运行的命令语法如下。
          val ODPS_ENDPOINT = "http://service.cn-beijing.maxcompute.aliyun-inc.com/api"
          
          ./bin/spark-submit \
              --master yarn \
              --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
              --class DataReaderTest \
              ${jar-path} \
              ${maxcompute-project-name} \
              ${aliyun-access-key-id} \
              ${aliyun-access-key-secret} \
              ${maxcompute-table-name}
        • 参数说明如下。
          参数 说明
          master 运行模式,取值如下。
          • Local:运行代码只调用当前ECS的计算资源。
          • Yarn:运行代码使用EMR集群所有ECS的计算资源,运行效率比Local模式高。
          jars 依赖的jar包路径。
          class 需要执行的类名称。
          jar-path 需要执行的jar包路径。
          maxcompute-project-name MaxCompute的项目(Project)名称。
          aliyun-access-key-id 阿里云账号或RAM用户的AccessKey ID。

          您可以进入AccessKey管理页面获取AccessKey ID。

          aliyun-access-key-secret AccessKey ID对应的AccessKey Secret。

          您可以进入AccessKey管理页面获取AccessKey Secret。

          maxcompute-table-name 进行读或写的MaxCompute表名称。
    • 读非分区表示例。
      • 命令语法如下。
        -- 进入spark执行环境
        cd /usr/lib/spark-current
        
        -- 提交任务
        ./bin/spark-submit \
            --master local \
            --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
            --class DataReaderTest \
            ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
            ${maxcompute-project-name} \
            ${aliyun-access-key-id} \
            ${aliyun-access-key-secret} \
            ${maxcompute-table-name}
      • 执行界面如下。读非分区表界面
      • 执行结果如下。读非分区表执行结果
    • 读分区表示例。
      • 命令语法如下。
        -- 进入spark执行环境
        cd /usr/lib/spark-current
        
        -- 提交任务
        ./bin/spark-submit \
            --master local \
            --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
            --class PartitionDataReaderTest \
            ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
            ${maxcompute-project-name} \
            ${aliyun-access-key-id} \
            ${aliyun-access-key-secret} \
            ${maxcompute-table-name} \
            ${partition-descripion}
      • 执行界面如下。读分区表界面
      • 执行结果如下。读分区表结果
    • 写非分区表测试。
      • 命令语法如下。
        ./bin/spark-submit \
            --master local \
            --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
            --class DataWriterTest \
            ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
            ${maxcompute-project-name} \
            ${aliyun-access-key-id} \
            ${aliyun-access-key-secret} \
            ${maxcompute-table-name}
      • 执行界面如下。写非分区表界面
      • 执行结果如下。写非分区表结果
    • 写分区表测试。
      • 命令语法如下。
        ./bin/spark-submit \
            --master local \
            --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
            --class PartitionDataWriterTest \
            ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
            ${maxcompute-project-name} \
            ${aliyun-access-key-id} \
            ${aliyun-access-key-secret} \
            ${maxcompute-table-name} \
            ${partition-descripion}
      • 执行界面如下。写分区表界面
      • 执行结果如下。写分区表结果

性能测试

本实践性能测试环境是E-MapReduce和MaxCompute,属于云上互联。如果IDC网络与云上相连性能取决于tunnel资源或者专线带宽。
  • 实例规格。
    实例 规格
    E-MapReduce集群
    • Master节点数量:2个。
      • ECS规格:计算型(ecs.c6.2xlarge)8 vCPU,16 GiB,2.5 Gbps。
      • 系统盘:ESSD云盘 120GiB。
      • 数据盘:ESSD云盘 80GiB。
    • Core节点数量:2个。
      • ECS规格:计算型(ecs.c6.2xlarge)8 vCPU,16 GiB,2.5 Gbps。
      • 系统盘:ESSD云盘 120GiB。
      • 数据盘:ESSD云盘 80GiB * 4。
    MaxCompute 按量计费标准版。
  • 大表读测试。
    数据表规格如下。
    参数 规格
    表名称 dwd_product_movie_basic_info
    说明 此表为MaxCompute公开数据集MAXCOMPUTE_PUBLIC_DATA项目下的表,详情请参见公开数据集
    表大小 4829258484 Byte。
    分区数 593 个。
    读取的分区名称 20170422。
    结果如下。大表读结果耗时 0.850871秒。
  • 大表写测试。
    • 分区写入万条数据。分区写入万条数据耗时2.533892秒。
    • 分区写入十万条数据。分区写入十万条数据耗时8.441193秒。
    • 分区写入百万条数据。分区写入百万条数据耗时73.28秒。