文档

轨迹生成

更新时间:

在车联网场景中,为了方便分析和预测,车辆的实时位置点会被聚合为一条轨迹线,每产生一个新的位置点都会被追加至轨迹线中。因为位置点是实时上传的且数据量较大,所以数据库在处理追加数据的操作时通常会消耗大量的IO。为解决这一问题,Lindorm Ganos时空服务结合Lindorm流引擎的实时计算能力,提供了轨迹生成方案。本文介绍轨迹生成的技术实现和实际操作步骤。

背景信息

在车联网场景中,车辆会持续地上传位置点(GPS Point)数据,这些位置点构成了一条轨迹线(Trajectory/LineString)。基于轨迹线,可以开展多种高效的分析及预测工作。例如回溯车辆的行驶轨迹、基于多条轨迹统计常行驶路线、计算车辆轨迹之间的相似性等等。

以下是在车联网场景中的一个示例:每小时将两辆车的位置点拼接为轨迹。

image.png

技术实现

位置点聚合成轨迹线可以看作是追加数据的过程,每次上传一个新的轨迹点都要追加到已有的轨迹线中。

这一过程在数据库层面表现为频繁的读写聚合操作:先读取数据库中已有的轨迹线数据,再在内存中将新的轨迹点数据和已有的轨迹线聚合为新的轨迹线,最后将新的轨迹线写入至数据库中。这个过程非常消耗IO,如果数据量过大,则可能会影响查询响应速度。

为解决这一问题,Lindorm Ganos时空服务基于流引擎提供了轨迹生成方案。Lindorm流引擎读取车辆的实时位置点数据,定期(每小时、每天)将位置点拼接为轨迹,再将聚合后的轨迹线数据写入至数据库,不仅保证了对车辆行驶轨迹数据的实时处理和分析能力,同时也减轻了数据库在处理高频率追加写入操作时的IO压力。

轨迹生成共涉及一种源数据:车辆的实时位置数据。

车辆位置信息是实时上传的流数据,可以实时保存在Kafka Topic中。Lindorm流引擎将读取Kafka Topic中的位置数据,根据计算任务进行实时计算,并将计算结果保存在Lindorm宽表中。

前提条件

注意事项

如果应用部署在ECS实例,通过专有网络访问Lindorm实例前,需要确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。

  • 所在地域相同,并建议所在可用区相同(以减少网络延时)。

  • ECS实例与Lindorm实例属于同一专有网络。

步骤一:创建结果表

宽表引擎中创建结果表,用于保存和查询计算结果。

  1. 通过Lindorm-cli连接并使用宽表引擎

  2. 创建结果表resultAgg1,包含五个字段:cid(车辆ID)、stt(窗口开始时间)、edd(窗口结束时间)、line(轨迹线)、timeseries(时间戳)。

    CREATE TABLE resultAgg1(cid INT, stt TIMESTAMP, edd TIMESTAMP, line VARCHAR,  timeseries VARCHAR, PRIMARY KEY (cid,stt));

步骤二:写入流数据

Lindorm流引擎完全兼容开源Kafka API,您可以通过Kafka开源客户端或脚本工具连接Lindorm流引擎并写入测试数据。

以通过开源Kafka脚本工具写入数据为例。

  1. 下载并安装Kafka脚本工具。具体操作,请参见通过开源Kafka脚本工具连接Lindorm流引擎

  2. 执行以下命令,创建名为tdrive的Kafka Topic。

    bin/kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic tdrive --create

    其中,Lindorm Stream Kafka地址为流引擎的Kafka连接地址,仅支持通过专有网络访问。获取方式,请参见查看连接地址

  3. 执行以下命令,将示例数据car1.txt(测试用位置点数据)写入Kafka Topic中。示例数据获取:car1.txt

    ./bin/kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic tdrive < car1.txt

步骤三:提交流引擎计算任务

使用Flink SQL提交Lindorm流引擎计算任务,读取Kafka Topic中的数据进行计算。

  1. 连接Lindorm流引擎。如何连接,请参见使用流引擎

  2. 提交计算任务。

    计算任务将写入Kafka Topic的点根据指定的时间窗口和顺序拼接为轨迹,具体步骤如下:

    1. 加载ganos函数模块。

    2. 在Flink Job中创建两张表:数据源表tdrive和数据结果表resultAgg1,通过设置连接器参数,分别关联已创建的Kafka Topic和结果表resultAgg1。

    3. 创建流任务,使用ST_MakeLine_Ts_Agg函数将点拼接为轨迹,并按照输入的时间戳(Timestamp)列对数据进行排序,使用LISTAGG函数拼接时间戳,最后将计算结果写入结果表resultAgg1。

    CREATE FJOB lineAgg1 (
      LOAD MODULE ganos;
      -- create stream table
      CREATE TABLE tdrive(
        cid INT,
        ts TIMESTAMP(0),
        lng DOUBLE,
        lat DOUBLE,
        WATERMARK FOR ts AS ts-INTERVAL '10' MINUTES
      ) WITH (
        'connector'='kafka',
        'topic'='tdrive',
        'scan.startup.mode'='earliest-offset',
        'properties.bootstrap.servers'='<Lindorm Stream Kafka地址>',
        'format'='json',
        'json.ignore-parse-errors'='true'
      );
      --create result table
      CREATE TABLE resultAgg1(
        cid INT,
        stt TIMESTAMP(0),
        edd TIMESTAMP(0),
        line STRING,
        PRIMARY KEY(cid,stt) NOT ENFORCED
      ) WITH (
        'connector'='lindorm',
        'seedServer'='<Lindorm宽表HBase Java API访问地址>',
        'userName'='<Lindorm宽表引擎的用户名>',
        'password'='<Lindorm宽表引擎的密码>',
        'tableName'='resultAgg1',
        'namespace'='<结果表resultAgg1所在的数据库>');
      INSERT INTO resultAgg1 
        SELECT 
        cid, 
        window_start AS stt,
        window_end AS edd, 
        ST_AsText(ST_MakeLine_Ts_Agg(ST_MakePoint(lng,lat),ts)) AS line,
        LISTAGG(CAST(ts AS VARCHAR)) AS timeseries 
        FROM TABLE(TUMBLE(TABLE tdrive, DESCRIPTOR(ts), INTERVAL '1' HOUR))
        WHERE lng IS NOT NULL AND lat IS NOT NULL 
        AND ST_IsEmpty(ST_MakePoint(lng,lat))=false
        GROUP BY window_start,window_end,cid;
    );
    说明

步骤四:查看结果

  1. 通过Lindorm-cli连接并使用宽表引擎

  2. 执行以下语句,查看区域统计结果。

    SELECT * FROM resultAgg1 LIMIT 5;

    返回结果:

    +-----+-------------------------------+-------------------------------+--------------------------------+--------------------------------+
    | cid |              stt              |              edd              |              line              |           timeseries           |
    +-----+-------------------------------+-------------------------------+--------------------------------+--------------------------------+
    | 1   | 2008-02-02 15:00:00 +0000 UTC | 2008-02-02 16:00:00 +0000 UTC | LINESTRING (116.51172          | 2008-02-02                     |
    |     |                               |                               | 39.92123, 116.51135 39.93883,  | 07:36:08.000,2008-02-02        |
    |     |                               |                               | 116.51627 39.91034)            | 07:46:08.000,2008-02-02        |
    |     |                               |                               |                                | 07:56:08.000                   |
    | 1   | 2008-02-02 16:00:00 +0000 UTC | 2008-02-02 17:00:00 +0000 UTC | LINESTRING (116.47186          | 2008-02-02                     |
    |     |                               |                               | 39.91248, 116.47217 39.92498,  | 08:06:08.000,2008-02-02        |
    |     |                               |                               | 116.47179 39.90718, 116.45617  | 08:16:08.000,2008-02-02        |
    |     |                               |                               | 39.90531)                      | 08:26:08.000,2008-02-02        |
    |     |                               |                               |                                | 08:36:08.000                   |
    | 1   | 2008-02-02 17:00:00 +0000 UTC | 2008-02-02 18:00:00 +0000 UTC | LINESTRING (116.47191          | 2008-02-02                     |
    |     |                               |                               | 39.90577, 116.50661 39.9145)   | 09:00:24.000,2008-02-02        |
    |     |                               |                               |                                | 09:10:24.000                   |
    | 1   | 2008-02-02 20:00:00 +0000 UTC | 2008-02-02 21:00:00 +0000 UTC | LINESTRING (116.49625 39.9146, | 2008-02-02                     |
    |     |                               |                               | 116.50962 39.91071, 116.52231  | 12:30:34.000,2008-02-02        |
    |     |                               |                               | 39.91588)                      | 12:40:33.000,2008-02-02        |
    |     |                               |                               |                                | 12:50:33.000                   |
    | 1   | 2008-02-02 21:00:00 +0000 UTC | 2008-02-02 22:00:00 +0000 UTC | LINESTRING (116.56444          | 2008-02-02                     |
    |     |                               |                               | 39.91445, 116.59512 39.90798,  | 13:00:33.000,2008-02-02        |
    |     |                               |                               | 116.65522 39.8622, 116.69164   | 13:10:33.000,2008-02-02        |
    |     |                               |                               | 39.85165, 116.69167 39.85166)  | 13:30:33.000,2008-02-02        |
    |     |                               |                               |                                | 13:40:33.000,2008-02-02        |
    |     |                               |                               |                                | 13:50:33.000                   |
    +-----+-------------------------------+-------------------------------+--------------------------------+--------------------------------+

    其中line列为聚合得到的轨迹线,timeseries为位置点对应的时间戳。