
Integrated Analysis with Spark

Last updated: 2019-08-12 10:02:01


The Ganos Spark module lets users run large-scale geospatial data processing and analysis on the Apache Spark distributed system. It provides a set of interfaces on top of Spark for loading, analyzing, and saving data. Ganos Spark offers analysis models at different levels. The most basic is the GeometryRDD model, which converts between SimpleFeature objects in Ganos and RDDs in Spark. On top of GeometryRDD, Ganos Spark builds a series of Spark SQL UDTs, UDFs, and UDAFs for representing spatial data, so that users can query and analyze data with SQL-style structured queries. The overall Ganos Spark framework is shown in the figure below.

1. Initializing the Ganos Spark Environment

1.1 Obtaining the Dependency Package

First, obtain the GanosSpark SDK development package from the provided link: Download the GanosSpark SDK development package.

After obtaining the ganos-spark-runtime-1.0-SNAPSHOT.jar package, add the dependency to your pom file:

    <dependency>
      <groupId>com.aliyun.ganos</groupId>
      <artifactId>ganos-spark-runtime</artifactId>
      <version>1.0-SNAPSHOT</version>
      <scope>system</scope>
      <systemPath>/ganos-spark-runtime-1.0-SNAPSHOT.jar</systemPath>
    </dependency>

You also need the HBase and HBase Ganos dependencies; see the HBase Ganos tutorial for details.

1.2 Connecting to HBase Ganos

First, initialize the Spark environment and create a SparkContext object:

    import com.aliyun.ganos.spark.{GanosSpark, GanosSparkKryoRegistrator}
    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.{SparkConf, SparkContext}
    import org.geotools.data.simple.SimpleFeatureStore
    import org.geotools.data.{DataStore, DataStoreFinder, DataUtilities, Query}
    import org.geotools.factory.Hints
    import org.geotools.geometry.jts.JTSFactoryFinder
    import org.locationtech.jts.geom.Coordinate
    import org.opengis.feature.simple.SimpleFeature

    // Configure Kryo serialization for Ganos types, then create the context
    val conf = new SparkConf().setMaster("local[2]").setAppName("testSpark")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName)
    val sc: SparkContext = SparkContext.getOrCreate(conf)

Then connect to HBase Ganos through the GanosSpark object and load data. Connecting to HBase Ganos requires the connection parameters, stored in a Map, as follows:

    // Connect to HBase Ganos
    val dsParams = Map(
      "hbase.catalog" -> "AIS",
      "hbase.zookeepers" -> "localhost",
      "geotools" -> "true")
    val ds = DataStoreFinder.getDataStore(dsParams)
    // Load the data and convert it to the RDD model
    val rdd = GanosSpark(dsParams).rdd(new Configuration(), sc, dsParams, new Query("...."))

The code above pushes the user-configured standard Query object down to HBase Ganos for execution, returns the query results to the Spark cluster, and converts them into an RDD[SimpleFeature].
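Once the data is an RDD[SimpleFeature], ordinary Spark transformations apply. The sketch below filters features with a plain bounding-box predicate; the inBBox helper is our own illustration, not part of the Ganos API, and the commented usage assumes the features carry point geometries as their default geometry:

```scala
// Plain longitude/latitude bounding-box predicate (our helper, not a Ganos API)
def inBBox(lon: Double, lat: Double,
           minLon: Double, minLat: Double,
           maxLon: Double, maxLat: Double): Boolean =
  lon >= minLon && lon <= maxLon && lat >= minLat && lat <= maxLat

// Assuming `rdd: RDD[SimpleFeature]` from the loading step above, with
// org.locationtech.jts.geom.Point as the default geometry type:
// val filtered = rdd.filter { f =>
//   val p = f.getDefaultGeometry.asInstanceOf[Point]
//   inBBox(p.getX, p.getY, 114.0, 22.0, 115.0, 23.0)
// }
// println(filtered.count())
```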

3. Querying Data with Spark SQL

3.1 The GanosSpark SQL Model

Ganos Spark provides a set of Spark SQL UDTs, UDFs, and UDAFs for representing and querying spatial data.
The UDT models include:

  • GeometryUDT
  • PointUDT
  • LineStringUDT
  • PolygonUDT
  • MultiPointUDT
  • MultiLineStringUDT
  • MultiPolygonUDT
  • GeometryCollectionUDT
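Values of these UDTs are normally constructed inside SQL. One common pattern is to build a WKT string in Scala and pass it to a geometry constructor UDF. The bboxWkt helper below is our own; st_geomFromWKT is an assumed GeoMesa-style constructor name (this page itself only demonstrates st_makeBBOX and st_contains):

```scala
// Our helper: WKT polygon for an axis-aligned bounding box (closed ring)
def bboxWkt(minLon: Double, minLat: Double, maxLon: Double, maxLat: Double): String =
  s"POLYGON (($minLon $minLat, $maxLon $minLat, $maxLon $maxLat, $minLon $maxLat, $minLon $minLat))"

// Hypothetical usage with an assumed GeoMesa-style constructor UDF:
// val polyDf = sparkSession.sql(
//   s"SELECT st_geomFromWKT('${bboxWkt(114.0, 22.0, 115.0, 23.0)}') AS poly")
```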

3.2 Loading Data with Spark SQL

Load the Spark SQL runtime environment:

    import scala.collection.JavaConversions._
    import org.apache.spark.sql.{SQLTypes, SparkSession}
    import com.aliyun.ganos.spark.GanosSparkKryoRegistrator

    // Initialize the Ganos Spark environment
    val params = Map("hbase.zookeepers" -> "localhost", "hbase.catalog" -> "AIS")
    val ds = org.geotools.data.DataStoreFinder.getDataStore(params)
    val sparkSession = SparkSession.builder
      .appName("Simple Application")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName)
      .master("local[*]")
      .getOrCreate()
    // Register the spatial UDTs and UDFs on the session's SQLContext
    SQLTypes.init(sparkSession.sqlContext)

3.3 UDF Spatio-Temporal Operators

After the SparkSession is created, you can load data into a DataFrame object through its read method. Ganos Spark provides a series of spatio-temporal UDF operators for SQL-based spatio-temporal queries; for details, see: https://yuque.antfin-inc.com/adb-spatial/fx8m4k/oolqhw

Example 1: Loading a DataFrame

    // Read Ganos trajectory data into a DataFrame
    val dataFrame = sparkSession.read
      .format("ganos")
      .options(params)
      .option("ganos.feature", "POINT")
      .load()
    dataFrame.createOrReplaceTempView("aispoint")
    println(dataFrame.schema)

Example 2: A spatio-temporal query with SQL UDFs:

    // Spatio-temporal query over ship trajectory data
    val spatialDf1 = sparkSession.sql(
      """
        |SELECT ship_id, status, dtg, geom FROM aispoint
        |WHERE st_contains(st_makeBBOX(114.00000, 22.00000, 115.00000, 23.00000), geom)
        |AND dtg between cast('2018-09-08T01:00:00Z' as timestamp) AND cast('2018-09-13T01:00:00Z' as timestamp)
      """.stripMargin)
    println("Record count: " + spatialDf1.count())
    spatialDf1.createOrReplaceTempView("point")
    spatialDf1.show

The output is as follows:

    Record count: 4291
    +---------+---------+-------------------+--------------------+
    |  ship_id|   status|                dtg|                geom|
    +---------+---------+-------------------+--------------------+
    |566652000|Under way|2018-09-08 09:04:00|POINT (114.749211...|
    |566652000|Under way|2018-09-08 09:05:19|POINT (114.74888 ...|
    |566652000|Under way|2018-09-08 09:08:00|POINT (114.74858 ...|
    |413280000|At anchor|2018-09-08 09:11:39|POINT (114.710655...|
    |566652000|Under way|2018-09-08 09:21:20|POINT (114.746276...|
    |413280000|At anchor|2018-09-08 09:20:38|POINT (114.710836...|
    |566652000|Under way|2018-09-08 09:27:01|POINT (114.747328...|
    |413280000|At anchor|2018-09-08 09:32:37|POINT (114.710993...|
    |566652000|Under way|2018-09-08 09:41:01|POINT (114.7508 2...|
    |413280000|At anchor|2018-09-08 09:38:38|POINT (114.710961...|
    |566652000|Under way|2018-09-08 09:42:50|POINT (114.751018...|
    |566652000|Under way|2018-09-08 09:48:20|POINT (114.750356...|
    |566652000|Under way|2018-09-08 09:50:10|POINT (114.750155...|
    |566652000|Under way|2018-09-08 09:52:00|POINT (114.749886...|
    |413280000|At anchor|2018-09-08 09:56:38|POINT (114.711043...|
    |566652000|Under way|2018-09-08 10:04:01|POINT (114.748488...|
    |566652000|Under way|2018-09-08 10:07:50|POINT (114.747945...|
    |566652000|Under way|2018-09-08 10:10:19|POINT (114.7476 2...|
    |566652000|Under way|2018-09-08 10:12:09|POINT (114.74736 ...|
    |413280000|At anchor|2018-09-08 10:05:38|POINT (114.71118 ...|
    +---------+---------+-------------------+--------------------+
    only showing top 20 rows
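Since spatialDf1 is registered as the temporary view point, ordinary SQL aggregations compose with the spatio-temporal filter, for example counting trajectory points per ship. The groupCountSql helper below is our own convenience for building the statement; the column names match the query above:

```scala
// Our helper: build a simple GROUP BY count statement for a view
def groupCountSql(view: String, keys: Seq[String]): String = {
  val cols = keys.mkString(", ")
  s"SELECT $cols, COUNT(*) AS num_points FROM $view GROUP BY $cols"
}

// Count trajectory points per ship and navigation status on the filtered view:
// val countsDf = sparkSession.sql(groupCountSql("point", Seq("ship_id", "status")))
// countsDf.show()
```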

4. Saving Data

You can save analysis results to the result table in HBase Ganos through the DataFrame write interface. Note that the result table must already exist before writing; otherwise the program below will fail.

    // Write the query results back to Ganos through the Ganos Spark connector
    spatialDf1.write.format("ganos").options(params).option("ganos.feature", "result").save()
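To verify the write, the result table can be read back through the same connector options used for loading. This is a sketch under the assumptions that the save above completed and that result is a registered feature type in the catalog:

```scala
// Read the result table back and compare row counts with the source DataFrame
// (`params` and `sparkSession` are the values defined in section 3.2)
val resultDf = sparkSession.read
  .format("ganos")
  .options(params)
  .option("ganos.feature", "result")
  .load()
println(resultDf.count())  // expected to match the number of rows written
```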