将时空数据写入宽表引擎

本文介绍将流引擎中的时空数据写入宽表的具体方法。

前提条件

  • 已安装Java环境,要求安装JDK 1.8及以上版本。

  • 如果客户端没有部署在ECS,需要将客户端IP地址添加至Lindorm实例的白名单中,具体操作请参见设置白名单

  • 如果应用部署在ECS,您需要确保云原生多模数据库 Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。

    • 所在地域相同,并建议使用相同的可用区域(以减少网络延时)。

    • 使用相同的专有网络。

  • 已下载并解压Lindorm-cli压缩包

准备工作

通过Lindorm-cli连接宽表引擎并创建一张宽表,具体操作请参见通过Lindorm-cli连接并使用宽表引擎。创建宽表的语句示例如下:

CREATE TABLE gps_data (
  id int,
  g geometry(point),
  t long,
  ship_name varchar,
  PRIMARY KEY(id, t));

操作步骤

  1. 通过Lindorm-cli连接Lindorm流引擎。进入lindorm-sqlline-0.1.4/bin目录,执行以下命令连接至Lindorm流引擎。

    ./lindorm-sqlline -url <连接地址> -u <用户名> -p <密码>

    参数

    示例

    说明

    连接地址

    jdbc:streamsql:url=http://ld-bp128rpk35sh1****-proxy-stream.lindorm.rds.aliyuncs.com:30060

    Lindorm流引擎的Lindorm Stream SQL连接地址,获取方法请参见查看连接地址

    用户名

    root

    如果您忘记密码,可以通过Lindorm宽表引擎的集群管理系统修改密码,具体操作请参见修改用户密码

    密码

    root

  2. 创建外表,将宽表gps_data关联到流引擎。

    CREATE External Table gps_data
      WITH (
        table_type = 'lindorm.table',
        table_name = 'gps_data',
        endpoint = 'ld-bp128rpk35sh1****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020');

    参数说明如下:

    参数

    是否必选

    描述

    table_type

    表示将宽表数据关联到流引擎,固定为lindorm.table

    table_name

    Lindorm宽表的表名。

    endpoint

    Lindorm宽表引擎中HBase兼容的专有网络连接地址,具体操作请参见查看连接地址

  3. 创建数据流表,用于写入流式数据。

    CREATE STREAM gps_data_stream (
      id int,
      g Geometry(point),
      t  bigint,
      ship_name varchar
      ) WITH (stream_topic='gps_data_stream', value_format='JSON', key_value = 'id');

    参数说明如下:

    参数

    是否必选

    描述

    stream_topic

    指定数据流表的物理数据存储在流存储的某个Topic。

    value_format

    指定数据源写入Lindorm流引擎的数据格式,取值如下:

    • CSV

    • Avro

    • JSON

    key_value

    指定数据流表的主键列。

  4. 创建计算任务,将数据流表中的数据写入到宽表中。

    • 不使用时空函数。

      CREATE CQ gps_data_cq INSERT INTO gps_data SELECT * FROM gps_data_stream;
    • 使用时空函数,支持的时空数据类型和时空函数请参见空间数据类型函数概览

      CREATE CQ gps_data_cq INSERT INTO gps_data SELECT * FROM gps_data_stream WHERE ST_Contains(ST_GeomFromText('POLYGON((90 10, 200 10, 200 40, 90 40, 90 10))'), g);
  5. 写入数据,使用Kafka客户端写入数据到数据流表,具体操作请参见通过开源Kafka客户端写入Lindorm流引擎数据。完整的代码示例如下:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.codehaus.jettison.json.JSONException;
    import org.codehaus.jettison.json.JSONObject;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class LindormStreamDemoGanos {
    
      public static void main(String[] args) throws InterruptedException, ExecutionException, JSONException {
        Properties props = new Properties();
        //设置Lindorm Stream Kafka地址,这个Lindorm Stream Kafka地址为专有网络地址,需确保ECS实例和Lindorm实例使用相同的专有网络。如果客户端没有部署在ECS,需要将客户端IP地址添加至Lindorm实例的白名单中。
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxxxx-proxy-stream.lindorm.rds.aliyuncs.com:30080");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        //指定数据流表的物理数据存储在某个Topic
        String topic = "gps_data_stream";
    
        try {
          for (int i = 0; i < 10; i++) {
            JSONObject json = new JSONObject();
            //写入流引擎数据
            json.put("id", i);
            json.put("g", "POINT(119.073544 " + (25 + i * 10) + ")");
            json.put("t", System.currentTimeMillis() - 1000 * i);
            json.put("ship_name", "ship007");
            Future<RecordMetadata> future = producer
                .send(new ProducerRecord<String, String>(topic, json.getString("id"), json.toString()));
            producer.flush();
            try {
              RecordMetadata recordMetadata = future.get();
              System.out.println("Produce ok:" + recordMetadata.toString());
            } catch (Throwable t) {
              System.out.println("Produce exception " + t.getMessage());
              throw t;
            }
          }
        } catch (Exception e) {
          System.out.println("Produce exception " + e.getMessage());
          throw e;
        }
      }
    }
                            
  6. 查询数据,通过以下方式查询数据流表和宽表的数据。

    • 通过流引擎SQL查询数据流表的数据。

      • 不使用时空函数查询,在数据流表中执行查询语句时会默认将时空数据转化为WKT格式,有关WKT格式的介绍请参见空间数据类型

        SELECT * FROM gps_data_stream;

        查询结果如下:直接查询的结果

      • 使用时空函数查询。

        SELECT * FROM gps_data_stream WHERE ST_Contains(ST_GeomFromText('POLYGON((90 10, 200 10, 200 40, 90 40, 90 10))'), g);
    • 通过宽表SQL查询宽表的数据。

      SELECT * FROM gps_data;

      查询结果如下:查询宽表结果