Spark on MaxCompute目前已支持访问湖仓一体外部数据源,若您想将数据处理作业的环境从Spark更换为MaxCompute,无需再迁移Spark作业数据到MaxCompute,可直接进行访问,从而降低使用成本。本文为您介绍使用MaxCompute访问外部数据源的示例。
访问基于Hadoop外部数据源的外部项目
MaxCompute SQL访问外部项目表
-- hadoop_external_project 为外部项目,映射的是EMR的Hive数据库 -- 访问非分区表 SELECT * from hadoop_external_project.testtbl; -- 访问分区表 SELECT * from hadoop_external_project.testtbl_par where b='20220914';
Spark on MaxCompute访问外部项目表
-- 配置项 -- 当前默认关闭对于外表和外部project的支持,需要手动打开 spark.sql.odps.enableExternalTable=true spark.sql.odps.enableExternalProject=true; -- 指定spark版本 spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0; -- 代码 import org.apache.spark.sql.SparkSession object external_Project_ReadTableHadoop { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") // 默认300秒,广播join时广播等待的时间。 .config("spark.sql.broadcastTimeout", 20 * 60) //在严格模式下,用户必须指定至少一个静态分区,在非严格模式下,所有分区都允许是动态的。 .config("odps.exec.dynamic.partition.mode", "nonstrict") .config("oss.endpoint","oss-cn-shanghai-internal.aliyuncs.com") .getOrCreate() // 访问外部项目ext_dlf_0713 ; print("=====show tables in hadoop_external_project6=====") spark.sql("show tables in hadoop_external_project6").show() // 读外部项目非分区表 print("===============hadoop_external_project6.testtbl;================") spark.sql("desc extended hadoop_external_project6.testtbl").show() print("===============hadoop_external_project6.testtbl;================") spark.sql("SELECT * from hadoop_external_project6.testtbl").show() // 读外部项目分区表 print("===============hadoop_external_project6.testtbl_par;================") spark.sql("desc extended hadoop_external_project6.testtbl_par").show() print("===============hadoop_external_project6.testtbl;================") spark.sql("SELECT * from hadoop_external_project6.testtbl_par where b='20220914'").show() } }
访问基于数据湖构建和对象存储OSS的外部项目
MaxCompute SQL访问外部项目表
-- ext_dlf_0713为外部项目,映射的是DLF的数据库 -- 访问非分区表 SELECT * from ext_dlf_0713.tbl_oss1;
Spark on MaxCompute访问外部项目表
-- 配置项 -- 当前默认关闭对于外表和外部project的支持,需要手动打开 spark.sql.odps.enableExternalTable=true; spark.sql.odps.enableExternalProject=true; -- 指定spark版本 spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0; -- 如果是EMR产生的OSS location,增加该参数 spark.hadoop.odps.oss.location.uri.style=emr; -- spark 访问oss时需要指定oss.endpoint spark.hadoop.odps.oss.endpoint=oss-cn-shanghai-internal.aliyuncs.com; -- 指定region spark.hadoop.odps.region.id=cn-shanghai; spark.hadoop.odps.oss.region.default=cn-shanghai; -- 代码 import org.apache.spark.sql.{SaveMode, SparkSession} object external_Project_ReadTable { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") // 默认300秒,广播join时广播等待的时间。 .config("spark.sql.broadcastTimeout", 20 * 60) //在严格模式下,用户必须指定至少一个静态分区,在非严格模式下,所有分区都允许是动态的。 .config("odps.exec.dynamic.partition.mode", "nonstrict") .config("oss.endpoint","oss-cn-shanghai-internal.aliyuncs.com") .getOrCreate() // 访问外部项目ext_dlf_0713 ; print("=====show tables in ext_dlf_0713=====") spark.sql("show tables in ext_dlf_0713").show() // 读外部项目非分区表 print("===============ext_dlf_0713.tbl_oss1;================") spark.sql("desc extended ext_dlf_0713.tbl_oss1").show() print("===============ext_dlf_0713.tbl_oss1;================") spark.sql("SELECT * from ext_dlf_0713.tbl_oss1").show() } }
文档内容是否对您有帮助?