阿里云数据库Redis版是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,可充分满足高吞吐、低延迟及弹性变配的业务需求。本文主要介绍如何通过DLA
Serverless Spark访问云数据库Redis。
前提条件
- 已经开通对象存储OSS(Object Storage Service)服务。具体操作请参考开通OSS服务。
- 已经创建云数据库Redis实例。具体请参考步骤1:创建实例。
- 准备DLA Spark访问Redis实例所需的安全组ID和交换机ID。具体操作请参见配置数据源网络。
- DLA Spark访问Redis实例所需的安全组ID或交换机ID,已添加到Redis实例的白名单中。具体操作请参见步骤2:设置白名单。
操作步骤
- 准备以下测试代码和依赖包来访问Redis,并将此测试代码和依赖包分别编译打包生成jar包上传至您的OSS。
测试代码示例:
package com.aliyun.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
object SparkOnRedis {
def main(args: Array[String]): Unit = {
//获取Redis的, redisHost:内网连接地址(host),redisPort:端口号(port),redisPassword:连接密码。
val redisHost = args(0)
val redisPort = args(1)
val redisPassword = args(2)
//redis侧的表名。
var redisTableName = args(3)
//spark conf中配置的redis信息。
val sparkConf = new SparkConf()
.set("spark.redis.host", redisHost)
.set("spark.redis.port", redisPort)
.set("spark.redis.auth", redisPassword)
val sparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
//样例数据。
val data =
Seq(
Person("John", 30, "60 Wall Street", 150.5),
Person("Peter", 35, "110 Wall Street", 200.3)
)
//通过dataset API写入数据。
val dfw = sparkSession.createDataFrame(data)
dfw.write.format("org.apache.spark.sql.redis")
.option("model", "hash")
.option("table", redisTableName)
.save()
//默认方式读取redis的hash值。
var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
.option("table", redisTableName)
.load()
.cache()
loadedDf.show(10)
//设置infer.schema=true,spark会检索redis的Schema。
loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
// .option("table", redisTableName)
.option("keys.pattern", redisTableName + ":*")
.option("infer.schema", "true")
.load()
loadedDf.show(10)
//指定Schema的方式。
loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
.option("keys.pattern", redisTableName + ":*")
.schema(StructType(Array(
StructField("name", StringType),
StructField("age", IntegerType),
StructField("address", StringType),
StructField("salary", DoubleType)
)))
.load()
loadedDf.show(10)
sparkSession.stop()
}
}
case class Person(name: String, age: Int, address: String, salary: Double)
Redis依赖的pom文件:
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>jedis</artifactId>
<version>3.0.0-m1</version>
</dependency>
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis</artifactId>
<version>2.3.1-m3</version>
</dependency>
- 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择Redis实例所在地域。
- 单击左侧导航栏中的。
- 在作业编辑页面,单击创建作业。
- 在创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
- 单击Spark作业名,在Spark作业编辑框中输入以下作业内容,并按照以下参数说明进行参数值替换。保存并提交Spark作业。
{
"args": [
"r-xxx1.redis.rds.aliyuncs.com", #Redis数据库“连接信息”的“内网连接地址(host)。
"6379", #Redis数据库“连接信息”的“端口号(port)”。
"xxx2", #Redis数据库登录密码。
"spark-test" #Redis数据库的表名。
],
"file": "oss://spark_test/jars/redis/spark-examples-0.0.1-SNAPSHOT.jar", #存放测试软件包的OSS路径。
"name": "redis-test",
"jars": [
"oss://spark_test/jars/redis/spark-redis-2.3.1-m3.jar", #存放测试软件依赖包的OSS路径。
"oss://spark_test/jars/redis/commons-pool2-2.0.jar", #存放测试软件依赖包的OSS路径。
"oss://spark_test/jars/redis/jedis-3.0.0-m1.jar" #存放测试软件依赖包的OSS路径。
],
"className": "com.aliyun.spark.SparkOnRedis",
"conf": {
"spark.driver.resourceSpec": "small",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "small",
"spark.dla.eni.enable": "true",
"spark.dla.eni.vswitch.id": "vsw-xxx", #可访问Redis的交换机id。
"spark.dla.eni.security.group.id": "sg-xxx" #可访问Redis的安全组id。
}
}
执行结果
作业运行成功后,在任务列表中单击,查看作业日志。出现如下日志说明作业运行成功: