本文主要介绍在DLA中如何使用时序时空引擎Ganos进行时空几何(Geometry)数据分析。
- 时空几何对象
- 矢量数据,如点、线、面状要素。
- 在矢量数据基础上结合时间属性,组成的时空数据或时空轨迹数据。
- 针对时空几何对象的相关操作,如空间关系判断(相交、相邻、包含等)。
DLA Ganos中矢量数据模型采用OGC协议标准中的SimpleFeature数据模型,其中的Geometry对象包含Point、LineString、Polygon、MultiPoint、MultiLineString和MultiPolygon等要素。Geometry以GeometryUDT的形式被Spark加载并参与运算。
- Hive
- PolarDB
- Lindorm(HBase)
- GeoMesa
- PostGIS
操作步骤
- 获取SDK。
- 搭建开发环境,并准备测试数据。
获取SDK后,就可以开始部署开发环境并构建第一个Ganos Spark任务。首先,您需要准备测船载自动识别系统(AIS)样例数据。
sid;date;longitude;latitude;course;speed;sourceid;heading;rot;messaged;status 205003001;2018-09-03 09:15:50;72.96675666666667;11.982751666666667;130.0;13.1;17;129.0;0.0;1;在航 205003001;2018-09-03 11:10:17;73.27778666666667;11.70024;133.0;13.1;17;131.0;0.0;1;在航 205003001;2018-09-03 11:12:20;73.28349333333334;11.695146666666666;133.0;13.1;17;131.0;0.0;1;在航 205003001;2018-09-03 12:45:25;73.54346666666666;11.469733333333334;134.0;13.2;17;133.0;0.0;1;在航 205003001;2018-09-03 14:21:37;73.75408;11.182;144.8;14.0;17;145.0;0.0;1;在航 205003001;2018-09-03 14:21:45;73.75434666666666;11.1816;144.8;14.0;17;145.0;0.0;1;在航 205003001;2018-09-03 16:49:06;74.06773333333334;10.7524;146.0;12.8;17;143.0;0.0;1;在航 205003001;2018-09-03 17:55:34;74.20368;10.555546666666666;146.0;12.8;17;143.0;0.0;1;在航 205003001;2018-09-03 22:29:56;74.76798666666667;9.754333333333333;142.0;12.6;17;141.0;0.0;1;在航 ...
获取测试数据后,请上传到OSS目录或服务器,生成可访问的URI,如:
https://<bucket_name>.oss-cn-beijing.aliyuncs.com/xxx/data.csv
。然后在IDE中新建Maven工程
dla-ganos-quickstart
, 并在pom.xml文件中的<dependencies>
标签中添加如下依赖:<dependency> <groupId>com.aliyun.ganos</groupId> <artifactId>ganos-spark-sdk</artifactId> <version>1.0</version> <scope>system</scope> <systemPath>获取的DLA-Ganos-SDK包的uri</systemPath> </dependency> <!-- GeoTools deps --> <dependency> <groupId>org.geotools</groupId> <artifactId>gt-referencing</artifactId> <version>22.0</version> </dependency> <dependency> <groupId>org.geotools</groupId> <artifactId>gt-epsg-hsql</artifactId> <version>22.0</version> </dependency> <!-- provided deps --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.4.3</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.4.3</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency>
至此,开发环境搭建完成。
- 数据加载与处理。
创建QuickStart.scala文件,导入以下程序包:
import org.apache.log4j.{Level, Logger} import com.aliyun.ganos.spark.GanosSparkKryoRegistrator import com.aliyun.ganos.spark.jts._ import com.aliyun.ganos.spark.SparkUtils._ import com.aliyun.ganos.spark.utils._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession} import org.apache.spark.sql.functions.col import org.locationtech.geomesa.utils.io.WithStore import org.geotools.util.factory.Hints import org.geotools.data.DataStore import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
然后,初始化
SparkSession
对象,并注册Kryo序列化工具GanosSparkKryoRegistrator
:val spark = SparkSession.builder .appName("Simple Application") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.crossJoin.enabled", "true") .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName) .getOrCreate() import spark.implicits._ //SparkSession加载JTS包用于处理时空数据。 spark.withJTS val sc = spark.sparkContext
接着,通过spark的csv接口加载CSV文件,并调用
st_makePoint
函数构建时空对象。说明 这里使用了spark context
的sc.addFile
接口首先将文件下载到集群,然后才能进行访问。sc.addFile("https://bucket名称.oss-cn-beijing.aliyuncs.com/xxx/data.csv") val result = spark.read.format("csv") .option("header", "true") .option("delimiter", ";") .option("inferSchema", "true") .load(org.apache.spark.SparkFiles.get("data.csv")) //调用st_makePoint函数构建时空对象。 val ais = result.withColumn("geom", st_makePoint(col("longitude"), col("latitude"))) ais.show //打印DataFrame内容。 ais.printSchema //打印DataFrame Schema。
输出结果如下:+---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+------+--------------------+ | sid| date| longitude| latitude|course|speed|sourceid|heading|rot|messaged|status| geom| +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+------+--------------------+ |205003001|2018-09-03 09:15:50|72.96675666666667|11.982751666666667| 130.0| 13.1| 17| 129.0|0.0| 1| 在航|POINT (72.9667566...| |205003001|2018-09-03 11:10:17|73.27778666666667| 11.70024| 133.0| 13.1| 17| 131.0|0.0| 1| 在航|POINT (73.2777866...| |205003001|2018-09-03 11:12:20|73.28349333333334|11.695146666666666| 133.0| 13.1| 17| 131.0|0.0| 1| 在航|POINT (73.2834933...| |205003001|2018-09-03 12:45:25|73.54346666666666|11.469733333333334| 134.0| 13.2| 17| 133.0|0.0| 1| 在航|POINT (73.5434666...| |205003001|2018-09-03 14:21:37| 73.75408| 11.182| 144.8| 14.0| 17| 145.0|0.0| 1| 在航|POINT (73.75408 1...| |205003001|2018-09-03 14:21:45|73.75434666666666| 11.1816| 144.8| 14.0| 17| 145.0|0.0| 1| 在航|POINT (73.7543466...| |205003001|2018-09-03 16:49:06|74.06773333333334| 10.7524| 146.0| 12.8| 17| 143.0|0.0| 1| 在航|POINT (74.0677333...| |205003001|2018-09-03 17:55:34| 74.20368|10.555546666666666| 146.0| 12.8| 17| 143.0|0.0| 1| 在航|POINT (74.20368 1...| |205003001|2018-09-03 22:29:56|74.76798666666667| 9.754333333333333| 142.0| 12.6| 17| 141.0|0.0| 1| 在航|POINT (74.7679866...| |205003001|2018-09-04 00:07:58|74.96557333333334| 9.46616| 145.0| 12.8| 17| 145.0|0.0| 1| 在航|POINT (74.9655733...| |205003001|2018-09-04 00:35:11|75.02181333333333| 9.38648| 145.0| 12.9| 17| 145.0|0.0| 1| 在航|POINT (75.0218133...| |205003001|2018-09-04 00:41:32|75.03477333333333| 9.36856| 145.2| 14.0| 17| 145.0|0.0| 1| 在航|POINT (75.0347733...| |205003001|2018-09-04 00:41:51|75.03549333333333| 9.367653333333333| 145.2| 14.0| 17| 145.0|0.0| 1| 在航|POINT (75.0354933...| |205003001|2018-09-04 00:42:02|75.03581333333334| 9.367146666666667| 145.0| 12.8| 17| 145.0|0.0| 1| 在航|POINT (75.0358133...| |205003001|2018-09-04 00:49:42|75.05202666666666| 9.344533333333333| 144.0| 12.8| 17| 145.0|0.0| 1| 在航|POINT (75.0520266...| |205003001|2018-09-04 02:30:35|75.25965333333333| 9.049493333333333| 146.0| 13.2| 17| 147.0|0.0| 1| 在航|POINT (75.2596533...| |205003001|2018-09-04 02:30:45| 75.26| 9.048986666666666| 146.0| 13.2| 17| 147.0|0.0| 1| 在航|POINT (75.26 9.04...| |205003001|2018-09-04 10:54:37|76.46938666666667| 7.623226666666667| 141.0| 13.7| 17| 139.0|0.0| 1| 在航|POINT (76.4693866...| |205003001|2018-09-04 10:54:59| 76.47024| 7.62216| 141.0| 13.7| 17| 139.0|0.0| 1| 在航|POINT (76.47024 7...| |205003001|2018-09-04 10:55:59|76.47261333333333| 7.6192| 142.0| 13.7| 17| 139.0|0.0| 1| 在航|POINT (76.4726133...| +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+------+--------------------+ only showing top 20 rows root |-- sid: integer (nullable = true) |-- date: timestamp (nullable = true) |-- longitude: double (nullable = true) |-- latitude: double (nullable = true) |-- course: double (nullable = true) |-- speed: double (nullable = true) |-- sourceid: integer (nullable = true) |-- heading: double (nullable = true) |-- rot: double (nullable = true) |-- messaged: integer (nullable = true) |-- status: string (nullable = true) |-- geom: point (nullable = true)
可以看到输出的Schema中新增了一列
geom
,类型为point
。该输出结果就是通过st_makePoint
方法构造出的时空对象。下一步可以对geom
列进行各类时空相关的查询分析操作,如使用Spark SQL对geom
字段进行时空查询过滤。ais.createOrReplaceTempView("ais") val query = spark.sql( "SELECT * FROM ais WHERE " + "st_contains(st_geomfromtext('POLYGON((73.0 8.5,75 8.5,75 13.5,73.0 13.5,73.0 8.5))'), geom) AND " + "date>=to_timestamp('2018-09-03') AND date <=to_timestamp('2018-10-05') order by date") query.show() println(s"count=$query.count()")
输出结果如下:+---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+------+--------------------+ | sid| date| longitude| latitude|course|speed|sourceid|heading|rot|messaged|status| geom| +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+------+--------------------+ |205003001|2018-09-03 11:10:17|73.27778666666667| 11.70024| 133.0| 13.1| 17| 131.0|0.0| 1| 在航|POINT (73.2777866...| |205003001|2018-09-03 11:12:20|73.28349333333334|11.695146666666666| 133.0| 13.1| 17| 131.0|0.0| 1| 在航|POINT (73.2834933...| |205003001|2018-09-03 12:45:25|73.54346666666666|11.469733333333334| 134.0| 13.2| 17| 133.0|0.0| 1| 在航|POINT (73.5434666...| |205003001|2018-09-03 14:21:37| 73.75408| 11.182| 144.8| 14.0| 17| 145.0|0.0| 1| 在航|POINT (73.75408 1...| |205003001|2018-09-03 14:21:45|73.75434666666666| 11.1816| 144.8| 14.0| 17| 145.0|0.0| 1| 在航|POINT (73.7543466...| |205003001|2018-09-03 16:49:06|74.06773333333334| 10.7524| 146.0| 12.8| 17| 143.0|0.0| 1| 在航|POINT (74.0677333...| |205003001|2018-09-03 17:55:34| 74.20368|10.555546666666666| 146.0| 12.8| 17| 143.0|0.0| 1| 在航|POINT (74.20368 1...| |205003001|2018-09-03 22:29:56|74.76798666666667| 9.754333333333333| 142.0| 12.6| 17| 141.0|0.0| 1| 在航|POINT (74.7679866...| |205003001|2018-09-04 00:07:58|74.96557333333334| 9.46616| 145.0| 12.8| 17| 145.0|0.0| 1| 在航|POINT (74.9655733...| +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+------+--------------------+ count=9
可以看到通过
st_contains
时空过滤,获取到9条符合条件的记录。 - 登录Data Lake Analytics管理控制台提交Spark作业。具体请参见创建和执行Spark作业。编译工程并打包:
mvn clean package
更多操作
- 更多时空函数介绍与使用方法请参考时空几何函数参考。
- 数据入库到Lindorm(HBase)并创建时空索引请参考GeoMesa(HBase/Cassandra)。