通过Flink实现实时数据分析

更新时间:

本文为您介绍如何通过Flink实现实时数据分析,并将处理结果写入Milvus向量数据库。该教程适用于需要结合Flink实时计算与Milvus向量检索的场景,特别适合处理高并发的流式数据。

前提条件

使用限制

仅实时计算引擎VVR 11.1及以上版本支持。

步骤一:创建作业

  1. 进入SQL作业创建页面。

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击数据开发 > ETL

  2. 单击image后,单击新建流作业,填写文件名称并选择引擎版本

    Flink也为您提供了丰富的代码模板和数据同步,每种代码模板都为您提供了具体的使用场景、代码示例和使用指导。您可以直接单击对应的模板快速地了解Flink产品功能和相关语法,实现您的业务逻辑,详情请参见代码模板数据同步模板

    作业参数

    说明

    示例

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    flink-test

    引擎版本

    当前作业使用的Flink引擎版本。

    建议使用带有推荐稳定标签的版本,这些版本具有更高的可靠性和性能表现,引擎版本详情请参见功能发布记录引擎版本介绍

    vvr-8.0.8-flink-1.17

  3. 单击创建

步骤二:编写SQL作业

拷贝如下示例到SQL编辑区域。

-- 生成模拟流数据(每2秒生成一条)
CREATE TEMPORARY TABLE mock_source (
    id STRING,
    vector array<float>,           -- 向量以字符串形式传入
    event_time AS PROCTIME() -- 事件时间(Flink自动生成)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',  -- 每秒生成1条
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000'
);

-- 创建临时结果表milvus_sink
CREATE TEMPORARY TABLE milvus_sink (
  id STRING,               -- 唯一标识(如设备ID)
  vector ARRAY<FLOAT>,     -- 向量数据(必须为FLOAT数组)
  `timestamp` BIGINT,         -- 时间戳(用于流处理)
  PRIMARY KEY (id) NOT ENFORCED  -- 必须,Milvus只支持BIGINT或STRING类型作为主键
) WITH (
    'connector' = 'milvus',
    'endpoint' = '<YOUR-ENDPOINT>',
    'port' = '<YOUR-PORT>',
    'userName' = '<YOUR-USERNAME>',
    'password' = '<YOUR-PASSWORD>',
    'databaseName' = 'default',
    'collectionName'='flink_stream_demo'
);

-- 转换数据并写入Milvus
INSERT INTO milvus_sink
SELECT 
    id,  -- 将字符串向量转换为FLOAT数组(示例转换逻辑)
    vector,
    UNIX_TIMESTAMP() * 1000 AS `timestamp`  -- 当前时间戳(毫秒)
FROM mock_source;

本文示例涉及以下参数,请您根据实际环境替换。

参数

说明

connector

连接器名称。此处固定为milvus

endpoint

Milvus的访问地址,填写格式为http://内网地址

Milvus实例的内网地址,您可以在Milvus实例的实例详情页面查看。

port

Milvus的监听端口,固定为19530

user_name

创建Milvus实例时,您自定义的用户名和密码。默认用户名为root。

password

databaseName

待连接的数据库名称,本文示例为默认的default

collection_name

Collection的名称。您可以自定义。本文以flink_stream_demo为例。

步骤三:作业部署并启动

  1. SQL编辑区域右上方,单击部署

  2. 部署新版本对话框,可根据需要填写或选中相关内容,单击确定

    在部署作业时部署目标可以选择资源队列或者Session集群

  3. 在左侧导航栏,单击运维中心 > 作业运维

  4. 单击目标作业名称操作列中的启动

  5. 选择无状态启动后,单击启动。

    当您看到作业状态变为运行中,则代表作业运行正常。作业启动参数配置,详情请参见作业启动

    更多参数信息,请参见Flink SQL作业快速入门部署作业并启动。

步骤四:查看同步结果

分析结果已写入Milvus目标结果集中,需登录Milvus查看。

  1. Milvus控制台,访问目标Milvus实例的Attu页面,详情请参见访问Attu页面

  2. 进入目标集合中,在数据页签中查看同步的数据。

    image