本文介绍了如何使用DLA Spark Streaming访问TableStore。
操作步骤
- 准备以下测试代码来连接TableStore,并将测试代码打包成jar包上传至您的OSS。
package com.aliyun.spark import org.apache.spark.SparkConf import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SaveMode, SparkSession} import scala.collection.mutable //https://help.aliyun.com/document_detail/187100.html?spm=a2c4g.11186623.2.26.25336939JXk48g#topic-1962927 object SparkTableStore { def main(args: Array[String]): Unit = { if (args.length < 5) { System.err.println( """Usage: SparkTableStore <instance-id> <table-name> <实例访问地址-VPC> <ACCESS_KEY_ID> | <ACCESS_KEY_SECRET> """.stripMargin) System.exit(1) } val instanceId = args(0) val tableName = args(1) val endpoint = args(2) val accessKeyId = args(3) val accessKeySecret = args(4) //表结构,您需要在tablestore中准备具有如下表结构的表。 val catalog = """ |{ | "columns": { | "id":{ | "col":"id", | "type":"string" | }, | "company": { | "col": "company", | "type": "string" | }, | "age": { | "col": "age", | "type": "integer" | }, | | "name": { | "col": "name", | "type": "string" | } | } |} |""".stripMargin val sparkConf = new SparkConf val options = new mutable.HashMap[String, String]() //您的tablestore实例名称。 options.put("instance.name", instanceId) //您想要连接的表名。 options.put("table.name", tableName) //您的访问的endpoint options.put("endpoint", endpoint) //您的ACCESS KEY。 options.put("access.key.id", accessKeyId) //您的ACCESS KEY SECRET。 options.put("access.key.secret", accessKeySecret) //您的表格结构,使用json表达式。 options.put("catalog", catalog) //与Long类型做Range( >= > < <= )比较的谓词是否下推。 options.put("push.down.range.long", "true") //与String类型做Range( >= > < <= )比较的谓词是否下推。 options.put("push.down.range.string", "true") // Tablestore通道Channel在每个Spark Batch周期内同步的最大数据条数,默认10000。 options.put("maxOffsetsPerChannel", "10000") //多元索引名,可选。 //options.put("search.index.name", "<index_name>") //tunnel id,可选。 //options.put("tunnel.id", "<tunnel id>") val spark = SparkSession.builder.config(sparkConf).appName("Serverless Spark TableStore Demo").getOrCreate import spark.implicits._ val dfReader = spark.read.format("tablestore").options(options) val df = dfReader.load() //显示表内容。 df.show() df.printSchema() //写表。 val schema = StructType.apply(Seq(StructField("id", StringType), StructField("company", StringType),StructField("age", IntegerType), StructField("name", StringType))) val newData = spark.sparkContext.parallelize(Seq(("1","ant",10,"xxx"))).map(row => Row(row._1, row._2, row._3, row._4)) val newDataDF = spark.createDataFrame(newData, schema) newDataDF.write.format("tablestore").options(options).save val dfReader1 = spark.read.format("tablestore").options(options) val df1 = dfReader1.load() df1.show() } }
- 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择Tablestore所在地域。
- 单击左侧导航栏中的Serverless Spark > 作业管理。
- 在作业编辑页面,单击创建作业。
- 在创建作业页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
- 单击Spark作业名,在Spark作业编辑框中输入以下Spark Streaming任务内容。保存并提交Spark作业。
{ "args": [ "<instanceId>", #tablestore的实例名称。 "<tableName>, #tablestore的表名。 "<endpoint>", #tablestore的endpoint,可在tablestore控制台上查看。 "<access key id>", #访问tablestore所需的accessKey Id。 "<access key secret>" #访问tablestore所需的accessKey Secret。 ], "name": "TableStore", "className": "com.aliyun.spark.SparkTableStore", "conf": { "spark.driver.resourceSpec": "medium", "spark.dla.connectors": "oss", "spark.executor.instances": 1, "spark.dla.job.log.oss.uri": "oss://</path/to/store/your/spark/log>", #存放Spark日志的路径。 "spark.executor.resourceSpec": "medium" }, "file": "oss://path/to/spark-examples-0.0.1-SNAPSHOT-shaded.jar" }
在文档使用中是否遇到以下问题
更多建议
匿名提交