盲区会车

在导航等场景中,由于某些路口弯道过大或存在盲区,无法看到对面车道的车辆情况,存在事故风险。为了更好地规避交通事故,您可以使用Lindorm宽表引擎、Lindorm流引擎(实时计算),结合Apache Kafka,预先设置好盲区(地理围栏),在检测到其他车辆时及时向司机发出提醒。

场景描述

在盲区会车场景中:

  • 可以将盲区、弯道存储为地理围栏(使用Polygon类型的数据),地理围栏数据一般不会频繁更改。

  • 车辆位置信息是实时上传的,通常会按照指定的时间间隔更新当前位置信息,这类数据变化较频繁。

示例:

当编号为car10的车辆进入指定的盲区时,如果此时对面有其他车辆驶入(比如car11),则会向car10的司机发出提醒。

image.png

技术实现

盲区会车涉及到地理围栏数据(盲区)、车辆实时位置数据以及最终的计算结果数据。

  • 地理围栏数据变化频率不大,可将其存储于Lindorm宽表引擎中。在计算过程中,流引擎会从宽表引擎中读取这些数据。

  • 车辆位置信息会实时上传,采用Apache Kafka来收集车辆上传的信息,并将其接入Lindorm流引擎参与计算。

  • 最终的计算结果存储到Lindorm宽表引擎中。

lQLPKGV817OklFvNAQbNA2CwoB8T8EnteI8FWrRiMzGIAA_864_262.png

前提条件

  • 已创建Lindorm实例并开通宽表引擎和流引擎,详情请参见创建实例

  • 已开通Lindorm Ganos服务,详情请参见开通时空服务(免费)

  • 已创建操作系统为Linux的ECS实例,详情请参见创建ECS实例

    说明
    • ECS实例需要与Lindorm实例所在地域相同,且属于同一专有网络。

    • ECS的操作系统镜像建议使用Alibaba Cloud Linux。

步骤一:创建地理围栏表和结果表

  1. 通过Lindorm-cli连接并使用宽表引擎

  2. 在宽表引擎中,创建地理围栏表并插入示例数据。

    1. 创建地理围栏表area

      CREATE TABLE area(id INT, name VARCHAR, poly GEOMETRY, PRIMARY KEY(id));
      说明

      地理围栏表中的poly列用于存储地理围栏数据。

    2. 插入示例数据。

      INSERT INTO area(id,name,poly) VALUES
        (1,'cross1',ST_GeomFromText('POLYGON ((116.4234 39.9147, 116.4234 39.9337, 116.4329 39.9337, 116.4329 39.9147, 116.4234 39.9147))')),
        (2,'cross2',ST_GeomFromText('POLYGON ((116.3979 39.9035, 116.3978 39.8938, 116.4184 39.8938, 116.4184 39.9035, 116.3979 39.9035))'));
  3. 创建结果表。

    针对盲区会车场景,创建结果表warns

    CREATE TABLE warns(cid INT, ts TIMESTAMP, alert BOOLEAN, PRIMARY KEY (cid,ts));
    说明

    结果表中,cid表示车辆编号;ts表示收到通知的时间;alter表示通知结果,取值为true时表示前方对面车道有车辆。

步骤二:接入流数据

本示例将通过开源Kafka脚本工具连接Lindorm流引擎。

  1. 连接Lindorm流引擎,并创建名为logVehicle的Topic。详情请参见通过开源Kafka脚本工具连接Lindorm流引擎

  2. 执行如下命令,将示例数据写入到已创建的Topic中。

    ./bin/kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicle < testcar.txt
    说明

    testcar.txt为本文提供的示例数据,您需要将其下载并上传到开源Kafka脚本工具的根目录下。

    您可以使用./bin/kafka-console-consumer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicle --from-beginning命令,查看数据是否成功写入。

步骤三:提交流引擎计算任务

使用Flink SQL提交Lindorm流引擎计算任务,读取开源Kafka topic中的数据并结合宽表数据做计算。

  1. 连接到Lindorm流引擎,详情请参见步骤二:安装流引擎客户端

  2. 创建盲区会车计算任务。

    在如下的示例任务中,判断编号为10的车辆进入指定的盲区时,盲区中是否有其他车辆。任务流程如下:

    1. 加载 ganos 函数模块。

    2. 创建数据源表 drive、数据维表area、结果表warns

    3. 创建流任务,使用count函数及geomHint函数过滤数据,判断编号为10的车辆进入指定的盲区时,盲区中是否有其他车辆。

    重要

    请将示例程序中的参数值替换为实际的值,您可以在Lindorm实例的数据库连接中查看参数值,详情请参见查看连接地址

    • seedServer:Lindorm宽表引擎SQL地址的专有网络地址,并将端口改为30020。格式为ld-****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020

    • password:Lindorm宽表引擎的默认密码。

    • properties.bootstrap.servers:Kafka的连接地址,请使用流引擎的Lindorm Stream Kafka专有网络地址。格式为ld-****-proxy-stream.lindorm.rds.aliyuncs.com:30080

    示例程序:

    CREATE FJOB warnCar1(
      LOAD MODULE ganos;
      -- create danger area table
      CREATE TABLE area(
        id INT,
        name VARCHAR,
        poly GEOMETRY,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector'='lindorm',
        'seedServer'='ld-****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020',
        'userName'='root',
        'password'='your_password',
        'tableName'='area',
        'namespace'='default'
        );
      -- create stream table
      CREATE TABLE drive(
        cid INT,
        ts TIMESTAMP(0),
        lng DOUBLE,
        lat DOUBLE,
        pt AS proctime(),
        WATERMARK FOR ts AS ts
      ) WITH (
        'connector'='kafka',
        'topic'='logVehicle',
        'scan.startup.mode'='earliest-offset',
        'properties.bootstrap.servers'='ld-****-proxy-stream.lindorm.rds.aliyuncs.com:30080',
        'format'='json'
      );
      -- create result table
      CREATE TABLE warns(
        cid INT,
        ts TIMESTAMP(0),
        alert BOOLEAN,
        PRIMARY KEY(cid,ts) NOT ENFORCED
      ) WITH (
        'connector'='lindorm',
        'seedServer'='ld-****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020',
        'userName'='root',
        'password'='your_password',
        'tableName'='warns',
        'namespace'='default'
      );
      -- warns car 10 if there's other car enter the danger zone 
      INSERT INTO warns 
        SELECT 10 AS cid, A.ts AS ts, 
        IF(count(DISTINCT A.cid)>=1,TRUE,FALSE) AS alert 
        FROM drive AS A 
        JOIN area /*+ OPTIONS('geomHint'='poly:st_contains','geomIndex'='true','cacheTTLMs'='180000') */ 
        FOR SYSTEM_TIME AS OF A.pt AS B 
        ON B.poly=ST_MakePoint(A.lng, A.lat) 
        WHERE A.cid<>10 AND A.lng IS NOT NULL AND A.lat IS NOT NULL GROUP BY A.ts;
    );

步骤四:查看结果

在Lindorm宽表中,通过SQL语句查看对应的结果。

  1. 通过Lindorm-cli连接并使用宽表引擎

  2. 查询结果表warns表中的数据。

    SELECT * FROM warns;

    返回结果如下,编号为10的车辆分别在下列时间点收到了会车警告。

    +-----+-------------------------------+-------+
    | cid |              ts               | alert |
    +-----+-------------------------------+-------+
    | 10  | 2008-02-02 21:33:23 +0000 UTC | true  |
    | 10  | 2008-02-02 21:33:33 +0000 UTC | true  |
    | 10  | 2008-02-02 21:39:53 +0000 UTC | true  |
    | 10  | 2008-02-02 21:41:03 +0000 UTC | true  |
    | 10  | 2008-02-02 21:41:13 +0000 UTC | true  |
    | 10  | 2008-02-02 21:41:23 +0000 UTC | true  |
    | 10  | 2008-02-02 21:43:43 +0000 UTC | true  |
    +-----+-------------------------------+-------+