Ganos Spark模块允许用户基于Apache Spark分布式系统进行大规模的地理信息数据处理与分析。它基于Spark环境提供了一系列的接口进行数据加载、分析和保存。Ganos Spark提供了不同级别的数据分析模型,最基础的是GeometryRDD模型,用来实现Ganos数据中SimpleFeature与Spark中RDD模型的之间的转换。在GeometryRDD基础上,Ganos Spark基于SparkSQL设计了一系列用于空间数据表达的UDT与UDF或UDAF,允许用户使用类似SQL结构化查询语言进行数据的查询与分析。Ganos Spark整体框架如下:

1. 获取Ganos Spark 工具包

首先请从此链接获取GanosSparkSDK开发包:Ganos Spark驱动

下载示例代码:代码示例

2 使用Ganos Spark查询HBase Ganos

用户可参考下面实例通过Ganos Spark连接HBase Ganos并查询数据:

package com.aliyun.ganos

import com.aliyun.ganos.spark.GanosSparkKryoRegistrator
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object GanosSparkDemo {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com").setLevel(Level.ERROR)

    //指定HBase连接参数,POINT为Catalog名称
    val params = Map(
      "hbase.catalog" -> "POINT",
      "hbase.zookeepers" -> "zookeeper地址",
      "geotools" -> "true")

    //初始化SparkSession
    val sparkSession = SparkSession.builder
      .appName("Simple Application")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.crossJoin.enabled", "true")
      .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName)
      .master("local[*]")
      .getOrCreate()

    //加载AIS数据源
    val dataFrame = sparkSession.read
      .format("ganos")
      .options(params)
      .option("ganos.feature", "AIS")
      .load()

    //查询全部数据
    dataFrame.createOrReplaceTempView("ais")
    val r=sparkSession.sql("SELECT * FROM ais")
    r.show()

    //时空查询
    val r1=sparkSession.sql("SELECT * FROM ais WHERE st_contains(st_makeBBOX(70.00000,11.00000,75.00000,14.00000), geom)")
    r1.show()

    //将查询结果反写入HBase Ganos
    r1.write.format("ganos").options(params).option("ganos.feature", "result").save()
  }
}
			

运行结果如下:

关于Ganos Spark支持的空间操作函数,用户可参考:Ganos Spark 函数