全部产品
E-MapReduce

Spark + ODPS

更新时间:2017-06-07 13:26:11   分享:   

Spark + MaxCompute

Spark 接入 MaxCompute

本章节将介绍如何使用E-MapReduce SDK在Spark中完成一次MaxCompute数据的读写操作。

  1. 初始化一个OdpsOps对象。在 Spark 中,MaxCompute的数据操作通过OdpsOps类完成,请参照如下步骤创建一个OdpsOps对象:

    1. import com.aliyun.odps.TableSchema
    2. import com.aliyun.odps.data.Record
    3. import org.apache.spark.aliyun.odps.OdpsOps
    4. import org.apache.spark.{SparkContext, SparkConf}
    5. object Sample {
    6. def main(args: Array[String]): Unit = {
    7. // == Step-1 ==
    8. val accessKeyId = "<accessKeyId>"
    9. val accessKeySecret = "<accessKeySecret>"
    10. // 以内网地址为例
    11. val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com")
    12. val conf = new SparkConf().setAppName("Test Odps")
    13. val sc = new SparkContext(conf)
    14. val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
    15. // 下面是一些调用代码
    16. // == Step-2 ==
    17. ...
    18. // == Step-3 ==
    19. ...
    20. }
    21. // == Step-2 ==
    22. // 方法定义1
    23. // == Step-3 ==
    24. // 方法定义2
  2. 从MaxCompute中加载表数据到Spark中。通过OdpsOps对象的readTable方法,可以将MaxCompute中的表加载到Spark中,即生成一个RDD,如下所示:

    1. // == Step-2 ==
    2. val project = <odps-project>
    3. val table = <odps-table>
    4. val numPartitions = 2
    5. val inputData = odpsOps.readTable(project, table, read, numPartitions)
    6. inputData.top(10).foreach(println)
    7. // == Step-3 ==
    8. ...

    在上面的代码中,您还需要定义一个read函数,用来解析和预处理MaxCompute表数据,如下所示:

    1. def read(record: Record, schema: TableSchema): String = {
    2. record.getString(0)
    3. }

    这个函数的含义是将MaxCompute表的第一列加载到Spark运行环境中。

  3. 将 Spark 中的结果数据保存到MaxCompute表中。通过OdpsOps对象的saveToTable方法,可以将Spark RDD持久化到MaxCompute中。

    1. val resultData = inputData.map(e => s"$e has been processed.")
    2. odpsOps.saveToTable(project, table, dataRDD, write)

    在上面的代码中,您还需要定义一个write函数,用作写MaxCompute表前数据预处理,如下所示:

    1. def write(s: String, emptyReord: Record, schema: TableSchema): Unit = {
    2. val r = emptyReord
    3. r.set(0, s)
    4. }

    这个函数的含义是将RDD的每一行数据写到对应MaxCompute表的第一列中。

  4. 分区表参数写法说明

SDK支持对MaxCompute分区表的读写,这里分区名的写法标准是:分区列名=分区名,多个分区时以逗号分隔,例如有分区列pt和ps:

  • 读分区pt为1的表数据:pt=‘1’
  • 读分区pt为1和分区ps为2的表数据:pt=‘1’,ps=‘2’

附录

示例代码请看:

本文导读目录
本文导读目录
以上内容是否对您有帮助?