阿里云数据库Redis版是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,可充分满足高吞吐、低延迟及弹性变配的业务需求。本文主要介绍如何通过DLA Serverless Spark访问云数据库Redis。

前提条件

  • 已经开通对象存储OSS(Object Storage Service)服务。具体操作请参考开通OSS服务
  • 已经创建云数据库Redis实例。具体请参考步骤1:创建实例
  • 准备DLA Spark访问Redis实例所需的安全组ID和交换机ID。具体操作请参见配置数据源网络
  • DLA Spark访问Redis实例所需的安全组ID或交换机ID,已添加到Redis实例的白名单中。具体操作请参见步骤2:设置白名单

操作步骤

  1. 准备以下测试代码和依赖包来访问Redis,并将此测试代码和依赖包分别编译打包生成jar包上传至您的OSS。
    测试代码示例:
    package com.aliyun.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    
    object SparkOnRedis {
    
      def main(args: Array[String]): Unit = {
        //获取Redis的, redisHost:内网连接地址(host),redisPort:端口号(port),redisPassword:连接密码。
        val redisHost = args(0)
        val redisPort = args(1)
        val redisPassword = args(2)
        //redis侧的表名。
        var redisTableName = args(3)
    
        //spark conf中配置的redis信息。
        val sparkConf = new SparkConf()
          .set("spark.redis.host", redisHost)
          .set("spark.redis.port", redisPort)
          .set("spark.redis.auth", redisPassword)
    
        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()
    
        //样例数据。
        val data =
          Seq(
            Person("John", 30, "60 Wall Street", 150.5),
            Person("Peter", 35, "110 Wall Street", 200.3)
          )
    
        //通过dataset API写入数据。
        val dfw = sparkSession.createDataFrame(data)
        dfw.write.format("org.apache.spark.sql.redis")
          .option("model", "hash")
          .option("table", redisTableName)
          .save()
    
        //默认方式读取redis的hash值。
        var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("table", redisTableName)
          .load()
          .cache()
        loadedDf.show(10)
    
        //设置infer.schema=true,spark会检索redis的Schema。
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          //        .option("table", redisTableName)
          .option("keys.pattern", redisTableName + ":*")
          .option("infer.schema", "true")
          .load()
        loadedDf.show(10)
    
        //指定Schema的方式。
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("keys.pattern", redisTableName + ":*")
          .schema(StructType(Array(
            StructField("name", StringType),
            StructField("age", IntegerType),
            StructField("address", StringType),
            StructField("salary", DoubleType)
          )))
          .load()
        loadedDf.show(10)
    
        sparkSession.stop()
      }
    
    }
    
    case class Person(name: String, age: Int, address: String, salary: Double)
    Redis依赖的pom文件:
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.0</version>
            </dependency>
            <dependency>
                <groupId>com.redislabs</groupId>
                <artifactId>jedis</artifactId>
                <version>3.0.0-m1</version>
            </dependency>
            <dependency>
                <groupId>com.redislabs</groupId>
                <artifactId>spark-redis</artifactId>
                <version>2.3.1-m3</version>
            </dependency>
  2. 登录Data Lake Analytics管理控制台
  3. 在页面左上角,选择Redis实例所在地域。
  4. 单击左侧导航栏中的Serverless Spark > 作业管理
  5. 作业编辑页面,单击创建作业
  6. 创建作业页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
    3
  7. 单击Spark作业名,在Spark作业编辑框中输入以下作业内容,并按照以下参数说明进行参数值替换。保存并提交Spark作业。
    {
        "args": [
            "r-xxx1.redis.rds.aliyuncs.com",  #Redis数据库“连接信息”的“内网连接地址(host)。
            "6379",  #Redis数据库“连接信息”的“端口号(port)”。
            "xxx2",  #Redis数据库登录密码。
            "spark-test"  #Redis数据库的表名。
        ],
        "file": "oss://spark_test/jars/redis/spark-examples-0.0.1-SNAPSHOT.jar",  #存放测试软件包的OSS路径。
        "name": "redis-test",
        "jars": [
            "oss://spark_test/jars/redis/spark-redis-2.3.1-m3.jar",  #存放测试软件依赖包的OSS路径。
            "oss://spark_test/jars/redis/commons-pool2-2.0.jar",  #存放测试软件依赖包的OSS路径。
            "oss://spark_test/jars/redis/jedis-3.0.0-m1.jar"  #存放测试软件依赖包的OSS路径。
        ],
        "className": "com.aliyun.spark.SparkOnRedis",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "small",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.vswitch.id": "vsw-xxx",  #可访问Redis的交换机id。
            "spark.dla.eni.security.group.id": "sg-xxx"  #可访问Redis的安全组id。
        }
    }

执行结果

作业运行成功后,在任务列表中单击操作 > 日志,查看作业日志。出现如下日志说明作业运行成功:日志详情