在导航等场景中,由于某些路口弯道过大或存在盲区,无法看到对面车道的车辆情况,存在事故风险。为了更好地规避交通事故,您可以使用Lindorm宽表引擎、Lindorm流引擎(实时计算),结合Apache Kafka,预先设置好盲区(地理围栏),在检测到其他车辆时及时向司机发出提醒。
场景描述
在盲区会车场景中:
可以将盲区、弯道存储为地理围栏(使用Polygon类型的数据),地理围栏数据一般不会频繁更改。
车辆位置信息是实时上传的,通常会按照指定的时间间隔更新当前位置信息,这类数据变化较频繁。
示例:
当编号为car10的车辆进入指定的盲区时,如果此时对面有其他车辆驶入(比如car11),则会向car10的司机发出提醒。
技术实现
盲区会车涉及到地理围栏数据(盲区)、车辆实时位置数据以及最终的计算结果数据。
地理围栏数据变化频率不大,可将其存储于Lindorm宽表引擎中。在计算过程中,流引擎会从宽表引擎中读取这些数据。
车辆位置信息会实时上传,采用Apache Kafka来收集车辆上传的信息,并将其接入Lindorm流引擎参与计算。
最终的计算结果存储到Lindorm宽表引擎中。
前提条件
已创建Lindorm实例并开通宽表引擎和流引擎,详情请参见创建实例。
已开通Lindorm Ganos服务,详情请参见开通时空服务(免费)。
已创建操作系统为Linux的ECS实例,详情请参见创建ECS实例。
说明ECS实例需要与Lindorm实例所在地域相同,且属于同一专有网络。
ECS的操作系统镜像建议使用Alibaba Cloud Linux。
步骤一:创建地理围栏表和结果表
在宽表引擎中,创建地理围栏表并插入示例数据。
创建地理围栏表
area
。CREATE TABLE area(id INT, name VARCHAR, poly GEOMETRY, PRIMARY KEY(id));
说明地理围栏表中的
poly
列用于存储地理围栏数据。插入示例数据。
INSERT INTO area(id,name,poly) VALUES (1,'cross1',ST_GeomFromText('POLYGON ((116.4234 39.9147, 116.4234 39.9337, 116.4329 39.9337, 116.4329 39.9147, 116.4234 39.9147))')), (2,'cross2',ST_GeomFromText('POLYGON ((116.3979 39.9035, 116.3978 39.8938, 116.4184 39.8938, 116.4184 39.9035, 116.3979 39.9035))'));
创建结果表。
针对盲区会车场景,创建结果表
warns
:CREATE TABLE warns(cid INT, ts TIMESTAMP, alert BOOLEAN, PRIMARY KEY (cid,ts));
说明结果表中,
cid
表示车辆编号;ts
表示收到通知的时间;alter
表示通知结果,取值为true时表示前方对面车道有车辆。
步骤二:接入流数据
本示例将通过开源Kafka脚本工具连接Lindorm流引擎。
连接Lindorm流引擎,并创建名为
logVehicle
的Topic。详情请参见通过开源Kafka脚本工具连接Lindorm流引擎。执行如下命令,将示例数据写入到已创建的Topic中。
./bin/kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicle < testcar.txt
说明testcar.txt
为本文提供的示例数据,您需要将其下载并上传到开源Kafka脚本工具的根目录下。您可以使用
./bin/kafka-console-consumer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicle --from-beginning
命令,查看数据是否成功写入。
步骤三:提交流引擎计算任务
使用Flink SQL提交Lindorm流引擎计算任务,读取开源Kafka topic中的数据并结合宽表数据做计算。
连接到Lindorm流引擎,详情请参见使用流引擎。
创建盲区会车计算任务。
在如下的示例任务中,判断编号为10的车辆进入指定的盲区时,盲区中是否有其他车辆。任务流程如下:
加载
ganos
函数模块。创建数据源表
drive
、数据维表area
、结果表warns
。创建流任务,使用
count
函数及geomHint
函数过滤数据,判断编号为10的车辆进入指定的盲区时,盲区中是否有其他车辆。
重要请将示例程序中的参数值替换为实际的值,您可以在Lindorm实例的数据库连接中查看参数值,详情请参见查看连接地址。
seedServer
:Lindorm宽表引擎SQL地址的专有网络地址,并将端口改为30020。格式为ld-****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020
。password
:Lindorm宽表引擎的默认密码。properties.bootstrap.servers
:Kafka的连接地址,请使用流引擎的Lindorm Stream Kafka专有网络地址。格式为ld-****-proxy-stream.lindorm.rds.aliyuncs.com:30080
。
示例程序:
CREATE FJOB warnCar1( LOAD MODULE ganos; -- create danger area table CREATE TABLE area( id INT, name VARCHAR, poly GEOMETRY, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='ld-****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020', 'userName'='root', 'password'='your_password', 'tableName'='area', 'namespace'='default' ); -- create stream table CREATE TABLE drive( cid INT, ts TIMESTAMP(0), lng DOUBLE, lat DOUBLE, pt AS proctime(), WATERMARK FOR ts AS ts ) WITH ( 'connector'='kafka', 'topic'='logVehicle', 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='ld-****-proxy-stream.lindorm.rds.aliyuncs.com:30080', 'format'='json' ); -- create result table CREATE TABLE warns( cid INT, ts TIMESTAMP(0), alert BOOLEAN, PRIMARY KEY(cid,ts) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='ld-****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020', 'userName'='root', 'password'='your_password', 'tableName'='warns', 'namespace'='default' ); -- warns car 10 if there's other car enter the danger zone INSERT INTO warns SELECT 10 AS cid, A.ts AS ts, IF(count(DISTINCT A.cid)>=1,TRUE,FALSE) AS alert FROM drive AS A JOIN area /*+ OPTIONS('geomHint'='poly:st_contains','geomIndex'='true','cacheTTLMs'='180000') */ FOR SYSTEM_TIME AS OF A.pt AS B ON B.poly=ST_MakePoint(A.lng, A.lat) WHERE A.cid<>10 AND A.lng IS NOT NULL AND A.lat IS NOT NULL GROUP BY A.ts; );
步骤四:查看结果
在Lindorm宽表中,通过SQL语句查看对应的结果。
查询结果表
warns
表中的数据。SELECT * FROM warns;
返回结果如下,编号为10的车辆分别在下列时间点收到了会车警告。
+-----+-------------------------------+-------+ | cid | ts | alert | +-----+-------------------------------+-------+ | 10 | 2008-02-02 21:33:23 +0000 UTC | true | | 10 | 2008-02-02 21:33:33 +0000 UTC | true | | 10 | 2008-02-02 21:39:53 +0000 UTC | true | | 10 | 2008-02-02 21:41:03 +0000 UTC | true | | 10 | 2008-02-02 21:41:13 +0000 UTC | true | | 10 | 2008-02-02 21:41:23 +0000 UTC | true | | 10 | 2008-02-02 21:43:43 +0000 UTC | true | +-----+-------------------------------+-------+