本文介绍如何使用云原生数据仓库 AnalyticDB MySQL 版Spark通过ENI网络访问Redis数据。
前提条件
AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版。
已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组。
已创建AnalyticDB for MySQL集群的数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
AnalyticDB for MySQL集群与Redis实例位于同一地域。具体操作,请参见步骤1:创建实例。
已将Redis实例添加到安全组中,且安全组规则的入方向与出方向放行Redis端口的访问请求。具体操作,请参见设置白名单和添加安全组规则。
操作步骤
下载AnalyticDB for MySQL Spark访问Redis依赖的Jar包。下载链接,请参见spark-redis、jedis和commons-pool2。
在pom.xml文件的dependencies中添加依赖项。
<dependency> <groupId>com.redislabs</groupId> <artifactId>spark-redis_2.12</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.9.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.11.1</version> </dependency>
编写如下示例程序来访问Redis,并进行编译打包。本文生成的Jar包名称为
redis_test.jar
。示例代码如下:package com.aliyun.spark import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ object SparkOnRedis { def main(args: Array[String]): Unit = { val redisHost = args(0) val redisPort = args(1) val redisPassword = args(2) var redisTableName = args(3) //Spark Conf中配置的Redis信息。 val sparkConf = new SparkConf() .set("spark.redis.host", redisHost) .set("spark.redis.port", redisPort) .set("spark.redis.auth", redisPassword) val sparkSession = SparkSession .builder() .config(sparkConf) .getOrCreate() //样例数据。 val data = Seq( Person("John", 30, "60 Wall Street", 150.5), Person("Peter", 35, "110 Wall Street", 200.3) ) //通过Dataset API写入数据。 val dfw = sparkSession.createDataFrame(data) dfw.write.format("org.apache.spark.sql.redis") .option("model", "hash") .option("table", redisTableName) .save() //默认方式读取Redis的Hash值。 var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis") .option("table", redisTableName) .load() .cache() loadedDf.show(10) //设置infer.schema=true,Spark会检索Redis的Schema。 loadedDf = sparkSession.read.format("org.apache.spark.sql.redis") .option("keys.pattern", redisTableName + ":*") .option("infer.schema", "true") .load() loadedDf.show(10) //指定Schema的方式。 loadedDf = sparkSession.read.format("org.apache.spark.sql.redis") .option("keys.pattern", redisTableName + ":*") .schema(StructType(Array( StructField("name", StringType), StructField("age", IntegerType), StructField("address", StringType), StructField("salary", DoubleType) ))) .load() loadedDf.show(10) sparkSession.stop() } } case class Person(name: String, age: Int, address: String, salary: Double)
将步骤1下载的Jar包和示例程序
redis_test.jar
上传至OSS。具体操作,请参见简单上传。进入Spark Jar开发。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击作业开发> Spark Jar 开发。
在编辑器窗口上方,选择Job型资源组和作业类型。本文以Batch类型为例。
在编辑器中输入以下作业内容。
{ "name": "<redis-example>", "file": "oss://<bucket_name>/redis_test.jar", "className": "com.aliyun.spark.<SparkOnRedis>", "jars": [ "oss://<bucket_name>/spark-redis_2.12-3.1.0.jar", "oss://<bucket_name>/jedis-3.9.0.jar", "oss://<bucket_name>/commons-pool2-2.11.1.jar" ], "args": [ -- Redis实例的内网连接地址。在目标实例的实例信息页面的连接信息区域,可查看到各连接类型的地址和端口号。 "<r-bp1qsslcssr****.redis.rds.aliyuncs.com>", --Redis实例的端口号,固定为6379。 "6379", -- Redis实例的数据库账号密码。 "<your_password>", -- Redis实例的表名。 "<redis_table_name>" ], "conf": { "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" } }
参数说明如下。
参数
说明
name
Spark作业名称。
file
Spark作业主文件的存储位置。主文件是入口类所在的Jar包或者Python的入口执行文件。
说明Spark作业主文件目前只支持存储在OSS中。
className
Java或者Scala程序入口类名称。Python不需要指定入口类。
args
请根据业务需求,填写使用Jar包时需要的参数。多个参数之间以英文逗号(,)分隔。
spark.adb.eni.enabled
是否开启ENI访问。本文需要开启ENI访问。
spark.adb.eni.vswitchId
交换机ID。在目标Redis实例的实例信息页面获取交换机ID。
spark.adb.eni.securityGroupId
Redis实例中添加的安全组ID。如未添加安全组,请参见设置白名单。
conf其他参数
与开源Spark中的配置项基本一致,参数格式为
key:value
形式,多个参数之间以英文逗号(,)分隔。更多conf参数,请参见Conf配置参数。单击立即执行。
待应用列表中目标应用的状态为已完成,您可以单击操作列的日志查看Redis表的数据。