访问Redis数据源

本文介绍如何使用云原生数据仓库 AnalyticDB MySQL 版Spark通过ENI网络访问Redis数据。

前提条件

  • AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版

  • 已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组

  • 已创建AnalyticDB for MySQL集群的数据库账号。

  • AnalyticDB for MySQL集群与Redis实例位于同一地域。具体操作,请参见步骤1:创建实例

  • 已将Redis实例添加到安全组中,且安全组规则的入方向与出方向放行Redis端口的访问请求。具体操作,请参见设置白名单添加安全组规则

  • 已开通OSS服务并创建存储空间。具体操作,请参见开通OSS服务创建存储空间

操作步骤

  1. 下载AnalyticDB for MySQL Spark访问Redis依赖的Jar包。下载链接,请参见spark-redisjediscommons-pool2

  2. 在pom.xml文件的dependencies中添加依赖项。

          <dependency>
                <groupId>com.redislabs</groupId>
                <artifactId>spark-redis_2.12</artifactId>
                <version>3.1.0</version>
          </dependency>
          <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>3.9.0</version>
          </dependency>
          <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.11.1</version>
          </dependency>
  3. 编写如下示例程序来访问Redis,并进行编译打包。本文生成的Jar包名称为redis_test.jar。示例代码如下:

    package com.aliyun.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    
    object SparkOnRedis {
    
      def main(args: Array[String]): Unit = {
        val redisHost = args(0)
        val redisPort = args(1)
        val redisPassword = args(2)
        var redisTableName = args(3)
    
       //Spark Conf中配置的Redis信息。
        val sparkConf = new SparkConf()
          .set("spark.redis.host", redisHost)
          .set("spark.redis.port", redisPort)
          .set("spark.redis.auth", redisPassword)
    
        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()
    
        //样例数据。
        val data =
          Seq(
            Person("John", 30, "60 Wall Street", 150.5),
            Person("Peter", 35, "110 Wall Street", 200.3)
          )
    
        //通过Dataset API写入数据。
        val dfw = sparkSession.createDataFrame(data)
        dfw.write.format("org.apache.spark.sql.redis")
          .option("model", "hash")
          .option("table", redisTableName)
          .save()
    
        //默认方式读取Redis的Hash值。
        var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("table", redisTableName)
          .load()
          .cache()
        loadedDf.show(10)
    
        //设置infer.schema=true,Spark会检索Redis的Schema。
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("keys.pattern", redisTableName + ":*")
          .option("infer.schema", "true")
          .load()
        loadedDf.show(10)
    
        //指定Schema的方式。
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("keys.pattern", redisTableName + ":*")
          .schema(StructType(Array(
            StructField("name", StringType),
            StructField("age", IntegerType),
            StructField("address", StringType),
            StructField("salary", DoubleType)
          )))
          .load()
        loadedDf.show(10)
    
        sparkSession.stop()
      }
    
    }
    
    case class Person(name: String, age: Int, address: String, salary: Double)
  4. 将步骤1下载的Jar包和示例程序redis_test.jar上传至OSS。具体操作,请参见简单上传

  5. 进入Spark Jar开发。

    1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

    2. 在左侧导航栏,单击作业开发Spark Jar 开发

  6. 在编辑器窗口上方,选择Job型资源组和作业类型。本文以Batch类型为例。

  7. 在编辑器中输入以下作业内容。

    {
        "name": "<redis-example>",
        "file": "oss://<bucket_name>/redis_test.jar",
        "className": "com.aliyun.spark.<SparkOnRedis>",
        "jars": [
          "oss://<bucket_name>/spark-redis_2.12-3.1.0.jar",
          "oss://<bucket_name>/jedis-3.9.0.jar",
          "oss://<bucket_name>/commons-pool2-2.11.1.jar"
        ],
        "args": [
          -- Redis实例的内网连接地址。在目标实例的实例信息页面的连接信息区域,可查看到各连接类型的地址和端口号。
          "<r-bp1qsslcssr****.redis.rds.aliyuncs.com>",
          --Redis实例的端口号,固定为6379。
          "6379",
          -- Redis实例的数据库账号密码。
          "<your_password>",
          -- Redis实例的表名。
          "<redis_table_name>"
        ],
        "conf": {
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****",
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small"
        }
    }

    参数说明如下。

    参数

    说明

    name

    Spark作业名称。

    file

    Spark作业主文件的存储位置。主文件是入口类所在的Jar包或者Python的入口执行文件。

    说明

    Spark作业主文件目前只支持存储在OSS中。

    className

    Java或者Scala程序入口类名称。Python不需要指定入口类。

    args

    请根据业务需求,填写使用Jar包时需要的参数。多个参数之间以英文逗号(,)分隔。

    spark.adb.eni.enabled

    是否开启ENI访问。本文需要开启ENI访问。

    spark.adb.eni.vswitchId

    交换机ID。在目标Redis实例的实例信息页面获取交换机ID。

    spark.adb.eni.securityGroupId

    Redis实例中添加的安全组ID。如未添加安全组,请参见设置白名单

    conf其他参数

    与开源Spark中的配置项基本一致,参数格式为key:value形式,多个参数之间以英文逗号(,)分隔。更多conf参数,请参见Conf配置参数

  8. 单击立即执行

  9. 应用列表中目标应用的状态为已完成,您可以单击操作列的日志查看Redis表的数据。