本文介绍如何使用Databricks 读写Redis数据。
前提条件
通过主账号登录阿里云 Databricks控制台。
已创建Redis实例,具体参见创建Redis实例。
已创建DDI集群,具体请参见DDI集群创建。
创建集群并通过Knox账号访问NoteBook。
目前暂不支持Spark3.0和DBR7及以上的版本读写Redis。
打通网络环境
进入DDI数据源点击添加选择通用网络打通,选择Redis数据库所在的VPC和vsw。
登录Redis控制台添加DDI集群各个机器IP至访问白名单,或者在VPC登录处设置允许VPC内免密访问。
使用Databricks 读写Redis数据
链接Redis数据库代码实现
%spark
import com.redislabs.provider.redis._
val redisServerDnsAddress = "your address"
val redisPortNumber = 6379
val redisPassword = "your password"
//获取RedisConfig
val redisConfig = new RedisConfig(new RedisEndpoint(redisServerDnsAddress, redisPortNumber, redisPassword))
Redis数据库中相关String,List,Set,Hash的数据类型读写代码如下:
String 读写
%spark
//String 读写
val stringRDD = sc.parallelize(Seq(("000001", "Jack"), ("000002", "Rose")))
sc.toRedisKV(stringRDD)(redisConfig)
val keysRDD = sc.fromRedisKeyPattern("000*", 5)(redisConfig)
val stringRDD2 = keysRDD.getKV
stringRDD2.collect().foreach(println)
List读写
%spark
//List 读写
val stringListRDD = sc.parallelize(Seq("dog", "cat", "pig"))
sc.toRedisLIST(stringListRDD, "animal")(redisConfig)
val keysRDD = sc.fromRedisKeyPattern("animal*")(redisConfig)
val listRDD = keysRDD.getList
listRDD.collect().foreach(println)
HASH读写
%spark
//Hash读写
val stringHashRDD = sc.parallelize(Seq(("jack","22"), ("rose","23"),("sir","24"),("jack","24")))
sc.toRedisHASH(stringHashRDD, "message")(redisConfig)
val keysRDD = sc.fromRedisKeyPattern("message*")(redisConfig)
val hashRDD = keysRDD.getHash
hashRDD.collect().foreach(println)
SET读写
%spark
//Set 读写
val stringHashRDD = sc.parallelize(Seq("hello", "hello", "word"))
sc.toRedisSET(stringHashRDD, "setKey")(redisConfig)
val keysRDD = sc.fromRedisKeyPattern("setKey*")(redisConfig)
val setRDD = keysRDD.getSet
setRDD.collect().foreach(println)
文档内容是否对您有帮助?