通过Spark SQL读写C-Store数据

更新时间:

云原生数据仓库 AnalyticDB MySQL 版集群支持通过Spark SQL在线或离线读写C-Store表(即表引擎是XUANWU的表)数据。本文主要介绍通过Spark SQL在线或离线读写C-Store表数据的具体方法。

前提条件

  • 集群的产品系列为企业版、基础版或湖仓版

    说明
    • 湖仓版集群存储预留资源需大于0 ACU。

    • 企业版集群预留资源需大于0 ACU。

    • 基础版集群预留资源需大于0 ACU。

  • 已在企业版、基础版或湖仓版集群中创建Job型资源组

  • 已创建企业版、基础版或湖仓版集群的数据库账号。

步骤一:进入数据开发

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。

  2. 在左侧导航栏,单击作业开发 > SQL开发

  3. SQLConsole窗口,选择Spark引擎和Job型资源组。

步骤二:创建库和C-Store

说明

您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式

  1. 执行以下语句,创建数据库。

    CREATE DATABASE spark_create_adb_db_test;
  2. 执行以下语句,创建C-Store表。Spark SQL建表语法详情请参见Spark SQL创建内表

    CREATE TABLE spark_create_adb_db_test.test_adb_tbl (
      id int,
      name string COMMENT 'test_name',
      age int
    ) using adb TBLPROPERTIES('primaryKey'='id,age',
                    'distributeType'='HASH', 
                    'distributeColumns'='id', 
                    'partitionType'='value', 
                    'partitionColumn'='age',   
                    'partitionCount'='120',
                    'storagePolicy'='COLD'   
                   );

步骤三:读写C-Store表数据

说明
  • 您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式

  • 执行Spark SQL语句,只返回执行成功或者失败,不返回数据。您可以在Spark Jar开发页面应用列表页签中的日志查看表数据。详情请参见查看Spark应用信息

离线读写数据(INSERT OVERWRITE)时,仅需选择Spark引擎和Job型资源组,无需配置其他参数;在线读写数据(INSERT INTO)时,需通过SET命令设置执行SQL语句的资源组,且资源组类型必须为Interactive型。

离线读写数据(INSERT OVERWRITE)

注意事项

离线读写数据需注意以下内容:

  • 仅支持通过Spark SQL读取(SELECT)和写入(INSERT)C-Store分区表中的数据,不支持读写非分区表数据。分区表创建方法,详情请参见CREATE TABLE

  • 不支持通过Spark SQL更新(UPDATE)和删除(DELETE)C-Store表数据(包括分区表和非分区表)。

  • 查询热数据时,需要先修改ELASTIC_ENABLE_HOT_PARTITION_HAS_HDD_REPLICA配置项,并手动BUILD。待BUILD完成后,再执行SQL查询语句,否则会导致SQL执行失败。

    1. 使用XIHE引擎执行如下SQL,修改配置项。

      SET adb_config ELASTIC_ENABLE_HOT_PARTITION_HAS_HDD_REPLICA=true;
    2. 手动BUILD。

      • 对指定分区BUILD:

        重要

        3.1.6.0及以上版本的集群支持指定分区BUILD。

        云原生数据仓库AnalyticDB MySQL控制台集群信息页面的配置信息区域,查看和升级内核版本

        BUILD TABLE <table_name> force partitions='partition1,partition2';
      • 全表BUILD:

        重要

        该功能默认关闭,且强制全表BUILD时会对全表所有分区的数据重新构建索引,耗时较久。建议您使用指定分区BUILD功能若需使用强制全表BUILD功能,请谨慎评估系统风险后提交工单开启该功能。

        BUILD TABLE <table_name> force = true;
    3. 触发BUILD任务后,执行如下语句查看BUILD任务状态:

      SELECT table_name, schema_name, status FROM INFORMATION_SCHEMA.KEPLER_META_BUILD_TASK ORDER BY create_time DESC LIMIT 10;
  1. 写入C-Store表数据。

    AnalyticDB for MySQL支持通过INSERT OVERWRITE语句离线写入数据到C-Store表。您可以选择以下任意一种方式向C-Store表写入数据:

    • 方式一:INSERT OVERWRITE静态分区写入

      INSERT OVERWRITE spark_create_adb_db_test.test_adb_tbl partition(age=10) VALUES (1, 'bom');
    • 方式二:INSERT OVERWRITE动态分区写入

      INSERT OVERWRITE spark_create_adb_db_test.test_adb_tbl partition (age) VALUES (1, 'bom', 10);
  2. 读取C-Store表数据。

    SELECT * FROM spark_create_adb_db_test.test_adb_tbl;

在线读写数据(INSERT INTO)

AnalyticDB for MySQL支持通过INSERT INTO语句在线写入数据到C-Store表。您可以通过JDBC方式或View方式读写数据。

JDBC方式

-- 集群的数据库账号、密码以及内网地址
conf spark.adb.username=user;
conf spark.adb.password=password;
conf spark.adb.endpoint=amv-bp1a74zh****.ads.aliyuncs.com:3306;
-- 开启ENI访问
SET spark.adb.eni.enabled=true;
SET spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****;
SET spark.adb.eni.securityGroupId=sg-bp1cdm3acizrgq6x****;
-- 使用JDBC模式
SET spark.adb.useJdbc = true;
-- 设置执行SQL的在线资源组
SET spark.adb.resourceGroup=user_default;
-- 写入数据
INSERT INTO spark_create_adb_db_test.test_adb_tbl VALUES (1, 'adb', 20);
--读取数据
SELECT * FROM spark_create_adb_db_test.test_adb_tbl;

参数说明:

参数

说明

spark.adb.username

集群的数据库账号。

spark.adb.password

数据库账号的密码。

spark.adb.endpoint

集群的内网地址和端口。格式为amv-bp1a74zh****.ads.aliyuncs.com:3306

spark.adb.eni.enabled

开启ENI访问。

访问数据时,需将spark.adb.eni.enabled参数设置为true。

spark.adb.eni.vswitchId

集群所属的交换机ID。

spark.adb.eni.securityGroupId

集群的安全组ID。您可以选择已有安全组或新建安全组。

重要

安全组需和企业版、基础版或湖仓版集群属于同一VPC。

spark.adb.useJdbc

是否使用JDBC方式访问数据。取值:

  • true:是。

  • false(默认值):否。

在线写入数据时需配置为true

spark.adb.resourceGroup

设置执行SQL语句的Interactive型资源组。

说明

创建企业版、基础版或湖仓版集群时,会默认创建一个名为user_defaultInteractive型资源组,您可以选择该资源组或新建Interactive型资源组。新建资源组的具体方法,请参见新建和管理资源组

View方式

-- 开启ENI访问
SET spark.adb.eni.enabled=true;
SET spark.adb.eni.vswitchId=vsw-bp1sxxsodv28ey5dl****;   
SET spark.adb.eni.securityGroupId=sg-bp19mr685pmg4ihc****;    
-- 创建视图
CREATE TEMPORARY VIEW table_tmp
USING org.apache.spark.sql.jdbc
OPTIONS (
  url 'jdbc:mysql://amv-bp1a74zh****.ads.aliyuncs.com:3306/spark_create_adb_db_test?useServerPrepStmts=false&rewriteBatchedStatements=true',     
  dbtable 'spark_create_adb_db_test.test_adb_tbl',    
  user 'user',       
  password 'password****'      
);
-- 写入数据
INSERT INTO table_tmp VALUES (1, 'adb', 20);
-- 查询数据
SELECT * FROM table_tmp;

参数说明:

参数

说明

spark.adb.eni.enabled

开启ENI访问。

访问数据时,需将spark.adb.eni.enabled参数设置为true。

spark.adb.eni.vswitchId

集群所属的交换机ID。

spark.adb.eni.securityGroupId

集群的安全组ID。您可以选择已有安全组或新建安全组。

重要

安全组需和企业版、基础版或湖仓版集群属于同一VPC。

table_tmp

视图名称。本文以table_tmp为例。

USING org.apache.spark.sql.jdbc

参数取值固定为USING org.apache.spark.sql.jdbc

url

集群的JDBC URL。

格式:jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true,其中:

  • endpoint:集群的内网连接地址。

  • port:集群的端口号。固定为3306。

  • db_name:集群的数据库名称。

  • useServerPrepStmts=false&rewriteBatchedStatements=true:批量写入数据至的必填配置,用于提高写入性能,以及降低对集群的压力。

示例:jdbc:mysql://amv-bp1a74zh****.ads.aliyuncs.com:3306/spark_create_adb_db_test?useServerPrepStmts=false&rewriteBatchedStatements=true

dbtable

集群的表名。格式为db_name.table_name。本文以spark_create_adb_db_test.test_adb_tbl为例。

user

集群的数据库的账号。

password

集群数据库账号的密码。

Spark配置项

通过Spark SQL读写C-Store表时,支持配置如下配置项:

配置项

说明

默认值

spark.adb.write.batchSize

单批次写入数据的条数。取值范围为大于0的正整数。

说明

仅在线写入数据时,支持配置该参数。

600

spark.adb.write.arrow.maxMemoryBufferSize

写入最大内存缓冲大小。取值范围为大于0的正整数,单位为MB。

说明

仅离线写入数据时,支持配置该参数。

1024

spark.adb.write.arrow.maxRecordSizePerBatch

单批次写入数据的最大记录条数。取值范围为大于0的正整数。

说明

仅离线写入数据时,支持配置该参数。

500