阿里云数据库Redis版是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,可充分满足高吞吐、低延迟及弹性变配的业务需求。本文主要介绍如何通过DLA Serverless Spark访问云数据库Redis。
重要
云原生数据湖分析(DLA)产品已退市,云原生数据仓库 AnalyticDB MySQL 版湖仓版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相关使用文档,请参见访问Redis数据源。
前提条件
操作步骤
准备以下测试代码和依赖包来访问Redis,并将此测试代码和依赖包分别编译打包生成jar包上传至您的OSS。
测试代码示例:
package com.aliyun.spark import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ object SparkOnRedis { def main(args: Array[String]): Unit = { //获取Redis的, redisHost:内网连接地址(host),redisPort:端口号(port),redisPassword:连接密码。 val redisHost = args(0) val redisPort = args(1) val redisPassword = args(2) //redis侧的表名。 var redisTableName = args(3) //spark conf中配置的redis信息。 val sparkConf = new SparkConf() .set("spark.redis.host", redisHost) .set("spark.redis.port", redisPort) .set("spark.redis.auth", redisPassword) val sparkSession = SparkSession .builder() .config(sparkConf) .getOrCreate() //样例数据。 val data = Seq( Person("John", 30, "60 Wall Street", 150.5), Person("Peter", 35, "110 Wall Street", 200.3) ) //通过dataset API写入数据。 val dfw = sparkSession.createDataFrame(data) dfw.write.format("org.apache.spark.sql.redis") .option("model", "hash") .option("table", redisTableName) .save() //默认方式读取redis的hash值。 var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis") .option("table", redisTableName) .load() .cache() loadedDf.show(10) //设置infer.schema=true,spark会检索redis的Schema。 loadedDf = sparkSession.read.format("org.apache.spark.sql.redis") // .option("table", redisTableName) .option("keys.pattern", redisTableName + ":*") .option("infer.schema", "true") .load() loadedDf.show(10) //指定Schema的方式。 loadedDf = sparkSession.read.format("org.apache.spark.sql.redis") .option("keys.pattern", redisTableName + ":*") .schema(StructType(Array( StructField("name", StringType), StructField("age", IntegerType), StructField("address", StringType), StructField("salary", DoubleType) ))) .load() loadedDf.show(10) sparkSession.stop() } } case class Person(name: String, age: Int, address: String, salary: Double)
Redis依赖的pom文件:
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.0</version> </dependency> <dependency> <groupId>com.redislabs</groupId> <artifactId>jedis</artifactId> <version>3.0.0-m1</version> </dependency> <dependency> <groupId>com.redislabs</groupId> <artifactId>spark-redis</artifactId> <version>2.3.1-m3</version> </dependency>
在页面左上角,选择Redis实例所在地域。
单击左侧导航栏中的 。
在作业编辑页面,单击创建作业。
在创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
单击Spark作业名,在Spark作业编辑框中输入以下作业内容,并按照以下参数说明进行参数值替换。保存并提交Spark作业。
{ "args": [ "r-xxx1.redis.rds.aliyuncs.com", #Redis数据库“连接信息”的“内网连接地址(host)。 "6379", #Redis数据库“连接信息”的“端口号(port)”。 "xxx2", #Redis数据库登录密码。 "spark-test" #Redis数据库的表名。 ], "file": "oss://spark_test/jars/redis/spark-examples-0.0.1-SNAPSHOT.jar", #存放测试软件包的OSS路径。 "name": "redis-test", "jars": [ "oss://spark_test/jars/redis/spark-redis-2.3.1-m3.jar", #存放测试软件依赖包的OSS路径。 "oss://spark_test/jars/redis/commons-pool2-2.0.jar", #存放测试软件依赖包的OSS路径。 "oss://spark_test/jars/redis/jedis-3.0.0-m1.jar" #存放测试软件依赖包的OSS路径。 ], "className": "com.aliyun.spark.SparkOnRedis", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small", "spark.dla.eni.enable": "true", "spark.dla.eni.vswitch.id": "vsw-xxx", #可访问Redis的交换机id。 "spark.dla.eni.security.group.id": "sg-xxx" #可访问Redis的安全组id。 } }
执行结果
作业运行成功后,在任务列表中单击
,查看作业日志。出现如下日志说明作业运行成功:文档内容是否对您有帮助?