Spark访问湖仓一体外部数据源

Spark on MaxCompute目前已支持访问湖仓一体外部数据源,若您想将数据处理作业的环境从Spark更换为MaxCompute,无需再迁移Spark作业数据到MaxCompute,可直接进行访问,从而降低使用成本。本文为您介绍使用MaxCompute访问外部数据源的示例。

访问基于Hadoop外部数据源的外部项目

  • MaxCompute SQL访问外部项目表

    -- hadoop_external_project 为外部项目,映射的是EMR的Hive数据库
    -- 访问非分区表
    SELECT * from hadoop_external_project.testtbl;
    -- 访问分区表
    SELECT * from hadoop_external_project.testtbl_par where b='20220914';
  • Spark on MaxCompute访问外部项目表

    -- 配置项
    -- 当前默认关闭对于外表和外部project的支持,需要手动打开
    spark.sql.odps.enableExternalTable=true
    spark.sql.odps.enableExternalProject=true;
    -- 指定spark版本
    spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0;
    
    -- 代码
    import org.apache.spark.sql.SparkSession
    
    object external_Project_ReadTableHadoop {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("external_TableL-on-MaxCompute")
          // 默认300秒,广播join时广播等待的时间。
          .config("spark.sql.broadcastTimeout", 20 * 60)
          //在严格模式下,用户必须指定至少一个静态分区,在非严格模式下,所有分区都允许是动态的。
          .config("odps.exec.dynamic.partition.mode", "nonstrict")
          .config("oss.endpoint","oss-cn-shanghai-internal.aliyuncs.com")
          .getOrCreate()
    
        // 访问外部项目ext_dlf_0713 ;
        print("=====show tables in hadoop_external_project6=====")
        spark.sql("show tables in hadoop_external_project6").show()
    
        // 读外部项目非分区表
        print("===============hadoop_external_project6.testtbl;================")
        spark.sql("desc extended hadoop_external_project6.testtbl").show()
        print("===============hadoop_external_project6.testtbl;================")
        spark.sql("SELECT * from hadoop_external_project6.testtbl").show()
    
        // 读外部项目分区表
        print("===============hadoop_external_project6.testtbl_par;================")
        spark.sql("desc extended hadoop_external_project6.testtbl_par").show()
        print("===============hadoop_external_project6.testtbl;================")
        spark.sql("SELECT * from hadoop_external_project6.testtbl_par where b='20220914'").show()
    
      }
    
    }

访问基于数据湖构建和对象存储OSS的外部项目

  • MaxCompute SQL访问外部项目表

    -- ext_dlf_0713为外部项目,映射的是DLF的数据库
    -- 访问非分区表
    SELECT * from ext_dlf_0713.tbl_oss1;
  • Spark on MaxCompute访问外部项目表

    -- 配置项
    -- 当前默认关闭对于外表和外部project的支持,需要手动打开
    spark.sql.odps.enableExternalTable=true;
    spark.sql.odps.enableExternalProject=true;
    -- 指定spark版本
    spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0;
    -- 如果是EMR产生的OSS location,增加该参数
    spark.hadoop.odps.oss.location.uri.style=emr;
    -- spark 访问oss时需要指定oss.endpoint
    spark.hadoop.odps.oss.endpoint=oss-cn-shanghai-internal.aliyuncs.com;
    -- 指定region
    spark.hadoop.odps.region.id=cn-shanghai;
    spark.hadoop.odps.oss.region.default=cn-shanghai;
    
    -- 代码
    import org.apache.spark.sql.{SaveMode, SparkSession}
    object external_Project_ReadTable {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("external_TableL-on-MaxCompute")
          // 默认300秒,广播join时广播等待的时间。
          .config("spark.sql.broadcastTimeout", 20 * 60)
          //在严格模式下,用户必须指定至少一个静态分区,在非严格模式下,所有分区都允许是动态的。
          .config("odps.exec.dynamic.partition.mode", "nonstrict")
          .config("oss.endpoint","oss-cn-shanghai-internal.aliyuncs.com")
          .getOrCreate()
    
        // 访问外部项目ext_dlf_0713 ;
        print("=====show tables in ext_dlf_0713=====")
        spark.sql("show tables in ext_dlf_0713").show()
    
        // 读外部项目非分区表
        print("===============ext_dlf_0713.tbl_oss1;================")
        spark.sql("desc extended ext_dlf_0713.tbl_oss1").show()
        print("===============ext_dlf_0713.tbl_oss1;================")
        spark.sql("SELECT * from ext_dlf_0713.tbl_oss1").show()
    
    
      }
    
    }