本文介绍了如何使用DLA Spark 访问Tablestore。

前提条件

  • 已经创建了Spark虚拟集群。具体操作请参见创建虚拟集群
  • 已经开通对象存储OSS(Object Storage Service)服务。具体操作请参见开通OSS服务

操作步骤

  1. 准备以下测试代码来连接Tablestore,并将测试代码打包成jar包上传至您的OSS。
    package com.aliyun.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    
    import scala.collection.mutable
    //批计算谓词下推配置
    object SparkTablestore {
      def main(args: Array[String]): Unit = {
        if (args.length < 5) {
          System.err.println(
            """Usage: SparkTablestore <instance-id> <table-name> <实例访问地址-VPC> <ACCESS_KEY_ID>
              |         <ACCESS_KEY_SECRET>
            """.stripMargin)
          System.exit(1)
        }
        val instanceId = args(0)
        val tableName = args(1)
        val endpoint = args(2)
        val accessKeyId = args(3)
        val accessKeySecret = args(4)
        //表结构,您需要在Tablestore中准备具有如下表结构的表。
        val catalog =
          """
            |{
            |  "columns": {
            |    "id":{
            |      "col":"id",
            |      "type":"string"
            |    },
            |     "company": {
            |      "col": "company",
            |      "type": "string"
            |    },
            |    "age": {
            |      "col": "age",
            |      "type": "integer"
            |    },
            |
            |    "name": {
            |      "col": "name",
            |      "type": "string"
            |    }
            |  }
            |}
            |""".stripMargin
        val sparkConf = new SparkConf
        val options = new mutable.HashMap[String, String]()
        //您的Tablestore实例名称。
        options.put("instance.name", instanceId)
        //您想要连接的表名。
        options.put("table.name", tableName)
        //您的访问的endpoint
        options.put("endpoint", endpoint)
        //您的ACCESS KEY。
        options.put("access.key.id", accessKeyId)
        //您的ACCESS KEY SECRET。
        options.put("access.key.secret", accessKeySecret)
        //您的表格结构,使用json表达式。
        options.put("catalog", catalog)
        //与Long类型做Range( >= > < <= )比较的谓词是否下推。
        options.put("push.down.range.long", "true")
        //与String类型做Range( >= > < <= )比较的谓词是否下推。
        options.put("push.down.range.string", "true")
        // Tablestore通道Channel在每个Spark Batch周期内同步的最大数据条数,默认10000。
        options.put("maxOffsetsPerChannel", "10000")
        //多元索引名,可选。
        //options.put("search.index.name", "<index_name>")
        //tunnel id,可选。
        //options.put("tunnel.id", "<tunnel id>")
        val spark = SparkSession.builder.config(sparkConf).appName("Serverless Spark Tablestore Demo").getOrCreate
        import spark.implicits._
        val dfReader = spark.read.format("Tablestore").options(options)
        val df = dfReader.load()
        //显示表内容。
        df.show()
        df.printSchema()
        //写表。
        val schema = StructType.apply(Seq(StructField("id", StringType), StructField("company", StringType),StructField("age", IntegerType), StructField("name", StringType)))
        val newData = spark.sparkContext.parallelize(Seq(("1","ant",10,"xxx"))).map(row => Row(row._1, row._2, row._3, row._4))
        val newDataDF = spark.createDataFrame(newData, schema)
        newDataDF.write.format("Tablestore").options(options).save
        val dfReader1 = spark.read.format("Tablestore").options(options)
        val df1 = dfReader1.load()
        df1.show()
      }
    }
  2. 登录Data Lake Analytics管理控制台
  3. 在页面左上角,选择Tablestore所在地域。
  4. 单击左侧导航栏中的Serverless Spark > 作业管理
  5. 作业编辑页面,单击创建作业
  6. 创建作业页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
    3
  7. 单击Spark作业名,在Spark作业编辑框中输入以下Spark Streaming任务内容。保存并提交Spark作业。
    {
        "args": [
            "<instanceId>", 
            "<tableName>",  
            "<endpoint>",  
            "<access key id>", 
            "<access key secret>"  
        ],
        "name": "Tablestore",
        "className": "com.aliyun.spark.SparkTablestore",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.dla.connectors": "oss",
            "spark.executor.instances": 1,
            "spark.dla.job.log.oss.uri": "oss://</path/to/store/your/spark/log>",  
            "spark.executor.resourceSpec": "medium"
        },
        "file": "oss://path/to/spark-examples-0.0.1-SNAPSHOT-shaded.jar"
    }

    上述代码中出现的参数说明如下。

    参数名称 说明
    instanceId Tablestore的实例名称。
    tableName Tablestore的表名。
    endpoint Tablestore的endpoint,可在Tablestore控制台上查看,选择其中的VPC访问地址。TableStore选择vpc内网络
    access key id 访问Tablestore所需的AccessKey ID。
    access key secret 访问Tablestore所需的AccessKey Secret。
    spark.dla.job.log.oss.uri 存放Spark日志的路径。