本文主要介绍在DLA中如何使用时序时空引擎Ganos进行时空几何(Geometry)数据分析。

DLA Ganos中的时空几何,其范畴包含以下几个方面:
  • 时空几何对象
    • 矢量数据,如点、线、面状要素。
    • 在矢量数据基础上结合时间属性,组成的时空数据或时空轨迹数据。
  • 针对时空几何对象的相关操作,如空间关系判断(相交、相邻、包含等)。

DLA Ganos中矢量数据模型采用OGC协议标准中的SimpleFeature数据模型,其中的Geometry对象包含Point、LineString、Polygon、MultiPoint、MultiLineString和MultiPolygon等要素。Geometry以GeometryUDT的形式被Spark加载并参与运算。

Ganos目前支持的矢量数据源包括:
  • Hive
  • PolarDB
  • Lindorm(HBase)
  • GeoMesa
  • PostGIS

操作步骤

  1. 获取SDK。
  2. 搭建开发环境,并准备测试数据。

    获取SDK后,就可以开始部署开发环境并构建第一个Ganos Spark任务。首先,您需要准备测船载自动识别系统(AIS)样例数据。

    sid;date;longitude;latitude;course;speed;sourceid;heading;rot;messaged;status
    205003001;2018-09-03 09:15:50;72.96675666666667;11.982751666666667;130.0;13.1;17;129.0;0.0;1;在航
    205003001;2018-09-03 11:10:17;73.27778666666667;11.70024;133.0;13.1;17;131.0;0.0;1;在航
    205003001;2018-09-03 11:12:20;73.28349333333334;11.695146666666666;133.0;13.1;17;131.0;0.0;1;在航
    205003001;2018-09-03 12:45:25;73.54346666666666;11.469733333333334;134.0;13.2;17;133.0;0.0;1;在航
    205003001;2018-09-03 14:21:37;73.75408;11.182;144.8;14.0;17;145.0;0.0;1;在航
    205003001;2018-09-03 14:21:45;73.75434666666666;11.1816;144.8;14.0;17;145.0;0.0;1;在航
    205003001;2018-09-03 16:49:06;74.06773333333334;10.7524;146.0;12.8;17;143.0;0.0;1;在航
    205003001;2018-09-03 17:55:34;74.20368;10.555546666666666;146.0;12.8;17;143.0;0.0;1;在航
    205003001;2018-09-03 22:29:56;74.76798666666667;9.754333333333333;142.0;12.6;17;141.0;0.0;1;在航
    ...

    获取测试数据后,请上传到OSS目录或服务器,生成可访问的URI,如:https://<bucket_name>.oss-cn-beijing.aliyuncs.com/xxx/data.csv

    然后在IDE中新建Maven工程dla-ganos-quickstart, 并在pom.xml文件中的<dependencies>标签中添加如下依赖:

    <dependency>
         <groupId>com.aliyun.ganos</groupId>
         <artifactId>ganos-spark-sdk</artifactId>
         <version>1.0</version>
         <scope>system</scope>
         <systemPath>获取的DLA-Ganos-SDK包的uri</systemPath>
         </dependency>
    
    <!-- GeoTools deps -->
    <dependency>
         <groupId>org.geotools</groupId>
         <artifactId>gt-referencing</artifactId>
         <version>22.0</version>
    </dependency>
    <dependency>
         <groupId>org.geotools</groupId>
         <artifactId>gt-epsg-hsql</artifactId>
           <version>22.0</version>
    </dependency>
    
    <!-- provided deps -->
    <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-core_2.11</artifactId>
         <version>2.4.3</version>
    </dependency>
    <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-sql_2.11</artifactId>
           <version>2.4.3</version>
    </dependency>
    <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-client</artifactId>
         <version>4.3.0</version>
    </dependency>
    <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-framework</artifactId>
         <version>4.3.0</version>
    </dependency>
    <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-recipes</artifactId>
         <version>4.3.0</version>
    </dependency>

    至此,开发环境搭建完成。

  3. 数据加载与处理。

    创建QuickStart.scala文件,导入以下程序包:

    import org.apache.log4j.{Level, Logger}
    import com.aliyun.ganos.spark.GanosSparkKryoRegistrator
    import com.aliyun.ganos.spark.jts._
    import com.aliyun.ganos.spark.SparkUtils._
    import com.aliyun.ganos.spark.utils._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
    import org.apache.spark.sql.functions.col
    import org.locationtech.geomesa.utils.io.WithStore
    import org.geotools.util.factory.Hints
    import org.geotools.data.DataStore
    import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}

    然后,初始化SparkSession对象,并注册Kryo序列化工具GanosSparkKryoRegistrator

    val spark = SparkSession.builder
        .appName("Simple Application")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.crossJoin.enabled", "true")
        .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName)
        .getOrCreate()
    import spark.implicits._
    
    //SparkSession加载JTS包用于处理时空数据。
    spark.withJTS
    val sc = spark.sparkContext

    接着,通过spark的csv接口加载CSV文件,并调用st_makePoint函数构建时空对象。

    说明 这里使用了spark contextsc.addFile接口首先将文件下载到集群,然后才能进行访问。
    sc.addFile("https://bucket名称.oss-cn-beijing.aliyuncs.com/xxx/data.csv")
    val result = spark.read.format("csv")
        .option("header", "true")
        .option("delimiter", ";")
        .option("inferSchema", "true")
        .load(org.apache.spark.SparkFiles.get("data.csv"))
    
    //调用st_makePoint函数构建时空对象。
    val ais = result.withColumn("geom", st_makePoint(col("longitude"), col("latitude")))
    ais.show           //打印DataFrame内容。
    ais.printSchema    //打印DataFrame Schema。
    输出结果如下:
    +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+------+--------------------+
    |      sid|               date|        longitude|          latitude|course|speed|sourceid|heading|rot|messaged|status|                geom|
    +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+------+--------------------+
    |205003001|2018-09-03 09:15:50|72.96675666666667|11.982751666666667| 130.0| 13.1|      17|  129.0|0.0|        1|  在航|POINT (72.9667566...|
    |205003001|2018-09-03 11:10:17|73.27778666666667|          11.70024| 133.0| 13.1|      17|  131.0|0.0|        1|  在航|POINT (73.2777866...|
    |205003001|2018-09-03 11:12:20|73.28349333333334|11.695146666666666| 133.0| 13.1|      17|  131.0|0.0|        1|  在航|POINT (73.2834933...|
    |205003001|2018-09-03 12:45:25|73.54346666666666|11.469733333333334| 134.0| 13.2|      17|  133.0|0.0|        1|  在航|POINT (73.5434666...|
    |205003001|2018-09-03 14:21:37|         73.75408|            11.182| 144.8| 14.0|      17|  145.0|0.0|        1|  在航|POINT (73.75408 1...|
    |205003001|2018-09-03 14:21:45|73.75434666666666|           11.1816| 144.8| 14.0|      17|  145.0|0.0|        1|  在航|POINT (73.7543466...|
    |205003001|2018-09-03 16:49:06|74.06773333333334|           10.7524| 146.0| 12.8|      17|  143.0|0.0|        1|  在航|POINT (74.0677333...|
    |205003001|2018-09-03 17:55:34|         74.20368|10.555546666666666| 146.0| 12.8|      17|  143.0|0.0|        1|  在航|POINT (74.20368 1...|
    |205003001|2018-09-03 22:29:56|74.76798666666667| 9.754333333333333| 142.0| 12.6|      17|  141.0|0.0|        1|  在航|POINT (74.7679866...|
    |205003001|2018-09-04 00:07:58|74.96557333333334|           9.46616| 145.0| 12.8|      17|  145.0|0.0|        1|  在航|POINT (74.9655733...|
    |205003001|2018-09-04 00:35:11|75.02181333333333|           9.38648| 145.0| 12.9|      17|  145.0|0.0|        1|  在航|POINT (75.0218133...|
    |205003001|2018-09-04 00:41:32|75.03477333333333|           9.36856| 145.2| 14.0|      17|  145.0|0.0|        1|  在航|POINT (75.0347733...|
    |205003001|2018-09-04 00:41:51|75.03549333333333| 9.367653333333333| 145.2| 14.0|      17|  145.0|0.0|        1|  在航|POINT (75.0354933...|
    |205003001|2018-09-04 00:42:02|75.03581333333334| 9.367146666666667| 145.0| 12.8|      17|  145.0|0.0|        1|  在航|POINT (75.0358133...|
    |205003001|2018-09-04 00:49:42|75.05202666666666| 9.344533333333333| 144.0| 12.8|      17|  145.0|0.0|        1|  在航|POINT (75.0520266...|
    |205003001|2018-09-04 02:30:35|75.25965333333333| 9.049493333333333| 146.0| 13.2|      17|  147.0|0.0|        1|  在航|POINT (75.2596533...|
    |205003001|2018-09-04 02:30:45|            75.26| 9.048986666666666| 146.0| 13.2|      17|  147.0|0.0|        1|  在航|POINT (75.26 9.04...|
    |205003001|2018-09-04 10:54:37|76.46938666666667| 7.623226666666667| 141.0| 13.7|      17|  139.0|0.0|        1|  在航|POINT (76.4693866...|
    |205003001|2018-09-04 10:54:59|         76.47024|           7.62216| 141.0| 13.7|      17|  139.0|0.0|        1|  在航|POINT (76.47024 7...|
    |205003001|2018-09-04 10:55:59|76.47261333333333|            7.6192| 142.0| 13.7|      17|  139.0|0.0|        1|  在航|POINT (76.4726133...|
    +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+------+--------------------+
    only showing top 20 rows
    
    root
     |-- sid: integer (nullable = true)
     |-- date: timestamp (nullable = true)
     |-- longitude: double (nullable = true)
     |-- latitude: double (nullable = true)
     |-- course: double (nullable = true)
     |-- speed: double (nullable = true)
     |-- sourceid: integer (nullable = true)
     |-- heading: double (nullable = true)
     |-- rot: double (nullable = true)
     |-- messaged: integer (nullable = true)
     |-- status: string (nullable = true)
     |-- geom: point (nullable = true)

    可以看到输出的Schema中新增了一列geom,类型为point。该输出结果就是通过st_makePoint方法构造出的时空对象。下一步可以对geom列进行各类时空相关的查询分析操作,如使用Spark SQL对geom字段进行时空查询过滤。

    ais.createOrReplaceTempView("ais")
    val query = spark.sql(
        "SELECT * FROM ais WHERE " +
          "st_contains(st_geomfromtext('POLYGON((73.0 8.5,75 8.5,75 13.5,73.0 13.5,73.0 8.5))'), geom) AND " +
          "date>=to_timestamp('2018-09-03') AND date <=to_timestamp('2018-10-05') order by date")
    query.show()
    println(s"count=$query.count()")
    输出结果如下:
    +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+------+--------------------+
    |      sid|               date|        longitude|          latitude|course|speed|sourceid|heading|rot|messaged|status|                geom|
    +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+------+--------------------+
    |205003001|2018-09-03 11:10:17|73.27778666666667|          11.70024| 133.0| 13.1|      17|  131.0|0.0|        1|  在航|POINT (73.2777866...|
    |205003001|2018-09-03 11:12:20|73.28349333333334|11.695146666666666| 133.0| 13.1|      17|  131.0|0.0|        1|  在航|POINT (73.2834933...|
    |205003001|2018-09-03 12:45:25|73.54346666666666|11.469733333333334| 134.0| 13.2|      17|  133.0|0.0|        1|  在航|POINT (73.5434666...|
    |205003001|2018-09-03 14:21:37|         73.75408|            11.182| 144.8| 14.0|      17|  145.0|0.0|        1|  在航|POINT (73.75408 1...|
    |205003001|2018-09-03 14:21:45|73.75434666666666|           11.1816| 144.8| 14.0|      17|  145.0|0.0|        1|  在航|POINT (73.7543466...|
    |205003001|2018-09-03 16:49:06|74.06773333333334|           10.7524| 146.0| 12.8|      17|  143.0|0.0|        1|  在航|POINT (74.0677333...|
    |205003001|2018-09-03 17:55:34|         74.20368|10.555546666666666| 146.0| 12.8|      17|  143.0|0.0|        1|  在航|POINT (74.20368 1...|
    |205003001|2018-09-03 22:29:56|74.76798666666667| 9.754333333333333| 142.0| 12.6|      17|  141.0|0.0|        1|  在航|POINT (74.7679866...|
    |205003001|2018-09-04 00:07:58|74.96557333333334|           9.46616| 145.0| 12.8|      17|  145.0|0.0|        1|  在航|POINT (74.9655733...|
    +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+------+--------------------+
    
    count=9

    可以看到通过st_contains时空过滤,获取到9条符合条件的记录。

  4. 登录Data Lake Analytics管理控制台提交Spark作业。具体请参见创建和执行Spark作业
    编译工程并打包:
    mvn clean package

更多操作

您还可以对时空数据对象进行如下操作: