Quick Start


The Lindorm Ganos spatio-temporal service offers a stream engine interface suitable for scenarios such as the Internet of Vehicles (IoV), the Internet of Things (IoT), and autonomous driving. You can use the real-time computing and analysis capabilities of the Lindorm stream engine to process trajectory point data. By combining these capabilities with spatio-temporal functions, you can perform various real-time trajectory analyses, such as geo-fencing, regional statistics, and trajectory generation.

Prerequisites

  • LindormTable is activated, and its version is 2.6.5 or later. For more information about how to view or upgrade the current version, see LindormTable Version Guide and Minor version update.

  • The stream engine is activated. For more information, see Activate the stream engine.

  • A Java environment is installed. JDK 1.8 or later is required.

  • The client IP address is added to the Lindorm whitelist. For more information, see Set a whitelist.

Procedure

This topic uses a trajectory deviation scenario in the Internet of Vehicles (IoV) as an example. You analyze vehicle trajectory points in real time and use geo-fences to determine which area each vehicle is currently in. This helps you determine whether a vehicle has deviated from its planned route.

Create a geo-fence table and a sink table

Create two tables in LindormTable. One stores geo-fence data, and the other stores the stream engine's calculation results.

  1. Connect to LindormTable using Lindorm-cli. For more information, see Connect to and use LindormTable using Lindorm-cli.

  2. Create the geo-fence table `regions`. It contains three columns: `rID` (geo-fence area ID), `rName` (geo-fence area name), and `fence` (geo-fence data).

    CREATE TABLE regions(
      rID INT,
      rName VARCHAR,
      fence GEOMETRY,
      PRIMARY KEY(rID)
    );

  3. Create the sink table `fresult`. It contains three columns: `uID` (vehicle ID), `rName` (area name), and `rID` (area ID).

    CREATE TABLE fresult(
      uID VARCHAR,
      rName VARCHAR,
      rID INT,
      PRIMARY KEY(uID)
    );

Write geo-fence data

Write test data for three areas to the `regions` geo-fence table: SoHo, Chinatown, and Tribeca.

INSERT INTO regions(rID, rName, fence) VALUES 
(1, 'SoHo', ST_GeomFromText('POLYGON((-74.00279525078275 40.72833625216264,-74.00547745979765 40.721929158663244,-74.00125029839018 40.71893680218994,-73.9957785919998 40.72521409075776,-73.9972377137039 40.72557184584898,-74.00279525078275 40.72833625216264))')),
(2, 'Chinatown', ST_GeomFromText('POLYGON((-73.99712367114876 40.71281582267133,-73.9901070123658 40.71336881907936,-73.99023575839851 40.71452359088633,-73.98976368961189 40.71554823078944,-73.99551434573982 40.717337246783735,-73.99480624255989 40.718491949759304,-73.99652285632942 40.719109951574,-73.99776740131233 40.7168005470334,-73.99903340396736 40.71727219249899,-74.00193018970344 40.71938642421256,-74.00409741458748 40.71688186545551,-74.00051398334358 40.71517415773184,-74.0004281526551 40.714377212470005,-73.99849696216438 40.713450141693166,-73.99748845157478 40.71405192594819,-73.99712367114876 40.71281582267133))')),
(3, 'Tribeca', ST_GeomFromText('POLYGON((-74.01091641815208 40.72583120006787,-74.01338405044578 40.71436586362705,-74.01370591552757 40.713617702123415,-74.00862044723533 40.711308107057235,-74.00194711120628 40.7194238654018,-74.01091641815208 40.72583120006787))'));

For more information about the ST_GeomFromText function, see ST_GeomFromText.
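Each fence above is a single-ring WKT polygon whose first and last coordinates are identical, which is what closes the ring. The following Python sketch is not part of Lindorm. It is a hypothetical client-side check, assuming single-ring `POLYGON` strings only, that you could run on a WKT string before you insert it. The helper names `parse_polygon_wkt` and `is_closed_ring` are illustrative.

```python
import re

# The SoHo fence from the INSERT statement above.
SOHO_WKT = (
    "POLYGON((-74.00279525078275 40.72833625216264,"
    "-74.00547745979765 40.721929158663244,"
    "-74.00125029839018 40.71893680218994,"
    "-73.9957785919998 40.72521409075776,"
    "-73.9972377137039 40.72557184584898,"
    "-74.00279525078275 40.72833625216264))"
)


def parse_polygon_wkt(wkt):
    """Parse a single-ring 'POLYGON((x y, x y, ...))' string into (x, y) tuples."""
    match = re.fullmatch(r"\s*POLYGON\s*\(\((.*)\)\)\s*", wkt,
                         flags=re.IGNORECASE | re.DOTALL)
    if match is None:
        raise ValueError("not a single-ring POLYGON WKT string")
    ring = []
    for pair in match.group(1).split(","):
        x, y = pair.split()  # WKT order is x (longitude), then y (latitude)
        ring.append((float(x), float(y)))
    return ring


def is_closed_ring(ring):
    """A valid ring has at least four points and ends where it starts."""
    return len(ring) >= 4 and ring[0] == ring[-1]


ring = parse_polygon_wkt(SOHO_WKT)
print(len(ring), is_closed_ring(ring))  # prints: 6 True
```

A polygon with interior rings (holes) makes this parser raise `ValueError`, because the sketch deliberately handles only the single-ring form used in this topic.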

Ingest stream data

Use an open source Kafka script tool to connect to the Lindorm stream engine and write sample data to a Kafka topic.

  1. Connect to the Lindorm stream engine and create a Kafka topic named `logVehicle`. For more information, see Connect to the Lindorm stream engine using an open source Kafka script tool.

  2. Run the following command to write the sample data file testcar.txt to the Kafka topic that you created.

    ./bin/kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka endpoint> --topic logVehicle < testcar.txt
    Note
    • Download the sample data testcar.txt and upload it to the root directory of the open source Kafka script tool.

    • Use the ./bin/kafka-console-consumer.sh --bootstrap-server <Lindorm Stream Kafka endpoint> --topic logVehicle --from-beginning command to verify that the data was written successfully.
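The layout of testcar.txt is not shown in this topic. Because the `carData` source table in the computing task declares the columns `uID`, `x`, and `y` with `'format'='json'`, a reasonable assumption is one JSON object per line with those three keys. The Python sketch below produces lines in that assumed layout from hypothetical points, which can be useful if you want to write your own test data. The function name `to_json_lines` and the coordinates are illustrative.

```python
import json

# Hypothetical trajectory points: (uID, longitude x, latitude y).
points = [("A", -74.0005, 40.7240), ("B", -73.9950, 40.7150)]


def to_json_lines(points):
    """Serialize points as one JSON object per line, keyed like the carData columns."""
    return "\n".join(
        json.dumps({"uID": uid, "x": x, "y": y}) for uid, x, y in points
    )


print(to_json_lines(points))
```

You can redirect the printed lines to a file and pass that file to kafka-console-producer.sh in the same way as testcar.txt.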

Submit a stream engine computing task

Use Flink SQL to submit a Lindorm stream engine computing task. The task reads data from the Kafka topic and joins it with the geo-fence data stored in LindormTable.

  1. Connect to the Lindorm stream engine. For more information, see Step 2: Install the stream engine client.

  2. Create a geo-fence computing task.

    The following computing task checks each point read from Kafka against the geo-fences stored in the `regions` table in LindormTable, and then outputs the area that contains the point. The task flow is as follows:

    1. Load the ganos function module.

    2. In the Flink Job, create three tables: the `carData` data source table, the `regions` dimension table, and the `fresult` sink table. Set the connector parameters to associate these tables with the Kafka topic, the `regions` geo-fence table, and the `fresult` sink table, respectively.

    3. Create a stream task. Use the geomHint hint with the st_contains function to filter data and write the results to the sink table. An index is used for acceleration.

    CREATE FJOB fenceFilter (
      LOAD MODULE ganos;
      -- set the default parallelism of the job to 8
      SET 'parallelism.default'='8';
      -- create stream table
      CREATE TABLE carData(`uID` STRING, `x` DOUBLE, `y` DOUBLE, `proctime` AS PROCTIME()
        ) WITH (
        'connector'='kafka',
        'topic'='logVehicle',
        'scan.startup.mode'='earliest-offset',
        'properties.bootstrap.servers'='<Lindorm Stream Kafka endpoint>',
        'format'='json'
      );
      -- create area table
      CREATE TABLE regions (
        `rID` INT,
        `rName` STRING,
        `fence` GEOMETRY,
        PRIMARY KEY (`rID`) NOT ENFORCED
        ) WITH (
        'connector'='lindorm',
        'seedServer'='<LindormTable HBase Java API endpoint>',
        'userName'='<username for LindormTable>',
        'password'='<password for LindormTable>',
        'tableName'='regions',
        'namespace'='<database where the regions geo-fence table is located>'
      );
      -- create result table
      CREATE TABLE fresult (
        `uID` STRING,
        `rName` STRING,
        `rID` INT,
        PRIMARY KEY (`uID`) NOT ENFORCED
      ) WITH (
        'connector'='lindorm',
        'seedServer'='<LindormTable HBase Java API endpoint>',
        'userName'='<username for LindormTable>',
        'password'='<password for LindormTable>',
        'tableName'='fresult',
        'namespace'='<database where the fresult sink table is located>'
      );
      -- find the area in which each vehicle is located
      INSERT INTO fresult 
        SELECT A.uID, B.rName, B.rID
        FROM carData AS A 
        JOIN regions /*+ OPTIONS('geomHint'='fence:st_contains','geomIndex'='true','cacheTTLMs'='1800000') */ 
        FOR SYSTEM_TIME AS OF A.proctime AS B 
        ON B.fence=ST_MakePoint(A.x,A.y);
    );
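To make the logic of the job easier to follow, here is a plain-Python approximation of what the INSERT statement computes. It is not how the stream engine runs the job: the point-in-polygon test is a textbook ray-casting routine standing in for `st_contains`, the square fences and points are hypothetical, and the dictionary assumes that a later row with the same `uID` primary key overwrites the earlier one in `fresult`.

```python
def point_in_ring(x, y, ring):
    """Ray-casting test: True if (x, y) lies inside the closed ring."""
    inside = False
    for (x1, y1), (x2, y2) in zip(ring, ring[1:]):
        if (y1 > y) != (y2 > y):  # edge crosses the horizontal line through y
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


# Hypothetical square fences, not the regions inserted earlier: (rID, rName, ring).
regions = [
    (1, "West", [(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)]),
    (2, "East", [(1, 0), (2, 0), (2, 1), (1, 1), (1, 0)]),
]


def run_fence_filter(points, regions):
    """Join each point with the fence that contains it, keyed by uID."""
    fresult = {}
    for uid, x, y in points:
        for rid, rname, ring in regions:
            if point_in_ring(x, y, ring):
                fresult[uid] = (rname, rid)  # assumed upsert on the primary key
    return fresult


# Vehicle A moves from West to East; vehicle C is outside every fence.
points = [("A", 0.5, 0.5), ("B", 1.5, 0.5), ("A", 1.5, 0.25), ("C", 5.0, 5.0)]
print(run_fence_filter(points, regions))
# prints: {'A': ('East', 2), 'B': ('East', 2)}
```

Two consequences of this model are worth noting: a point that falls outside every fence produces no row, and under the overwrite assumption `fresult` holds only the most recent matching area for each vehicle.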

View the results

Use an SQL statement in LindormTable to view the calculation results.

  1. Connect to LindormTable using Lindorm-cli. For more information, see Connect to and use LindormTable using Lindorm-cli.

  2. Query the calculation results.

    SELECT * FROM fresult;

    The following result is returned:

    +-----+---------------+-----+
    | uID |     rName     | rID |
    +-----+---------------+-----+
    | A   | SoHo          | 1   |
    | B   | Chinatown     | 2   |
    | C   | SoHo          | 1   |
    | D   | SoHo          | 1   |
    | E   | Chinatown     | 2   |
    | F   | Tribeca       | 3   |
    | G   | Chinatown     | 2   |
    | H   | Chinatown     | 2   |
    +-----+---------------+-----+

    The result shows the area in which each vehicle is currently located. For example, the vehicle with `uID` A is currently in the SoHo area.
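This topic does not show how the result is compared with a planned route. One possible follow-up, sketched below in Python, is to compare each vehicle's current `rID` with the set of areas that its route is allowed to pass through. The `planned` mapping and the function name `find_deviations` are hypothetical. The rows are a subset of the query result above.

```python
# Rows as returned by SELECT * FROM fresult: (uID, rName, rID).
rows = [("B", "Chinatown", 2), ("E", "Chinatown", 2), ("F", "Tribeca", 3)]

# Hypothetical planned routes: uID -> set of rIDs the vehicle may pass through.
planned = {"B": {1, 2}, "E": {1, 3}, "F": {3}}


def find_deviations(rows, planned):
    """Return (uID, rName) for vehicles whose current area is not on their route.

    A vehicle without a planned route is treated as deviating.
    """
    return [
        (uid, rname)
        for uid, rname, rid in rows
        if rid not in planned.get(uid, set())
    ]


print(find_deviations(rows, planned))  # prints: [('E', 'Chinatown')]
```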