本章节介绍如何使用 E-MapReduce SDK 在 Spark 中完成一次 MaxCompute 数据的读写操作。

Spark 接入 MaxCompute

  1. 初始化一个 OdpsOps 对象。在 Spark 中,MaxCompute 的数据操作通过 OdpsOps 类完成,请参照如下步骤创建一个 OdpsOps 对象:
    import com.aliyun.odps.TableSchema
         import com.aliyun.odps.data.Record
         import org.apache.spark.aliyun.odps.OdpsOps
         import org.apache.spark.{SparkContext, SparkConf}
         object Sample {
           def main(args: Array[String]): Unit = {    
             // == Step-1 ==
             val accessKeyId = "<accessKeyId>"
             val accessKeySecret = "<accessKeySecret>"
             // 以内网地址为例
             val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") 
             val conf = new SparkConf().setAppName("Test Odps")
             val sc = new SparkContext(conf)
             val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
             // 下面是一些调用代码
             // == Step-2 ==
             ...
             // == Step-3 ==
             ...
           }
           // == Step-2 ==
           // 方法定义1
           // == Step-3 ==
           // 方法定义2
         }
  2. 从 MaxCompute 中加载表数据到 Spark 中。通过 OdpsOps 对象的 readTable 方法,可以将 MaxCompute 中的表加载到 Spark 中,即生成一个 RDD,如下所示:
    // == Step-2 ==
             val project = <odps-project>
             val table = <odps-table>
             val numPartitions = 2
             val inputData = odpsOps.readTable(project, table, read, numPartitions)
             inputData.top(10).foreach(println)
             // == Step-3 ==
             ...
    在上面的代码中,您还需要定义一个 read 函数,用来解析和预处理 MaxCompute 表数据,如下所示:
    def read(record: Record, schema: TableSchema): String = {
               record.getString(0)
             }

    这个函数的含义是将 MaxCompute 表的第一列加载到 Spark 运行环境中。

  3. 将 Spark 中的结果数据保存到 MaxCompute 表中。通过 OdpsOps 对象的 saveToTable 方法,可以将 Spark RDD 持久化到 MaxCompute 中。
    val resultData = inputData.map(e => s"$e has been processed.")
             odpsOps.saveToTable(project, table, dataRDD, write)
    在上面的代码中,您还需要定义一个 write 函数,用作写 MaxCompute 表前数据预处理,如下所示:
    def write(s: String, emptyReord: Record, schema: TableSchema): Unit = {
               val r = emptyReord
               r.set(0, s)
             }

    这个函数的含义是将 RDD 的每一行数据写到对应 MaxCompute 表的第一列中。

  4. 分区表参数写法说明

    SDK 支持对 MaxCompute 分区表的读写,这里分区名的写法标准是:分区列名=分区名,多个分区时以逗号分隔,例如有分区列 pt 和 ps:

    • 读分区 pt 为 1 的表数据:pt=‘1’
    • 读分区 pt 为 1 和分区 ps 为 2 的表数据:pt=‘1’,ps=‘2’

附录

示例代码请参见: