Milvus

本文为您介绍如何使用Milvus连接器。

背景信息

Milvus是一个高度可扩展的向量数据库,专为处理大规模非结构化数据(如图像、文本和音频)设计。支持高效的相似性搜索,适用于推荐系统、图像检索、语义搜索等场景。Milvus连接器支持的信息如下:

类别

详情

支持类型

结果表,向量表

运行模式

流模式

数据格式

特有监控指标

API种类

SQL

是否支持更新/删除

特色功能

Milvus连接器深度集成Apache FlinkMilvus向量数据库,为实时向量检索场景提供高性能、高可靠的数据管道。以下为Milvus连接器核心功能特性说明:

  • 高并发写入:支持自定义结果表并行度;

  • 自动重试机制:失败后可自动重试,提升稳定性;

  • 批量缓冲写入:通过设置攒批写入升写入性能。

  • At-Least-Once 语义:主持主键幂等更新,实现数据最终一致性;

  • 支持向量搜索:允许用户通过向量搜索相似数据;

前提条件

  • 已创建Milvus集群,详情请参见快速创建Milvus实例

  • Milvus集合结构需提前创建。如使用分区写入,请确保Milvus分区已存在。

使用限制

  • 仅实时计算引擎VVR 11.1及以上版本支持作为结果表写入。

  • 仅实时计算引擎VVR 11.3及以上版本支持作为向量表供查询。

  • 仅支持2.4.x版本Milvus

  • Milvus连接器仅支持At Least Once语义。

语法结构

CREATE TEMPORARY TABLE milvus_sink (
  id BIGINT,
  f1 STRING,
  f2 BOOLEAN,
  f3 TINYINT,
  f4 SMALLINT,
  f5 INTEGER,
  f6 DATE,
  f7 TIME(3),
  f8 TIMESTAMP_LTZ(3),
  f9 TIMESTAMP(3),
  f10 FLOAT,
  f11 DOUBLE,
  f12 DECIMAL(10, 2),
  f13 ARRAY<FLOAT>,
  f14 ARRAY<DOUBLE>,
  f15 ARRAY<INTEGER>,
  f16 ARRAY<BIGINT>,
  PRIMARY KEY (id) NOT ENFORCED  -- 必须,Milvus只支持BIGINT或STRING类型作为主键
) WITH (
  'connector'='milvus',
  'endpoint'='<yourEndpoint>',
  'port'='<yourPort>',
  'userName'='<yourUserName>',
  'password'='<yourPassword>',
  'databaseName'='<yourDatabaseName>',
  'collectionName'='<yourCollectionName>'
);

WITH参数

通用参数

参数

说明

数据类型

是否必填

默认值

备注

connector

连接器名称。

String

指定使用的连接器名称,必须为milvus

endPoint

Milvus数据库的访问地址(IP或域名)

String

详情请参见网络访问与安全设置

port

Milvus数据库服务的端口号。

INTEGER

19530

username

Milvus数据库服务的用户名。

STRING

无。

password

Milvus数据库服务的密码。

STRING

databaseName

Milvus数据库名称。

STRING

collectionName

Milvus集合(Collection)名称。

STRING

partitionName

写入的分区名称。

STRING

_default

partitionKey.enabled

集合是否启用了标量字段作为Partition Key。

BOOLEAN

false

maxReries

重试次数。

INTEGER

3

无。

结果表独有参数

参数

说明

数据类型

是否必填

默认值

备注

sink.parallelism

自定义Sink的并行度。

INTEGER

上游算子的并发,由系统框架决定。

sink.maxRetries

写入失败时的最大重试次数。

INTEGER

3

该参数从实时计算引擎VVR 11.3起已废弃,建议使用 maxReries 作为替代。

sink.buffer-flush.max-rows

缓存记录的最大数量(包括append、upsertdelete),超过该值将刷新数据到Milvus;

INTEGER

10000

设定为0表示禁用。

sink.buffer-flush.interval

缓存记录的刷新间隔时间,超过该时间将刷新数据到Milvus。

INTEGER

1000

单位:毫秒(ms),设定为0表示禁用。

sink.ignoreDelete

是否忽略删除操作。

BOOLEAN

false

参数取值如下:

  • true:忽略Delete操作。

  • false:不忽略Delete操作。

向量表独有参数

参数

说明

数据类型

是否必填

默认值

备注

search.metric

评价向量相似度指标。

String

L2

支持的相似度指标请参考 milvus 文档。目前,milvus 2.4 版本支持的指标如下:

  • L2:使用欧几里得距离衡量相似度。

  • IP:使用向量点积计算相似度。

  • COSINE: 使用三角函数 Cosine 计算相似度。

类型映射

FlinkSQL类型

Milvus类型

STRING

VarChar(n)

BOOLEAN

Bool

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

DATE

VarChar(n)

TIME(3)

VarChar(n)

TIMESTAMP_LTZ(3)

Int64

说明

Epoch毫秒值。

TIMESTAMP(3)

VarChar(n)

FLOAT

Float

DOUBLE

Double

DECIMAL(10, 2)

VarChar(n)

ARRAY<FLOAT>

FloatVector

说明

在创建Milvus Collection之后,需要为向量字段创建索引。

ARRAY<DOUBLE>

Array<Double>[m]

ARRAY<INTEGER>

Array<Int32>[m]

ARRAY<BIGINT>

Array<Int64>[m]

使用示例

  • 生成模拟流数据写入

-- 生成模拟流数据(每2秒生成一条)
CREATE TEMPORARY TABLE mock_source (
    id STRING,
    vector ARRAY<FLOAT>,       -- 向量以FLOAT数组形式传入,默认长度3
    event_time AS PROCTIME() -- 事件时间(Flink自动生成)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',  -- 每秒生成100条
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000'
);

CREATE TEMPORARY TABLE milvus_sink (
  id STRING,               -- 唯一标识(如设备ID)
  vector ARRAY<FLOAT>,     -- 向量数据(必须为FLOAT数组,且数组长度必须和上游一致)
  timestamp BIGINT         -- 时间戳(用于流处理)
  PRIMARY KEY (id) NOT ENFORCED  -- 必须,Milvus只支持BIGINT或STRING类型作为主键
) WITH (
  'connector'='milvus',
  'endpoint'='xxx',
  'port'='19530',
  'userName'='xxx',
  'password'='xxx',
  'databaseName'='xxxx',
  'collectionName'='xxxx'
);

-- 转换数据并写入Milvus
INSERT INTO milvus_stream_sink
SELECT 
    id,
    vector,
    UNIX_TIMESTAMP() * 1000 AS timestamp  -- 当前时间戳(毫秒)
FROM mock_source;
  • 向量搜索

CREATE TEMPORARY TABLE milvus_table (
  id STRING,               -- 唯一标识(如设备ID)
  vector ARRAY<FLOAT>,     -- 向量数据(必须为FLOAT数组,且数组长度必须和上游一致)
  PRIMARY KEY (id) NOT ENFORCED  -- 必须,Milvus只支持BIGINT或STRING类型作为主键
) WITH (
  'connector'='milvus',
  'endpoint'='xxx',
  'port'='19530',
  'userName'='xxx',
  'password'='xxx',
  'databaseName'='xxxx',
  'collectionName'='xxxx'
);
-- 根据向量[1.1, 2.2, 3.3],寻找 top2 相似的数据。
SELECT * FROM 
LATERAL TABLE(
  VECTOR_SEARCH(
    TABLE milvus_table, 
    DESCRIPTOR(vector), 
    ARRAY[1.1, 2.2, 3.3], 
    2));

注意:在作业启动前请先手动加载该表,具体请参考文档