文档

通过流引擎实现地理围栏监控

更新时间:

本文介绍如何通过流引擎对车辆进行实时地理围栏监控,并判断车辆轨迹是否异常。

场景说明

通过流引擎的流数据存储和实时计算能力,结合Ganos时空引擎技术,对车辆的点位数据进行地理围栏(指定的路径数据)监控判断。在地理围栏监控场景中,根据业务需求提前将地理围栏数据记录在Lindorm宽表中(图中的route_table表)。将车辆的实时点位数据写入Lindorm流引擎的input_stream数据流表中,将车辆点位数据和地理围栏数据进行关联并判断车辆点位是否在地理围栏内,并将异常车辆数据输出到output_stream数据流表,同时也可以订阅异常数据进行报警。场景实现的原理图如下:场景实现图

前提条件

  • 已安装Java环境,要求安装JDK 1.8及以上版本。

  • 已将客户端IP地址添加至Lindorm实例的白名单中,具体操作请参见设置白名单

  • Lindorm流引擎的Lindorm Stream Kafka地址为专有网络地址,需要确保客户端部署的环境和云原生多模数据库 Lindorm实例使用相同的专有网络。

操作步骤

  1. 通过Lindorm-cli连接宽表引擎,并在宽表引擎中创建一张记录地理围栏数据的宽表,具体操作请参见通过Lindorm-cli连接并使用宽表引擎。语句示例如下:

    CREATE TABLE route_table (
      route_id varchar,
      region geometry(polygon),  // 地理围栏数据
      PRIMARY KEY(route_id)
    );

    写入地理围栏数据。

    UPSERT INTO route_table (route_id, region) VALUES ('路线1', 'POLYGON (...)'); 
    UPSERT INTO route_table (route_id, region) VALUES ('路线2', 'POLYGON (...)'); 
    UPSERT INTO route_table (route_id, region) VALUES ('路线3', 'POLYGON (...)'); 
  2. 通过Lindorm-cli连接Lindorm流引擎。进入lindorm-sqlline-0.1.4/bin目录,执行以下命令连接至Lindorm流引擎。

    ./lindorm-sqlline -url <连接地址> -u <用户名> -p <密码>

    参数

    示例

    说明

    连接地址

    jdbc:streamsql:url=http://ld-bp128rpk35sh1****-proxy-stream.lindorm.rds.aliyuncs.com:30060

    Lindorm流引擎的Lindorm Stream SQL连接地址,获取方法请参见查看连接地址

    用户名

    root

    如果您忘记密码,可以通过Lindorm宽表引擎的集群管理系统修改密码,具体操作请参见修改用户密码

    密码

    root

  3. 创建外表,将宽表route_table关联到流引擎。

    CREATE External Table gps_data
      WITH (
        table_type = 'lindorm.table',
        table_name = 'route_table',
        endpoint = 'ld-bp128rpk35sh1****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020');

    参数说明如下:

    参数

    是否必选

    描述

    table_type

    表示将宽表数据关联到流引擎,固定为lindorm.table

    table_name

    Lindorm宽表的表名。

    endpoint

    Lindorm宽表引擎中HBase兼容的专有网络连接地址,具体操作请参见查看连接地址

  4. 创建两张数据流表。input_stream表用于写入流式数据,output_stream表用于输出异常车辆数据。

    CREATE STREAM input_stream (
      car_id varchar,
      route_id varchar,
      location Geometry(point)
    ) WITH (
        stream_topic='input_stream',
        value_format='JSON',
        key_value = 'car_id'
    );
    CREATE STREAM output_stream (
      car_id varchar,
      route_id varchar,
      location Geometry(point)
    ) WITH (
        stream_topic='output_stream',
        value_format='JSON',
        key_value = 'car_id',
        stream_partitions = 10
    );

    参数说明如下:

    参数

    是否必选

    描述

    stream_topic

    指定数据流表的物理数据存储在流存储的某个Topic。

    value_format

    指定数据源写入Lindorm流引擎的数据格式,取值如下:

    • CSV

    • Avro

    • JSON

    key_value

    指定数据流表的主键列。

    stream_partitions

    指定数据流表的分区数量,默认值为1。

  5. 创建计算任务判断车辆轨迹是否异常。使用ST_Contains函数判断车辆点位是否在地址围栏内,有关ST_Contains函数的介绍请参见ST_Contains

    CREATE CQ gps_monitor INSERT INTO output_stream
    SELECT l.car_id   AS car_id,
           l.route_id AS route_id,
           l.location AS location
    FROM   input_stream l
           LEFT JOIN route_table r
                  ON l.route_id = r.route_id
    WHERE  ST_Contains(r.region, l.location) = false;
  6. 写入数据,使用Kafka客户端写入数据到数据流表input_stream,具体操作请参见通过开源Kafka客户端写入Lindorm流引擎数据。完整的代码示例如下:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.codehaus.jettison.json.JSONException;
    import org.codehaus.jettison.json.JSONObject;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class LindormStreamDemoGanos {
    
      public static void main(String[] args) throws InterruptedException, ExecutionException, JSONException {
        Properties props = new Properties();
        //设置Lindorm Stream Kafka地址。Lindorm Stream Kafka地址为专有网络地址,只有当客户端部署的环境和Lindorm实例使用相同的专有网络才能访问流引擎。
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxxxx-proxy-stream.lindorm.rds.aliyuncs.com:30080");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        //指定数据流表的物理数据存储在某个Topic
        String topic = "input_stream";
    
        try {
          for (int i = 0; i < 10; i++) {
            JSONObject json = new JSONObject();
            //写入流引擎数据
            json.put("car_id", "车牌号码");
            json.put("route_id", "路线"+i);
            json.put("location", "POINT(119.073544 " + (25 + i * 10) + ")");
            Future<RecordMetadata> future = producer
                .send(new ProducerRecord<String, String>(topic, json.getString("id"), json.toString()));
            producer.flush();
            try {
              RecordMetadata recordMetadata = future.get();
              System.out.println("Produce ok:" + recordMetadata.toString());
            } catch (Throwable t) {
              System.out.println("Produce exception " + t.getMessage());
              throw t;
            }
          }
        } catch (Exception e) {
          System.out.println("Produce exception " + e.getMessage());
          throw e;
        }
      }
    }
                            
  7. 使用开源的Kafka客户端订阅数据流表output_stream中的数据,具体操作请参见通过Pull模式创建数据订阅通道