批计算

使用SparkDataFrame方式访问表格存储,并在本地和集群上分别进行运行调试。

前提条件

  • 了解Spark访问表格存储的依赖包,并在使用时通过Maven方式引入项目中。

    • Spark相关:spark-core、spark-sql、spark-hive

    • Spark Tablestore connector:emr-tablestore-<version>.jar

    • Tablestore Java SDK:tablestore-<version>-jar-with-dependencies.jar

    其中<version>表示相应依赖包的版本号,请以实际为准。

  • 已在表格存储侧创建数据表。具体操作,请参见创建数据表

  • 已获取AccessKey(包括AccessKey IDAccessKey Secret)。具体操作,请参见获取AccessKey

快速开始

通过项目样例了解快速使用批计算的操作。

  1. GitHub下载项目样例的源码,具体下载路径请参见TableStoreSparkDemo

    项目中包含完整的依赖和使用样例,具体的依赖请参见项目中的pom文件。

  2. 阅读TableStoreSparkDemo项目的README文档,并安装最新版的Spark Tablestore connectorTablestore Java SDK到本地Maven库。

  3. 修改Sample代码。

    TableStoreBatchSample为例,对此示例代码的核心代码说明如下:

    • format("tablestore")表示使用ServiceLoader方式加载Spark Tablestore connector,具体配置请参见项目中的META-INF.services。

    • instanceName、tableName、endpoint、accessKeyId、accessKeySecret分别表示表格存储的实例名称、数据表名称、实例endpoint、阿里云账号的AccessKey IDAccessKey Secret。

    • catalog是一个JSON串,包含字段名和类型,如下示例中的数据表有salt(Long类型)、UserId(String类型)、OrderId(String类型)、price(Double类型)和timestamp(Long类型)五个字段。

      最新版本中支持使用Schema方式替换catalog的配置,请根据实际选择。

    • split.size.mbs表示每个Split的切分大小,默认值为100,单位为MB,可不配置。

      此值越小产生的Split会越多,对应SparkTask也会越多。

          val df = sparkSession.read
          .format("tablestore")
          .option("instance.name", instanceName)
          .option("table.name", tableName)
          .option("endpoint", endpoint)
          .option("access.key.id", accessKeyId)
          .option("access.key.secret", accessKeySecret)
          .option("split.size.mbs", 100)
          .option("catalog", dataCatalog)
          // 最新版本支持使用Schema方式替换catalog的配置。
          //.schema("salt LONG, UserId STRING, OrderId STRING, price DOUBLE, timestamp LONG")
          .load()
      
        val dataCatalog: String =
          s"""
             |{"columns": {
             |    "salt": {"type":"long"},
             |    "UserId": {"type":"string"},
             |    "OrderId": {"type":"string"},
             |    "price": {"type":"double"},
             |    "timestamp": {"type":"long"}
             | }
             |}""".stripMargin
      

运行调试

根据需求修改示例代码后,可在本地或者通过Spark集群进行运行调试。以TableStoreBatchSample为例说明调试过程。

  • 本地调试

    IntelliJ IDEA为例说明。

    说明

    本文测试使用的环境为Spark 2.4.3、Scala 2.11.7Java SE Development Kit 8,如果使用中遇到问题,请联系表格存储技术支持。

    1. 在系统参数中,配置实例名称、数据表名称、实例endpoint、阿里云账号的AccessKey IDAccessKey Secret等参数。

      您也可以自定义参数的加载方式。

    2. 选择include dependencies with "provided" scope,单击OK

    3. 运行示例代码程序。

      idea_001
  • 通过Spark集群调试

    spark-submit方式为例说明。示例代码中的master默认为local[*],在Spark集群上运行时可以去掉,使用spark-submit参数传入。

    1. 执行mvn -U clean package命令打包,包的路径为target/tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

    2. 上传包到Spark集群的Driver节点,并使用spark-submit提交任务。

      spark-submit --class com.aliyun.tablestore.spark.demo.batch.TableStoreBatchSample --master yarn tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar <ots-instanceName> <ots-tableName> <access-key-id> <access-key-secret> <ots-endpoint>
      fig_batch_dataframe001