本文介绍如何使用云原生数据仓库 AnalyticDB MySQL 版Spark通过访问Tablestore数据。
前提条件
AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版。
已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组。
如果是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
已授权AnalyticDB for MySQL扮演AliyunADBSparkProcessingDataRole角色来访问其他云资源。具体操作,请参见账号授权。
AnalyticDB for MySQL集群与TableStore实例位于同一地域。
操作步骤
下载AnalyticDB for MySQLSpark访问Tablestore依赖的Jar包。下载链接emr-tablestore-2.3.0-SNAPSHOT.jar和tablestore-5.10.3-jar-with-dependencies.jar。
在pom.xml文件的dependencies中添加依赖项。
<dependency>
<groupId>com.aliyun.emr</groupId>
<artifactId>emr-tablestore</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
编写如下示例程序来访问Tablestore,并进行编译打包。本文生成的Jar包名称为
spark-tablestore.jar
。示例代码如下:
package com.aliyun.spark
import com.aliyun.openservices.tablestore.hadoop.TableStore
import org.apache.spark.sql.SparkSession
import scala.collection.mutable
object SparkOnTablestore {
def main(args: Array[String]): Unit = {
//Tablestore实例的VPC连接地址。
val endpoint = args(0)
// Tablestore的表名。
val tableName = args(1)
// Tablestore实例名称。
val instanceName = args(2)
// 具备Tablestore读写权限的RAM用户的AccessKey ID。
val accessKeyId = sys.env.get("ACCESS_KEY_ID")
// 具备Tablestore读写权限的RAM用户的AccessKey Secret。
val accessKeySecret = sys.env.get("ACCESS_KEY_SECRET")
// Tablestore表的数据目录。
val dataCatalog =
"""
|{"columns":{
| "PkString": {"type":"string"},
| "PkInt": {"type":"long"},
| "col1": {"type":"string"},
| "col2": {"type":"long"},
| "col3": {"type":"binary"},
| "timestamp": {"type":"long"},
| "col5": {"type":"double"},
| "col6": {"type":"boolean"}
| }
|}
""".stripMargin
val options = new mutable.HashMap[String, String]()
options.put("endpoint", endpoint)
options.put("access.key.id", accessKeyId.get)
options.put("access.key.secret", accessKeySecret.get)
options.put("table.name", tableName)
options.put("instance.name", instanceName)
options.put("catalog", dataCatalog)
val spark = SparkSession.builder().getOrCreate()
// 读取数据。
val df = spark.read.format("tablestore").options(options).load()
df.show()
// 写入数据。
df.write.format("tablestore").options(options).save()
TableStore.shutdown()
spark.stop()
}
}
将步骤1和步骤3中的Jar包上传至OSS。具体操作,请参见简单上传。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击
。在编辑器窗口上方,选择Job型资源组和作业类型。本文以Batch类型为例。
在编辑器中输入以下作业内容。
{
"args": [
"https://i01164l****.<region_id>.vpc.tablestore.aliyuncs.com",
"spark_test",
"i01164l****"
],
"file": "oss://<bucket_name>/spark-tablestore.jar",
"jars": [
"oss://<bucket_name>/emr-tablestore-2.3.0-SNAPSHOT.jar",
"oss://<bucket_name>/tablestore-5.10.3-jar-with-dependencies.jar"
],
"name": "spark-on-tablestore",
"className": "com.aliyun.spark.SparkOnTablestore",
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "medium",
"spark.kubernetes.driverEnv.ACCESS_KEY_ID": "LTAI5tLBmYLvf****",
"spark.kubernetes.driverEnv.ACCESS_KEY_SECRET": "A4KuIceJjlARWKzA6nVbbi****",
"spark.hadoopRDD.ignoreEmptySplits": false
}
}
参数说明如下。
参数 | 说明 |
| 请根据业务需求,填写使用Jar包时需要的参数。多个参数之间以英文逗号(,)分隔。 |
| 示例程序 |
| Spark作业依赖的Jar包所在的OSS路径。 |
| Spark作业名称。 |
| Java或者Scala程序入口类名称。Python不需要指定入口类。 |
| 开源Spark配置项,访问Tablestore实例的AccessKey ID。 |
| 开源Spark配置项,访问Tablestore实例的AccessKey Secret。 |
| 开源Spark配置项,用于控制是否忽略空分区。 重要 访问Tablestore实例必须将参数设置为false。 |
conf其他参数 | 与开源Spark中的配置项基本一致,参数格式为 |
单击立即执行。
待应用列表中目标应用的状态为已完成,您可以单击操作列的日志查看Tablestore表的数据。