本文以OSS Parquet格式为例介绍如何通过Spark SQL读写OSS外表。
前提条件
集群的产品系列为企业版、基础版或湖仓版。
已创建Job型资源组。具体操作,请参见新建资源组。
已创建AnalyticDB for MySQL集群的数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
步骤一:进入数据开发
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击
。在SQLConsole窗口,选择Spark引擎和Job型资源组。
步骤二:创建外库与OSS外表
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式。
执行以下语句,创建外库。
CREATE DATABASE IF NOT exists test_db comment 'demo db' location 'oss://<bucket_name>/test' /*用于在该路径中创建表,请替换为自己的OSS路径。*/ WITH dbproperties(k1='v1', k2='v2');
说明您也可以在dbproperties中配置
'auto.create.location'='true'
自动创建OSS路径。如果未指定该参数并且OSS中不存在该路径,请先创建对应的OSS路径。执行以下语句,创建OSS外表。
-- 创建非分区表。 CREATE TABLE IF NOT exists test_db.test_tbl(id int, name string, age int) using parquet location 'oss://<bucket_name>/test/test_tbl/' tblproperties ('parquet.compress'='SNAPPY'); -- 创建分区表。 CREATE TABLE IF NOT exists test_db.test_tbl_partitioned(id int, name string, age int) using parquet partitioned by (location string) location 'oss://<bucket_name>/test/test_tbl_partitioned/' tblproperties ('parquet.compress'='SNAPPY');
重要OSS路径中的Bucket需与创建数据库所选的Bucket相同。
创建外表时选择的OSS路径需比创建数据库时选择的OSS路径至少多一层目录,且外表的路径需在数据库路径下。
可以在tblproperties中配置
'auto.create.location'='true'
自动创建OSS路径。如果未指定该参数并且OSS中不存在该路径,请先创建对应的OSS路径。
步骤三:写入OSS外表数据
AnalyticDB for MySQL默认不支持多个Spark作业同时写一张OSS外表的不同分区。如果需要多个Spark作业同时写一张OSS外表的不同分区,请添加spark.hadoop.fs.aliyun.oss.upload.basedir=oss://<bucket_name>/test/upload;
配置,且该配置中的OSS的Bucket需要与OSS外表所在的Bucket相同。多个Spark作业可以配置使用同一个OSS路径。
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式。
执行以下语句,写入数据。您可以选择以下任意一种方式向OSS外表中写入数据。
方式一:INSERT INTO写入
INSERT INTO test_db.test_tbl VALUES(1, 'adb', 10);
方式二:INSERT OVERWRITE全表写入
INSERT OVERWRITE test_db.test_tbl VALUES(2, 'spark', 10);
方式三:INSERT INTO静态分区写入
INSERT INTO TABLE test_db.test_tbl_partitioned PARTITION(location='hangzhou') VALUES(1, 'adb', 10);
方式四:INSERT OVERWRITE静态分区写入
INSERT OVERWRITE TABLE test_db.test_tbl_partitioned PARTITION(location='hangzhou') VALUES(1, 'adb', 10);
方式五:INSERT OVERWRITE动态分区覆盖写
INSERT OVERWRITE TABLE test_db.test_tbl_partitioned PARTITION(location) VALUES(1, 'adb', 10, 'beijing');
步骤四:查询数据
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式。
执行Spark SQL语句,只返回执行成功或者失败,不返回数据。您可以在Spark Jar开发页面应用列表页签中的日志查看表数据。详情请参见查看Spark应用信息。
执行以下语句,查询OSS外表数据。
SELECT * FROM test_db.test_tbl;
SELECT * FROM test_db.test_tbl_partitioned;
(可选)步骤五:删除外表
测试完成后,您可以执行以下语句,删除表的元数据。
DROP TABLE if exists test_db.test_tbl;
DROP TABLE if exists test_db.test_tbl_partitioned;
只会删除表的元数据,不会删除OSS中的数据。