Milvus

本文为您介绍如何使用Milvus连接器。

背景信息

Milvus是一个高度可扩展的向量数据库,专为处理大规模非结构化数据(如图像、文本和音频)设计。支持高效的相似性搜索,适用于推荐系统、图像检索、语义搜索等场景。Milvus连接器支持的信息如下:

类别

详情

支持类型

结果表

运行模式

流模式

数据格式

特有监控指标

API种类

SQL

是否支持更新/删除

特色功能

Milvus连接器深度集成Apache FlinkMilvus向量数据库,为实时向量检索场景提供高性能、高可靠的数据管道。以下为Milvus连接器核心功能特性说明:

  • 高并发写入:支持自定义结果表并行度;

  • 自动重试机制:失败后可自动重试,提升稳定性;

  • 批量缓冲写入:通过设置攒批写入升写入性能。

  • At-Least-Once 语义:主持主键幂等更新,实现数据最终一致性;

前提条件

  • 已创建Milvus集群,详情请参见快速创建Milvus实例

  • Milvus集合结构需提前创建。如使用分区写入,请确保Milvus分区已存在。

使用限制

  • 仅实时计算引擎VVR 11.1及以上版本支持。

  • 仅支持2.4.x版本Milvus。

  • Milvus连接器仅支持At Least Once语义。

语法结构

CREATE TEMPORARY TABLE milvus_sink (
  id BIGINT,
  f1 STRING,
  f2 BOOLEAN,
  f3 TINYINT,
  f4 SMALLINT,
  f5 INTEGER,
  f6 DATE,
  f7 TIME(3),
  f8 TIMESTAMP_LTZ(3),
  f9 TIMESTAMP(3),
  f10 FLOAT,
  f11 DOUBLE,
  f12 DECIMAL(10, 2),
  f13 ARRAY<FLOAT>,
  f14 ARRAY<DOUBLE>,
  f15 ARRAY<INTEGER>,
  f16 ARRAY<BIGINT>,
  PRIMARY KEY (id) NOT ENFORCED  -- 必须,Milvus只支持BIGINT或STRING类型作为主键
) WITH (
  'connector'='milvus',
  'endpoint'='<yourEndpoint>',
  'port'='<yourPort>',
  'userName'='<yourUserName>',
  'password'='<yourPassword>',
  'databaseName'='<yourDatabaseName>',
  'collectionName'='<yourCollectionName>'
);

WITH参数

通用参数

参数

说明

数据类型

是否必填

默认值

备注

connector

连接器名称。

String

指定使用的连接器名称,必须为milvus

endPoint

Milvus数据库的访问地址(IP或域名)

String

详情请参见网络访问与安全设置

port

Milvus数据库服务的端口号。

INTEGER

19530

username

Milvus数据库服务的用户名。

STRING

无。

password

Milvus数据库服务的密码。

STRING

databaseName

Milvus数据库名称。

STRING

collectionName

Milvus集合(Collection)名称。

STRING

partitionName

写入的分区名称。

STRING

_default

partitionKey.enabled

集合是否启用了标量字段作为Partition Key。

BOOLEAN

false

结果表独有参数

参数

说明

数据类型

是否必填

默认值

备注

sink.parallelism

自定义Sink的并行度。

INTEGER

上游算子的并发,由系统框架决定。

sink.maxRetries

写入失败时的最大重试次数。

INTEGER

3

无。

sink.buffer-flush.max-rows

缓存记录的最大数量(包括append、upsertdelete),超过该值将刷新数据到Milvus;

INTEGER

10000

设定为0表示禁用。

sink.buffer-flush.interval

缓存记录的刷新间隔时间,超过该时间将刷新数据到Milvus。

INTEGER

1000

单位:毫秒(ms),设定为0表示禁用。

sink.ignoreDelete

是否忽略删除操作。

BOOLEAN

false

参数取值如下:

  • true:忽略Delete操作。

  • false:不忽略Delete操作。

类型映射

FlinkSQL类型

Milvus类型

STRING

VarChar(n)

BOOLEAN

Bool

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

DATE

VarChar(n)

TIME(3)

VarChar(n)

TIMESTAMP_LTZ(3)

Int64

说明

Epoch毫秒值。

TIMESTAMP(3)

VarChar(n)

FLOAT

Float

DOUBLE

Double

DECIMAL(10, 2)

VarChar(n)

ARRAY<FLOAT>

FloatVector

说明

在创建Milvus Collection之后,需要为向量字段创建索引。

ARRAY<DOUBLE>

Array<Double>[m]

ARRAY<INTEGER>

Array<Int32>[m]

ARRAY<BIGINT>

Array<Int64>[m]

使用示例

生成模拟流数据写入Milvus。

-- 生成模拟流数据(每2秒生成一条)
CREATE TEMPORARY TABLE mock_source (
    id STRING,
    vector ARRAY<FLOAT>,       -- 向量以FLOAT数组形式传入,默认长度3
    event_time AS PROCTIME() -- 事件时间(Flink自动生成)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',  -- 每秒生成100条
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000'
);

CREATE TEMPORARY TABLE milvus_sink (
  id STRING,               -- 唯一标识(如设备ID)
  vector ARRAY<FLOAT>,     -- 向量数据(必须为FLOAT数组,且数组长度必须和上游一致)
  timestamp BIGINT         -- 时间戳(用于流处理)
  PRIMARY KEY (id) NOT ENFORCED  -- 必须,Milvus只支持BIGINT或STRING类型作为主键
) WITH (
  'connector'='milvus',
  'endpoint'='xxx',
  'port'='19530',
  'userName'='xxx',
  'password'='xxx',
  'databaseName'='xxxx',
  'collectionName'='xxxx'
);

-- 转换数据并写入Milvus
INSERT INTO milvus_stream_sink
SELECT 
    id,
    vector,
    UNIX_TIMESTAMP() * 1000 AS timestamp  -- 当前时间戳(毫秒)
FROM mock_source;