您可以基于Lindorm Ganos时空服务与Lindorm流引擎轻松实现电子围栏方案。
背景信息
在车辆管理场景中,车辆电子围栏可以用于监控车辆的行驶范围、判断车辆是否偏离预定路线等,帮助管理人员更好地进行车辆调度和管理。
通常管理人员会预先设置好若干个区域,简称地理围栏,由于该区域不是实时变化的,可以使用外表的形式存储到Lindorm宽表中。而车辆的实时数据(坐标信息)是实时上传的,可以将车辆的实时数据存储到Kafka中,再由Lindorm流引擎通过订阅Kafka中的实时数据,实时计算多个车辆位置和地理围栏之间的关系。
本示例提供的Lindorm流引擎电子围栏查询支持内存索引和并行计算加速,大大提高了计算效率。
前提条件
开通Lindorm Ganos时空服务,更多信息请参见开通时空服务。
开通Lindorm流引擎,更多信息请参见开通流引擎。
已创建ECS实例,用于读写数据以及创建计算任务等,该实例需要与Lindorm实例为同一VPC,并已安装JDK 1.8及以上环境,更多信息请参见快速购买包年包月实例。
已将上述ECS实例的IP地址添加到Lindorm实例的白名单中,用于连接Lindorm实例,更多信息请参见设置白名单。
操作步骤
使用lindorm-cli创建地理围栏表regions和结果表fresult,更多信息请参见通过Lindorm-cli连接宽表引擎。
regions建表语句:
CREATE TABLE regions(rID INT, rName VARCHAR, fence GEOMETRY, PRIMARY KEY(rID));
插入地理围栏数据:
INSERT INTO regions(rID, rName, fence) VALUES (1, 'SoHo', ST_GeomFromText('POLYGON((-74.00279525078275 40.72833625216264,-74.00547745979765 40.721929158663244,-74.00125029839018 40.71893680218994,-73.9957785919998 40.72521409075776,-73.9972377137039 40.72557184584898,-74.00279525078275 40.72833625216264))')), (2, 'Chinatown', ST_GeomFromText('POLYGON((-73.99712367114876 40.71281582267133,-73.9901070123658 40.71336881907936,-73.99023575839851 40.71452359088633,-73.98976368961189 40.71554823078944,-73.99551434573982 40.717337246783735,-73.99480624255989 40.718491949759304,-73.99652285632942 40.719109951574,-73.99776740131233 40.7168005470334,-73.99903340396736 40.71727219249899,-74.00193018970344 40.71938642421256,-74.00409741458748 40.71688186545551,-74.00051398334358 40.71517415773184,-74.0004281526551 40.714377212470005,-73.99849696216438 40.713450141693166,-73.99748845157478 40.71405192594819,-73.99712367114876 40.71281582267133))')), (3, 'Tribeca', ST_GeomFromText('POLYGON((-74.01091641815208 40.72583120006787,-74.01338405044578 40.71436586362705,-74.01370591552757 40.713617702123415,-74.00862044723533 40.711308107057235,-74.00194711120628 40.7194238654018,-74.01091641815208 40.72583120006787))'));
fresult建表语句:
CREATE TABLE fresult(uID VARCHAR, rName VARCHAR, rID INT, PRIMARY KEY (uID));
将实时数据写入流引擎中。
本示例将通过开源Kafka脚本工具连接Lindorm流引擎。
在ECS实例中,执行以下命令,下载、解压开源Kafka脚本工具。
wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/kafka_2.12-2.7.1.tgz tar -zxf kafka_2.12-2.7.1.tgz cd kafka_2.12-2.7.1
创建Kafka topic。
./bin/kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic --create
Lindorm Stream Kafka地址获取方式:Lindorm实例控制台的
页签下。将实时数据写入Kafka topic。
./bin/kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic {"uID": "A", "x":"-74.00035", "y": "40.72432"} {"uID": "B", "x":"-74.00239", "y": "40.71692"} {"uID": "C", "x":"-74.00201", "y": "40.72563"} {"uID": "D", "x":"-74.00158", "y": "40.72412"} {"uID": "E", "x":"-73.99836", "y": "40.71588"} {"uID": "F", "x":"-74.01015", "y": "40.71422"} {"uID": "G", "x":"-73.99183", "y": "40.71451"} {"uID": "H", "x":"-73.99595", "y": "40.71773"}
您可以使用
./bin/kafka-console-consumer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic --from-beginning
命令,查看数据是否成功写入。
提交流引擎计算任务。
在ECS实例中,执行以下命令,下载、解压流引擎客户端。
wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-2.0.2.tar.gz tar zxvf lindorm-sqlline-2.0.2.tar.gz
进入
lindorm-sqlline-0.1.5/bin
目录,执行以下命令连接至Lindorm流引擎。./lindorm-sqlline -url <Lindorm Stream SQL地址>
Lindorm Stream SQL地址获取方式:Lindorm实例控制台的
页签下。提交流引擎计算任务。
该任务读取Kafka Topic中的实时数据与宽表引擎中的regions表进行计算,并使用
ST_Contains
函数进行过滤,计算实时数据属于哪一个行政区域,最后将结果写入fresult表中。CREATE FJOB fenceFilter ( LOAD MODULE ganos; -- Enable parallelism SET 'parallelism.default'='12'; -- Create stream table CREATE TABLE carData(`uID` STRING, `x` DOUBLE, `y` DOUBLE, `proctime` AS PROCTIME() ) WITH ( 'connector'='kafka', 'topic'='log_topic', 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='<Lindorm Stream Kafka地址>', 'format'='json' ); -- Create area table CREATE TABLE regions ( `rID` INT, `rName` STRING, `fence` GEOMETRY, PRIMARY KEY (`rID`) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='<ld-bp1ri5k784d1d****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020>', 'userName'='root', 'password'='<your_passwd>', 'tableName'='regions', 'namespace'='default' ); -- Create result table CREATE TABLE fresult ( `uID` STRING, `rName` STRING, `rID` INT, PRIMARY KEY (`uID`) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='<ld-bp1ri5k784d1d****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020>', 'userName'='root', 'password'='<your_passwd>', 'tableName'='fresult', 'namespace'='default' ); -- Find the area that car located INSERT INTO fresult SELECT A.uID, B.rName, B.rID FROM carData AS A JOIN regions /*+ OPTIONS('geomHint'='fence:st_contains','geomIndex'='true','cacheTTLMs'='1800000') */ FOR SYSTEM_TIME AS OF A.proctime AS B ON B.fence=ST_MakePoint(A.x,A.y); );
关键参数说明:
properties.bootstrap.servers:Lindorm Stream Kafka地址。
seedServer:Lindorm 宽表SQL地址,获取方式为Lindorm实例控制台的
页签下,该地址的端口需填写30020,同时该地址需去除jdbc:lindorm:table:url=http://
字符串,只保留连接地址,例如'seedServer'='ld-bp1ri5k784d1d****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020'
。userName:宽表引擎用户名,默认为root。
password:宽表引擎密码,获取方式为Lindorm实例控制台的
页签下。
在宽表引擎中,查询结果表fresult,获取计算结果。
命令示例:
SELECT * FROM fresult;
预计返回:
+-----+-----------+-----+ | uID | rName | rID | +-----+-----------+-----+ | A | SoHo | 1 | | B | Chinatown | 2 | | C | SoHo | 1 | | D | SoHo | 1 | | E | Chinatown | 2 | | F | Tribeca | 3 | | G | Chinatown | 2 | | H | Chinatown | 2 | +-----+-----------+-----+
- 本页导读 (1)