通过Flink实现实时数据分析
本文为您介绍如何通过Flink实现实时数据分析,并将处理结果写入Milvus向量数据库。该教程适用于需要结合Flink实时计算与Milvus向量检索的场景,特别适合处理高并发的流式数据。
前提条件
已开通Flink工作空间,详情请参见开通实时计算Flink版。
已创建Milvus实例和Collection,详情请参见快速创建Milvus实例和管理Collections。
使用限制
仅实时计算引擎VVR 11.1及以上版本支持。
步骤一:创建作业
步骤二:编写SQL作业
拷贝如下示例到SQL编辑区域。
-- 生成模拟流数据(每2秒生成一条)
CREATE TEMPORARY TABLE mock_source (
id STRING,
vector array<float>, -- 向量以字符串形式传入
event_time AS PROCTIME() -- 事件时间(Flink自动生成)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100', -- 每秒生成1条
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '1000'
);
-- 创建临时结果表milvus_sink
CREATE TEMPORARY TABLE milvus_sink (
id STRING, -- 唯一标识(如设备ID)
vector ARRAY<FLOAT>, -- 向量数据(必须为FLOAT数组)
`timestamp` BIGINT, -- 时间戳(用于流处理)
PRIMARY KEY (id) NOT ENFORCED -- 必须,Milvus只支持BIGINT或STRING类型作为主键
) WITH (
'connector' = 'milvus',
'endpoint' = '<YOUR-ENDPOINT>',
'port' = '<YOUR-PORT>',
'userName' = '<YOUR-USERNAME>',
'password' = '<YOUR-PASSWORD>',
'databaseName' = 'default',
'collectionName'='flink_stream_demo'
);
-- 转换数据并写入Milvus
INSERT INTO milvus_sink
SELECT
id, -- 将字符串向量转换为FLOAT数组(示例转换逻辑)
vector,
UNIX_TIMESTAMP() * 1000 AS `timestamp` -- 当前时间戳(毫秒)
FROM mock_source;
本文示例涉及以下参数,请您根据实际环境替换。
参数 | 说明 |
| 连接器名称。此处固定为 |
| Milvus的访问地址,填写格式为 Milvus实例的内网地址,您可以在Milvus实例的实例详情页面查看。 |
| Milvus的监听端口,固定为 |
| 创建Milvus实例时,您自定义的用户名和密码。默认用户名为root。 |
| |
| 待连接的数据库名称,本文示例为默认的 |
| Collection的名称。您可以自定义。本文以flink_stream_demo为例。 |
步骤三:作业部署并启动
在SQL编辑区域右上方,单击部署。
在部署新版本对话框,可根据需要填写或选中相关内容,单击确定。
在部署作业时部署目标可以选择资源队列或者Session集群。
在左侧导航栏,单击
。单击目标作业名称操作列中的启动。
选择无状态启动后,单击启动。
当您看到作业状态变为运行中,则代表作业运行正常。作业启动参数配置,详情请参见作业启动。
更多参数信息,请参见Flink SQL作业快速入门部署作业并启动。
步骤四:查看同步结果
分析结果已写入Milvus目标结果集中,需登录Milvus查看。
在Milvus控制台,访问目标Milvus实例的Attu页面,详情请参见访问Attu页面。
进入目标集合中,在数据页签中查看同步的数据。