Real-time data analysis with Flink


This topic describes how to use Realtime Compute for Apache Flink to analyze real-time data and write the processed results to a Milvus vector database. The tutorial targets scenarios that combine Flink real-time computing with Milvus vector retrieval, especially those that process high-concurrency streaming data.

Prerequisites

Limitations

The Milvus connector is supported only in Ververica Runtime (VVR) 11.1 and later.

Step 1: Create a job

  1. Log on to the Realtime Compute console.

  2. Find the target workspace and click Console in the Actions column.

  3. In the left-side navigation pane, click Development > ETL.

  4. Click the create icon, and then click New Blank Stream Draft. Enter a File Name and select an engine version.

    Realtime Compute for Apache Flink provides a variety of code templates and data synchronization templates. Each template describes specific use cases and includes code examples and instructions. You can use these templates to quickly learn the product features and syntax and to implement your business logic. For more information, see Code templates and Data synchronization templates.

    Configure the following parameters:

    File name
      Description: The name of the stream draft. The name must be unique within the current workspace.
      Example: flink-test

    Engine version
      Description: The Flink engine version for the stream draft. Use a version labeled Recommended or Stable, which offers higher reliability and better performance. For more information about engine versions, see Feature release history and Engine versions.
      Example: Any version of VVR 11.1 or later. Earlier versions, such as vvr-8.0.8-flink-1.17, do not support the Milvus connector.

  5. Click Create.

Step 2: Write an SQL job

Copy the following sample code to the SQL editor.

-- Generate mock streaming data.
CREATE TEMPORARY TABLE mock_source (
    id STRING,
    vector ARRAY<FLOAT>,          -- Vector data.
    event_time AS PROCTIME()      -- Processing-time attribute, generated by Flink.
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',    -- Generate 100 records per second.
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000'      -- The sequence stops at 1000, so the source emits 1,000 records and then finishes.
);

-- Create a temporary result table named milvus_sink.
CREATE TEMPORARY TABLE milvus_sink (
    id STRING,                     -- A unique identifier, such as a device ID.
    vector ARRAY<FLOAT>,           -- Vector data. Must be a FLOAT array.
    `timestamp` BIGINT,            -- Timestamp for stream processing.
    PRIMARY KEY (id) NOT ENFORCED  -- Required. Milvus supports only BIGINT or STRING primary keys.
) WITH (
    'connector' = 'milvus',
    'endpoint' = '<YOUR-ENDPOINT>',
    'port' = '<YOUR-PORT>',
    'userName' = '<YOUR-USERNAME>',
    'password' = '<YOUR-PASSWORD>',
    'databaseName' = 'default',
    'collectionName' = 'flink_stream_demo'
);

-- Transform the data and write it to Milvus.
INSERT INTO milvus_sink
SELECT
    id,
    vector,
    UNIX_TIMESTAMP() * 1000 AS `timestamp`  -- Current time in milliseconds (second precision).
FROM mock_source;
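The datagen source above does not set the length of the generated vector array, so the generator falls back to its default collection length. A Milvus vector field, however, has a fixed dimension, and arrays of a different length cannot be written to it. The following variant of mock_source is a minimal sketch that pins the array length. It assumes that the VVR datagen connector accepts the Apache Flink option fields.<field>.length for array fields, and the dimension 128 is a hypothetical value; replace it with the dimension of your collection's vector field.

```sql
-- Sketch: mock source that emits fixed-length vectors.
-- Assumption: the target Milvus vector field has dimension 128 (hypothetical value).
CREATE TEMPORARY TABLE mock_source (
    id STRING,
    vector ARRAY<FLOAT>,              -- Vector data.
    event_time AS PROCTIME()          -- Processing-time attribute, generated by Flink.
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',        -- Generate 100 records per second.
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000',
    'fields.vector.length' = '128'    -- Number of elements in each generated array.
);
```

The milvus_sink table and the INSERT statement can stay unchanged; only the source definition differs.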

The following parameters configure the Milvus sink in this example. Replace the placeholder values with the actual values for your environment.

connector
  The name of the connector. Set this value to milvus.

endpoint
  The access address of the Milvus instance, in the format http://<internal endpoint>. You can find the internal endpoint of your Milvus instance on the Instance Details page.

port
  The listening port of the Milvus instance. Set this value to 19530.

userName and password
  The username and password that you configured when you created the Milvus instance. The default username is root.

databaseName
  The name of the database to connect to. This example uses the default database, which is named default.

collectionName
  The name of the collection. You can specify a custom name. This example uses flink_stream_demo.
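For reference, the following sketch shows the milvus_sink definition with the connection placeholders filled in according to the parameter descriptions above: the internal endpoint with an http:// prefix, port 19530, and the default root user. The host name milvus-internal.example.com is hypothetical; use the internal endpoint shown on the Instance Details page of your own instance, and keep your real password in place of the placeholder.

```sql
-- Sketch only: the endpoint host below is hypothetical.
CREATE TEMPORARY TABLE milvus_sink (
    id STRING,
    vector ARRAY<FLOAT>,
    `timestamp` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'milvus',
    'endpoint' = 'http://milvus-internal.example.com',  -- Hypothetical internal endpoint; note the http:// prefix.
    'port' = '19530',                                   -- Milvus listening port.
    'userName' = 'root',                                -- Default username.
    'password' = '<YOUR-PASSWORD>',                     -- Password set when the instance was created.
    'databaseName' = 'default',
    'collectionName' = 'flink_stream_demo'
);
```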

Step 3: Deploy and start the job

  1. In the upper-right corner of the SQL editor, click Deploy.

  2. In the Deploy New Version dialog box, configure the parameters and click OK.

    For the Deployment target, you can select Resource queue or Session cluster.

  3. In the left-side navigation pane, click O&M Center > Job Operations.

  4. In the Actions column for the target job, click Start.

  5. Select Stateless start and click Start.

    If the job status changes to Running, the job has started successfully. For more information about job startup parameters, see Start a job.

    For more information, see Deploy and start a Flink SQL job.

Step 4: View the synchronized results

The analysis results are written to the target collection in Milvus. Log on to Milvus to view the data.

  1. In the Milvus console, access the Attu page of the target Milvus instance. For more information, see Access the Attu page.

  2. Navigate to the target collection and view the synchronized data on the Data tab.

    After the synchronization is complete, the flink_stream_demo collection contains 1,000 records with three fields: id, vector, and timestamp.