本文介绍了如何使用DLA Spark 访问Tablestore。
操作步骤
- 准备以下测试代码来连接Tablestore,并将测试代码打包成JAR包上传至您的OSS。
package com.aliyun.spark import org.apache.spark.SparkConf import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SaveMode, SparkSession} import scala.collection.mutable //批计算谓词下推配置 object SparkTablestore { def main(args: Array[String]): Unit = { if (args.length < 5) { System.err.println( """Usage: SparkTablestore <instance-id> <table-name> <实例访问地址-VPC> <ACCESS_KEY_ID> | <ACCESS_KEY_SECRET> """.stripMargin) System.exit(1) } val instanceId = args(0) val tableName = args(1) val endpoint = args(2) val accessKeyId = args(3) val accessKeySecret = args(4) //表结构,您需要在Tablestore中准备具有如下表结构的表。 val catalog = """ |{ | "columns": { | "id":{ | "col":"id", | "type":"string" | }, | "company": { | "col": "company", | "type": "string" | }, | "age": { | "col": "age", | "type": "integer" | }, | | "name": { | "col": "name", | "type": "string" | } | } |} |""".stripMargin val sparkConf = new SparkConf val options = new mutable.HashMap[String, String]() //您的Tablestore实例名称。 options.put("instance.name", instanceId) //您想要连接的表名。 options.put("table.name", tableName) //您的访问的endpoint options.put("endpoint", endpoint) //您的ACCESS KEY。 options.put("access.key.id", accessKeyId) //您的ACCESS KEY SECRET。 options.put("access.key.secret", accessKeySecret) //您的表格结构,使用JSON表达式。 options.put("catalog", catalog) //与Long类型做Range( >= > < <= )比较的谓词是否下推。 options.put("push.down.range.long", "true") //与String类型做Range( >= > < <= )比较的谓词是否下推。 options.put("push.down.range.string", "true") // Tablestore通道Channel在每个Spark Batch周期内同步的最大数据条数,默认10000。 options.put("maxOffsetsPerChannel", "10000") //多元索引名,可选。 //options.put("search.index.name", "<index_name>") //tunnel id,可选。 //options.put("tunnel.id", "<tunnel id>") val spark = SparkSession.builder.config(sparkConf).appName("Serverless Spark Tablestore Demo").getOrCreate import spark.implicits._ val dfReader = spark.read.format("Tablestore").options(options) val df = dfReader.load() //显示表内容。 df.show() df.printSchema() //写表。 val schema = StructType.apply(Seq(StructField("id", StringType), StructField("company", StringType),StructField("age", IntegerType), StructField("name", StringType))) val newData = spark.sparkContext.parallelize(Seq(("1","ant",10,"xxx"))).map(row => Row(row._1, row._2, row._3, row._4)) val newDataDF = spark.createDataFrame(newData, schema) newDataDF.write.format("Tablestore").options(options).save val dfReader1 = spark.read.format("Tablestore").options(options) val df1 = dfReader1.load() df1.show() } }
- 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择Tablestore所在地域。
- 单击左侧导航栏中的Serverless Spark > 作业管理。
- 在作业编辑页面,单击创建作业模板。
- 在创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
- 单击Spark作业名,在Spark作业编辑框中输入以下Spark Streaming任务内容。保存并提交Spark作业。
{ "args": [ "<instanceId>", "<tableName>", "<endpoint>", "<access key id>", "<access key secret>" ], "name": "Tablestore", "className": "com.aliyun.spark.SparkTablestore", "conf": { "spark.driver.resourceSpec": "medium", "spark.dla.connectors": "oss", "spark.executor.instances": 1, "spark.dla.job.log.oss.uri": "oss://</path/to/store/your/spark/log>", "spark.executor.resourceSpec": "medium" }, "file": "oss://path/to/spark-examples-0.0.1-SNAPSHOT-shaded.jar" }
上述代码中出现的参数说明如下。
参数名称 说明 instanceId Tablestore的实例名称。 tableName Tablestore的表名。 endpoint Tablestore的endpoint,可在Tablestore控制台上查看,选择其中的VPC访问地址。 access key id 访问Tablestore所需的AccessKey ID。 access key secret 访问Tablestore所需的AccessKey Secret。 spark.dla.job.log.oss.uri 存放Spark日志的路径。