本文为您介绍如何使用Milvus连接器。
背景信息
Milvus是一个高度可扩展的向量数据库,专为处理大规模非结构化数据(如图像、文本和音频)设计。支持高效的相似性搜索,适用于推荐系统、图像检索、语义搜索等场景。Milvus连接器支持的信息如下:
类别 | 详情 |
支持类型 | 结果表 |
运行模式 | 流模式 |
数据格式 | 无 |
特有监控指标 | 无 |
API种类 | SQL |
是否支持更新/删除 | 是 |
特色功能
Milvus连接器深度集成Apache Flink与Milvus向量数据库,为实时向量检索场景提供高性能、高可靠的数据管道。以下为Milvus连接器核心功能特性说明:
高并发写入:支持自定义结果表并行度;
自动重试机制:失败后可自动重试,提升稳定性;
批量缓冲写入:通过设置攒批写入升写入性能。
At-Least-Once 语义:主持主键幂等更新,实现数据最终一致性;
前提条件
已创建Milvus集群,详情请参见快速创建Milvus实例。
Milvus集合结构需提前创建。如使用分区写入,请确保Milvus分区已存在。
使用限制
仅实时计算引擎VVR 11.1及以上版本支持。
仅支持2.4.x版本Milvus。
Milvus连接器仅支持At Least Once语义。
语法结构
CREATE TEMPORARY TABLE milvus_sink (
id BIGINT,
f1 STRING,
f2 BOOLEAN,
f3 TINYINT,
f4 SMALLINT,
f5 INTEGER,
f6 DATE,
f7 TIME(3),
f8 TIMESTAMP_LTZ(3),
f9 TIMESTAMP(3),
f10 FLOAT,
f11 DOUBLE,
f12 DECIMAL(10, 2),
f13 ARRAY<FLOAT>,
f14 ARRAY<DOUBLE>,
f15 ARRAY<INTEGER>,
f16 ARRAY<BIGINT>,
PRIMARY KEY (id) NOT ENFORCED -- 必须,Milvus只支持BIGINT或STRING类型作为主键
) WITH (
'connector'='milvus',
'endpoint'='<yourEndpoint>',
'port'='<yourPort>',
'userName'='<yourUserName>',
'password'='<yourPassword>',
'databaseName'='<yourDatabaseName>',
'collectionName'='<yourCollectionName>'
);
WITH参数
通用参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 连接器名称。 | String | 是 | 无 | 指定使用的连接器名称,必须为 |
endPoint | Milvus数据库的访问地址(IP或域名) | String | 是 | 无 | 详情请参见网络访问与安全设置。 |
port | Milvus数据库服务的端口号。 | INTEGER | 否 | 19530 | |
username | Milvus数据库服务的用户名。 | STRING | 是 | 无 | 无。 |
password | Milvus数据库服务的密码。 | STRING | 是 | 无 | |
databaseName | Milvus数据库名称。 | STRING | 是 | 无 | |
collectionName | Milvus集合(Collection)名称。 | STRING | 是 | 无 | |
partitionName | 写入的分区名称。 | STRING | 否 | _default | |
partitionKey.enabled | 集合是否启用了标量字段作为Partition Key。 | BOOLEAN | 否 | false |
结果表独有参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
sink.parallelism | 自定义Sink的并行度。 | INTEGER | 否 | 无 | 上游算子的并发,由系统框架决定。 |
sink.maxRetries | 写入失败时的最大重试次数。 | INTEGER | 否 | 3 | 无。 |
sink.buffer-flush.max-rows | 缓存记录的最大数量(包括append、upsert和delete),超过该值将刷新数据到Milvus; | INTEGER | 否 | 10000 | 设定为 |
sink.buffer-flush.interval | 缓存记录的刷新间隔时间,超过该时间将刷新数据到Milvus。 | INTEGER | 否 | 1000 | 单位:毫秒(ms),设定为 |
sink.ignoreDelete | 是否忽略删除操作。 | BOOLEAN | 否 | false | 参数取值如下:
|
类型映射
FlinkSQL类型 | Milvus类型 |
STRING | VarChar(n) |
BOOLEAN | Bool |
TINYINT | Int8 |
SMALLINT | Int16 |
INTEGER | Int32 |
BIGINT | Int64 |
DATE | VarChar(n) |
TIME(3) | VarChar(n) |
TIMESTAMP_LTZ(3) | Int64 说明 Epoch毫秒值。 |
TIMESTAMP(3) | VarChar(n) |
FLOAT | Float |
DOUBLE | Double |
DECIMAL(10, 2) | VarChar(n) |
ARRAY<FLOAT> | FloatVector 说明 在创建Milvus Collection之后,需要为向量字段创建索引。 |
ARRAY<DOUBLE> | Array<Double>[m] |
ARRAY<INTEGER> | Array<Int32>[m] |
ARRAY<BIGINT> | Array<Int64>[m] |
使用示例
生成模拟流数据写入Milvus。
-- 生成模拟流数据(每2秒生成一条)
CREATE TEMPORARY TABLE mock_source (
id STRING,
vector ARRAY<FLOAT>, -- 向量以FLOAT数组形式传入,默认长度3
event_time AS PROCTIME() -- 事件时间(Flink自动生成)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100', -- 每秒生成100条
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '1000'
);
CREATE TEMPORARY TABLE milvus_sink (
id STRING, -- 唯一标识(如设备ID)
vector ARRAY<FLOAT>, -- 向量数据(必须为FLOAT数组,且数组长度必须和上游一致)
timestamp BIGINT -- 时间戳(用于流处理)
PRIMARY KEY (id) NOT ENFORCED -- 必须,Milvus只支持BIGINT或STRING类型作为主键
) WITH (
'connector'='milvus',
'endpoint'='xxx',
'port'='19530',
'userName'='xxx',
'password'='xxx',
'databaseName'='xxxx',
'collectionName'='xxxx'
);
-- 转换数据并写入Milvus
INSERT INTO milvus_stream_sink
SELECT
id,
vector,
UNIX_TIMESTAMP() * 1000 AS timestamp -- 当前时间戳(毫秒)
FROM mock_source;