基于大模型的实时数据分析快速入门

本文为您介绍如何使用百炼平台的模型服务进行数据分析。

背景信息

阿里云百炼大模型服务平台是面向AI开发者与业务团队的一站式大模型开发及应用构建平台,深度融合了Flink实时计算能力,支持通过可视化界面低代码构建智能应用。用户可快速调用百炼大模型能力,结合Flink实时数据处理管道,显著缩短从数据接入到智能决策的链路。以下是两种核心模型的应用场景:

  • chat/completions模型 chat/completions模型是一种基于对话生成和文本理解的大模型,广泛应用于情感分析、意图识别、问答系统等场景。

    • 情感分析:对企业社交媒体评论进行实时情感分类,快速识别用户情绪(如正面、负面、中性)。

    • 智能客服:通过对话生成能力,为用户提供自然语言交互的智能客服服务。

    • 内容审核:自动检测文本中的敏感内容或违规信息,提升内容安全审核效率。

  • embedding模型 embedding模型能够将文本转换为高维向量表示,适用于语义搜索、推荐系统、知识图谱构建等场景。

    • 语义搜索:通过对商品描述或用户查询进行向量化处理,实现基于语义的相关性搜索。

    • 推荐系统:利用文本向量化技术,挖掘用户兴趣与商品特征之间的潜在关联,提升推荐精准度。

    • 知识图谱:将非结构化文本转化为向量形式,便于后续的知识抽取和关系建模。

前提条件

使用限制

仅实时计算引擎VVR 11.1及以上版本支持。

步骤一:注册百炼模型

请参见模型设置注册百炼模型。

chat/completions模型任务

以注册通义千问qwen-turbo模型为例,SQL代码如下:

CREATE MODEL ai_analyze_sentiment
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
    'provider'='bailian',
    'endpoint'='https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions',    -- chat/completions模型任务端口
    'apiKey' = '<YOUR KEY>',
    'model'='qwen-turbo',                                                               -- 通义千问qwen-turbo模型
    'systemPrompt' = 'Classify the text below into one of the following labels: [positive, negative, neutral, mixed]. Output only the label.'
);

embedding模型任务

以注册文本向量text-embedding-v3模型为例,SQL代码如下:

CREATE MODEL embedding_model
INPUT (`input` STRING)
OUTPUT (`embeddings` ARRAY<FLOAT>)
WITH (
    'provider'='bailian',
    'endpoint'='https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings',   -- embeddings模型任务端口
    'apiKey' = '<YOUR KEY>',
    'model'='text-embedding-v3'                                                  -- 文本向量text-embedding-v3模型
);

步骤二:创建作业

请参见Flink SQL作业快速入门创建SQL流作业草稿。

步骤三:编写SQL作业进行AI大模型分析

chat/completions模型任务

使用AI函数ML_PREDICT调用已注册的ai_analyze_sentiment模型,对电影评论进行情感分析。

重要

ML_PREDICT语句相关的Flink算子的吞吐量受到百炼平台限流的限制。当触及百炼平台允许的访问流量上限时,Flink作业会表现出以ML_PREDICT算子为瓶颈的反压现象。在限流情况严重时,可能会触发相关算子的超时报错及作业重启。您可查询百炼平台限流了解不同模型的限流条件,或联系商务申请或PDSA了解如何解除限流。

拷贝如下示例SQLSQL编辑区域。

--创建临时结果表datagen_sink
CREATE TEMPORARY TABLE print_sink(
  id BIGINT,
  movie_name VARCHAR, 
  predicit_label VARCHAR, 
  actual_label VARCHAR
) WITH (
  'connector' = 'print',   -- print连接器
  'logger' = 'true'        -- 控制台显示计算结果
);

-- 创建临时数据视图构造测试数据
-- | id | movie_name | comment   | actual_label |
-- | 1  | 好东西     | 最爱小孩子猜声音那段,算得上看过的电影里相当浪漫的叙事了。很温和也很有爱。| POSITIVE |
-- | 2  | 水饺皇后   | 乏善可陈  | NEGATIVE |
CREATE TEMPORARY VIEW movie_comment(id, movie_name,  user_comment, actual_label)
AS VALUES (1, '好东西', '最爱小孩子猜声音那段,算得上看过的电影里相当浪漫的叙事了。很温和也很有爱。', 'positive'), (2, '水饺皇后', '乏善可陈', 'negative');

INSERT INTO blackhole_sink
SELECT id, movie_name, content as predicit_label, actual_label 
FROM ML_PREDICT(
  TABLE movie_comment, 
  MODEL ai_analyze_sentiment,  -- 注册的通义千问qwen-turbo模型
  DESCRIPTOR(user_comment));   

embedding模型任务

使用AI函数ML_PREDICT调用已注册的embedding_model模型,对电影评论进行情感分析,并将结果写入Milvus(公测中)

重要

ML_PREDICT语句相关的Flink算子的吞吐量受到百炼平台限流的限制。当触及百炼平台允许的访问流量上限时,Flink作业会表现出以ML_PREDICT算子为瓶颈的反压现象。在限流情况严重时,可能会触发相关算子的超时报错及作业重启。您可查询百炼平台限流了解不同模型的限流条件,或联系商务申请或PDSA了解如何解除限流。

拷贝如下示例SQLSQL编辑区域。

-- 创建临时结果表milvus_sink
CREATE TEMPORARY TABLE milvus_sink
(
    id STRING,
    movie_name STRING,
    user_comment STRING,
    embeddings ARRAY<FLOAT>,
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'milvus',
    'endpoint' = '<YOUR-ENDPOINT>',
    'port' = '<YOUR-PORT>',
    'userName' = '<YOUR-USERNAME>',
    'password' = '<YOUR-PASSWORD>',
    'databaseName' = 'default',
    'collectionName' = 'movie-comment-embeddings'
);

-- 创建临时数据视图构造测试数据
-- | id | movie_name | comment | actual_label|
-- | 1 | 好东西 |最爱小孩子猜声音那段,算得上看过的电影里相当浪漫的叙事了。很温和也很有爱。| POSITIVE |
-- | 2 | 水饺皇后 | 乏善可陈 | NEGATIVE |
CREATE TEMPORARY VIEW movie_comment(id, movie_name,  user_comment)
AS VALUES ('1', '好东西', '最爱小孩子猜声音那段,算得上看过的电影里相当浪漫的叙事了。很温和也很有爱。'), ('2', '水饺皇后', '乏善可陈');


INSERT INTO
    milvus_sink
SELECT
    id,
    movie_name,
    user_comment,
    embeddings
FROM
    ML_PREDICT (
        TABLE movie_comment,
        MODEL embedding_model,  -- 注册的文本向量text-embedding-v3模型                   
        DESCRIPTOR (user_comment)
    );

步骤四:作业部署并启动

请参见Flink SQL作业快速入门部署作业并启动。

步骤五:查看分析结果

chat/completions模型任务

  1. 查看目标作业状态已完成

    image

  2. 运维中心 > 作业运维页面,单击目标作业名称。

  3. 作业日志页签,选择Task Managers页签下的当前TaskManager

  4. 单击日志,在页面搜索PrintSinkOutputWriter相关的日志信息。

    经模型分析后的结果predicit_label和实际结果actual_label一致。

    1

embedding模型任务

分析结果已写入Milvus目标结果集中,需登录Milvus查看。

  1. 访问Attu页面

  2. 进入目标集合,在数据页签中查看同步的数据。

    2

相关文档