访问Tablestore数据源

本文介绍如何使用云原生数据仓库 AnalyticDB MySQL 版Spark通过访问Tablestore数据。

前提条件

  • AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版

  • 已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组

  • 已授权AnalyticDB for MySQL扮演AliyunADBSparkProcessingDataRole角色来访问其他云资源。具体操作,请参见账号授权

  • AnalyticDB for MySQL集群与TableStore实例位于同一地域。

操作步骤

  1. 下载AnalyticDB for MySQLSpark访问Tablestore依赖的Jar包。下载链接emr-tablestore-2.3.0-SNAPSHOT.jartablestore-5.10.3-jar-with-dependencies.jar

  2. 在pom.xml文件的dependencies中添加依赖项。

<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-tablestore</artifactId>
  <version>2.2.0</version>
  <scope>provided</scope>
</dependency>
  1. 编写如下示例程序来访问Tablestore,并进行编译打包。本文生成的Jar包名称为spark-tablestore.jar。示例代码如下:

package com.aliyun.spark

import com.aliyun.openservices.tablestore.hadoop.TableStore
import org.apache.spark.sql.SparkSession
import scala.collection.mutable

object SparkOnTablestore {
  def main(args: Array[String]): Unit = {
    //Tablestore实例的VPC连接地址。
    val endpoint = args(0)
    // Tablestore的表名。
    val tableName = args(1)
    // Tablestore实例名称。
    val instanceName = args(2)
    // 具备Tablestore读写权限的RAM用户的AccessKey ID。
    val accessKeyId = sys.env.get("ACCESS_KEY_ID")
    //  具备Tablestore读写权限的RAM用户的AccessKey Secret。
    val accessKeySecret = sys.env.get("ACCESS_KEY_SECRET")
    // Tablestore表的数据目录。
    val dataCatalog = 
      """
        |{"columns":{
        |    "PkString": {"type":"string"},
        |    "PkInt": {"type":"long"},
        |    "col1": {"type":"string"},
        |    "col2": {"type":"long"},
        |    "col3": {"type":"binary"},
        |    "timestamp": {"type":"long"},
        |    "col5": {"type":"double"},
        |    "col6": {"type":"boolean"}
        |  }
        |}
      """.stripMargin
    
    val options = new mutable.HashMap[String, String]()
    options.put("endpoint", endpoint)
    options.put("access.key.id", accessKeyId.get)
    options.put("access.key.secret", accessKeySecret.get)
    options.put("table.name", tableName)
    options.put("instance.name", instanceName)
    options.put("catalog", dataCatalog)

    val spark = SparkSession.builder().getOrCreate()
    // 读取数据。
    val df = spark.read.format("tablestore").options(options).load()
    df.show()
    // 写入数据。
    df.write.format("tablestore").options(options).save()

    TableStore.shutdown()
    spark.stop()
  }
}
  1. 将步骤1和步骤3中的Jar包上传至OSS。具体操作,请参见简单上传

  2. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

  3. 在左侧导航栏,单击作业开发 > Spark Jar开发

  4. 在编辑器窗口上方,选择Job型资源组和作业类型。本文以Batch类型为例。

  5. 在编辑器中输入以下作业内容。

{
  "args": [
    "https://i01164l****.<region_id>.vpc.tablestore.aliyuncs.com",
    "spark_test",
    "i01164l****"
  ],
  "file": "oss://<bucket_name>/spark-tablestore.jar",
  "jars": [
    "oss://<bucket_name>/emr-tablestore-2.3.0-SNAPSHOT.jar",
    "oss://<bucket_name>/tablestore-5.10.3-jar-with-dependencies.jar"
  ],
  "name": "spark-on-tablestore",
  "className": "com.aliyun.spark.SparkOnTablestore",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.instances": 2,
    "spark.executor.resourceSpec": "medium",
    "spark.kubernetes.driverEnv.ACCESS_KEY_ID": "LTAI5tLBmYLvf****",
    "spark.kubernetes.driverEnv.ACCESS_KEY_SECRET": "A4KuIceJjlARWKzA6nVbbi****",
    "spark.hadoopRDD.ignoreEmptySplits": false
  }
}

参数说明如下。

参数

说明

args

请根据业务需求,填写使用Jar包时需要的参数。多个参数之间以英文逗号(,)分隔。

file

示例程序spark-tablestore.jar所在的OSS路径。

jars

Spark作业依赖的Jar包所在的OSS路径。

name

Spark作业名称。

className

Java或者Scala程序入口类名称。Python不需要指定入口类。

spark.kubernetes.driverEnv.ACCESS_KEY_ID

开源Spark配置项,访问Tablestore实例的AccessKey ID。

spark.kubernetes.driverEnv.ACCESS_KEY_SECRET

开源Spark配置项,访问Tablestore实例的AccessKey Secret。

spark.hadoopRDD.ignoreEmptySplits

开源Spark配置项,用于控制是否忽略空分区。

重要

访问Tablestore实例必须将参数设置为false。

conf其他参数

与开源Spark中的配置项基本一致,参数格式为key:value形式,多个参数之间以英文逗号(,)分隔。更多conf参数,请参见Spark应用配置参数说明

  1. 单击立即执行

  2. 应用列表中目标应用的状态为已完成,您可以单击操作列的日志查看Tablestore表的数据。