本文介绍将流引擎中的时空数据写入宽表的具体方法。
前提条件
已安装Java环境,要求安装JDK 1.8及以上版本。
如果客户端没有部署在ECS,需要将客户端IP地址添加至Lindorm实例的白名单中,具体操作请参见设置白名单。
如果应用部署在ECS,您需要确保云原生多模数据库 Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。
所在地域相同,并建议使用相同的可用区域(以减少网络延时)。
使用相同的专有网络。
已下载并解压Lindorm-cli压缩包。
准备工作
通过Lindorm-cli连接宽表引擎并创建一张宽表,具体操作请参见通过Lindorm-cli连接并使用宽表引擎。创建宽表的语句示例如下:
CREATE TABLE gps_data (
id int,
g geometry(point),
t long,
ship_name varchar,
PRIMARY KEY(id, t));
操作步骤
通过Lindorm-cli连接Lindorm流引擎。进入
lindorm-sqlline-0.1.4/bin
目录,执行以下命令连接至Lindorm流引擎。创建外表,将宽表
gps_data
关联到流引擎。CREATE External Table gps_data WITH ( table_type = 'lindorm.table', table_name = 'gps_data', endpoint = 'ld-bp128rpk35sh1****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020');
参数说明如下:
参数
是否必选
描述
table_type
是
表示将宽表数据关联到流引擎,固定为
lindorm.table
。table_name
是
Lindorm宽表的表名。
endpoint
是
Lindorm宽表引擎中HBase兼容的专有网络连接地址,具体操作请参见查看连接地址。
创建数据流表,用于写入流式数据。
CREATE STREAM gps_data_stream ( id int, g Geometry(point), t bigint, ship_name varchar ) WITH (stream_topic='gps_data_stream', value_format='JSON', key_value = 'id');
参数说明如下:
参数
是否必选
描述
stream_topic
否
指定数据流表的物理数据存储在流存储的某个Topic。
value_format
是
指定数据源写入Lindorm流引擎的数据格式,取值如下:
CSV
Avro
JSON
key_value
是
指定数据流表的主键列。
创建计算任务,将数据流表中的数据写入到宽表中。
写入数据,使用Kafka客户端写入数据到数据流表,具体操作请参见通过开源Kafka客户端写入Lindorm流引擎数据。完整的代码示例如下:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class LindormStreamDemoGanos { public static void main(String[] args) throws InterruptedException, ExecutionException, JSONException { Properties props = new Properties(); //设置Lindorm Stream Kafka地址,这个Lindorm Stream Kafka地址为专有网络地址,需确保ECS实例和Lindorm实例使用相同的专有网络。如果客户端没有部署在ECS,需要将客户端IP地址添加至Lindorm实例的白名单中。 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxxxx-proxy-stream.lindorm.rds.aliyuncs.com:30080"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); //指定数据流表的物理数据存储在某个Topic String topic = "gps_data_stream"; try { for (int i = 0; i < 10; i++) { JSONObject json = new JSONObject(); //写入流引擎数据 json.put("id", i); json.put("g", "POINT(119.073544 " + (25 + i * 10) + ")"); json.put("t", System.currentTimeMillis() - 1000 * i); json.put("ship_name", "ship007"); Future<RecordMetadata> future = producer .send(new ProducerRecord<String, String>(topic, json.getString("id"), json.toString())); producer.flush(); try { RecordMetadata recordMetadata = future.get(); System.out.println("Produce ok:" + recordMetadata.toString()); } catch (Throwable t) { System.out.println("Produce exception " + t.getMessage()); throw t; } } } catch (Exception e) { System.out.println("Produce exception " + e.getMessage()); throw e; } } }
查询数据,通过以下方式查询数据流表和宽表的数据。
通过流引擎SQL查询数据流表的数据。
不使用时空函数查询,在数据流表中执行查询语句时会默认将时空数据转化为WKT格式,有关WKT格式的介绍请参见空间数据类型。
SELECT * FROM gps_data_stream;
查询结果如下:
使用时空函数查询。
SELECT * FROM gps_data_stream WHERE ST_Contains(ST_GeomFromText('POLYGON((90 10, 200 10, 200 40, 90 40, 90 10))'), g);
通过宽表SQL查询宽表的数据。
SELECT * FROM gps_data;
查询结果如下: