通过实时计算Flink集成向量数据
云原生数据仓库AnalyticDB PostgreSQL版支持通过flink-adbpg-connector集成向量化数据。本文以将Kafka数据导入至AnalyticDB PostgreSQL版为例,介绍如何将向量数据导入AnalyticDB PostgreSQL版。
前提条件
已创建AnalyticDB PostgreSQL版实例。具体操作,请参见创建实例。
已创建Flink全托管工作空间,且与AnalyticDB PostgreSQL版实例位于同一VPC下。具体操作,请参见开通Flink全托管。
若实时计算Flink端使用开源自建版,请确保flink-adbpg-connector已安装在
$FLINK_HOME/lib
目录下。若使用公有云托管版,则无任何操作。
AnalyticDB PostgreSQL版数据库已安装向量检索插件FastANN。
您可以在psql客户端通过
\dx fastann
命令查看是否安装。如果返回FastANN插件的相关信息,表示已安装。
如果没有返回任何信息,请提交工单联系技术支持进行安装。
已购买并部署Kafka实例,且Kafka实例与AnalyticDB PostgreSQL版实例位于同一VPC下。具体操作,请参见购买和部署实例。
已将Flink工作空间和Kafka实例所属的网段加入AnalyticDB PostgreSQL版的白名单。具体操作,请参见设置白名单。
测试数据
为方便测试,AnalyticDB PostgreSQL版提供了测试数据。下载链接,请参见vector_sample_data.csv。
测试数据的表结构如下。
字段 | 类型 | 说明 |
id | bigint | 编号。 |
market_time | timestamp | 汽车上市时间。 |
color | varchar(10) | 汽车的颜色。 |
price | int | 汽车的价格。 |
feature | float4[] | 汽车照片的特征向量。 |
操作流程
创建结构化索引和向量化索引
连接AnalyticDB PostgreSQL版数据库。本文以通过psql客户端连接数据库为例,详情请参见psql连接数据库。
执行以下命令,创建测试库并切换至测试库。
CREATE DATABASE adbpg_test; \c adbpg_test
执行以下命令,创建目标表。
CREATE SCHEMA IF NOT EXISTS vector_test; CREATE TABLE IF NOT EXISTS vector_test.car_info ( id bigint NOT NULL, market_time timestamp, color varchar(10), price int, feature float4[], PRIMARY KEY(id) ) DISTRIBUTED BY(id);
执行以下命令,创建结构化索引和向量化索引。
-- 修改向量列的存储格式为PLAIN。 ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN; -- 创建结构化索引。 CREATE INDEX ON vector_test.car_info(market_time); CREATE INDEX ON vector_test.car_info(color); CREATE INDEX ON vector_test.car_info(price); -- 创建向量索引。 CREATE INDEX ON vector_test.car_info USING ann(feature) WITH (dim='10', pq_enable='0');
将向量化测试数据写入Kafka Topic
执行以下命令,创建Kafka Topic。
bin/kafka-topics.sh --create --topic vector_ingest --partitions 1\ --bootstrap-server <your_broker_list>
执行以下命令,将向量测试数据写入Kafka Topic。
bin/kafka-console-producer.sh\ --bootstrap-server <your_broker_list>\ --topic vector_ingest < ../vector_sample_data.csv
<your_broker_list>
:接入点信息。您可在云消息队列Kafka版控制台的实例详情页面的接入点信息区域获取。
创建映射表并导入数据
创建Flink作业。
执行以下命令,创建AnalyticDB PostgreSQL版映射表。
CREATE TABLE vector_ingest ( id INT, market_time TIMESTAMP, color VARCHAR(10), price int, feature VARCHAR )WITH ( 'connector' = 'adbpg-nightly-1.13', 'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test', 'tablename' = 'car_info', 'username' = '<your_username>', 'password' = '<your_password>', 'targetschema' = 'vector_test', 'maxretrytimes' = '2', 'batchsize' = '3000', 'batchwritetimeoutms' = '10000', 'connectionmaxactive' = '20', 'conflictmode' = 'ignore', 'exceptionmode' = 'ignore', 'casesensitive' = '0', 'writemode' = '1', 'retrywaittime' = '200' );
参数说明,请参见写入数据到AnalyticDB PostgreSQL版。
执行以下命令,创建Kafka映射表。
CREATE TABLE vector_kafka ( id INT, market_time TIMESTAMP, color VARCHAR(10), price int, feature string ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = '<your_broker_list>', 'topic' = 'vector_ingest', 'format' = 'csv', 'csv.field-delimiter' = '\t', 'scan.startup.mode' = 'earliest-offset' );
参数说明如下。
参数
是否必填
说明
connector
是
连接器名。固定值为Kafka。
properties.bootstrap.servers
是
接入点信息。您可在云消息队列Kafka版控制台的实例详情页面的接入点信息区域获取。
topic
是
Kafka消息所在的Topic名称。
format
是
写入Kafka消息Value部分时使用的格式。支持的格式:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
csv.field-delimiter
是
CSV字段分隔符。
scan.startup.mode
是
Kafka读取数据的启动位点。取值如下:
earliest-offset:从Kafka最早分区开始读取。
latest-offset:从Kafka最新位点开始读取。
执行以下命令,创建导入任务。
INSERT INTO vector_ingest SELECT * FROM vector_kafka;