特征平台LLM文本Embedding方案流程最佳实践

更新时间:
复制为 MD 格式

在机器学习特征工程中,文本字段无法直接用于模型训练,需要先通过大模型提取Embedding向量后再进行使用。例如商品评价文本、用户评论内容等,通过文本Embedding可以将其转换为固定维度的向量表示,用于相似度计算、语义检索和推荐召回等场景。本文档介绍如何通过特征平台实现文本Embedding的离线和实时提取,以及后续的模型训练和上线全流程。

LLM文本Embedding模型

本文档介绍的功能支持任意阿里云和自托管的 LLM Embedding 模型调用。20256月阿里云发布了Qwen3 Embedding 系列模型,该系列模型专为文本表征、检索与排序任务设计,基于Qwen3基础模型进行训练,充分继承了Qwen3在多语言文本理解能力方面的优势。在多项基准测试中,Qwen3 Embedding系列在文本表征和排序任务中展现了卓越的性能。

文本Embedding的核心作用是将非结构化的文本数据转换为结构化的向量表示。例如在电商推荐场景中,商品的用户评价文本"Good Quality Dog Food"可以通过Embedding转换为1024维的向量,与其他商品的评价向量进行相似度计算,实现基于语义的推荐匹配。本文档以Qwen3 Embedding系列模型为例,介绍在特征平台中完成文本特征提取、训练和上线的完整过程。

前提条件

准备数据表

本文以商品评价文本为例进行说明。在实际电商场景中,用户对商品的评价文本蕴含了丰富的信息,如产品质量、使用体验等,这些文本数据可以作为重要的特征用于推荐模型训练。请将下表复制到DataWorks中执行。

其中,假设物品表的特征字段review_text为文本字段,需要对review_text字段进行实时处理提取Embedding,对应的MaxComputeschema及示例数据为:

create table if not exists item_fea_review_text_v1(
    item_id bigint COMMENT 'item_id'
    ,review_text string COMMENT 'review_text'
)
COMMENT 'item_fea_review_text_v1'
PARTITIONED BY (
    ds string COMMENT '业务日期'
)
LIFECYCLE 365
;

INSERT INTO TABLE item_fea_review_text_v1 PARTITION (ds='${bdp.system.bizdate}')  -- 如果是分区表,默认插入最新分区
VALUES (901, 'Good Quality Dog Food,I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than  most.');  -- 根据实际字段类型调整值

select *
from item_fea_review_text_v1 where ds = '${bdp.system.bizdate}';

注册特征平台

注册大模型配置信息

为了后续特征提取时能够直接调用大模型进行文本Embedding计算,需要先将模型配置信息注册到特征平台。注册后,特征平台会自动管理模型的调用和向量生成过程。本文以注册千问text-embedding-v4为例进行说明。

  1. 登录PAI控制台,在左侧导航栏单击数据准备

  2. 选择工作空间后,单击进入FeatureStore

  3. 特征生产页签,单击新建大模型调用信息,配置以下关键参数,其他未说明的参数保持默认即可。

    参数

    说明

    名称

    自定义名称。

    模型参数

    模型类型勾选 TEXT_EMBEDDING,确认base urlhttps://dashscope.aliyuncs.com/compatible-mode/v1,勾选/输入模型名称 text-embedding-v4后,默认向量维度1024,并发数30,最大输入token限制填入8192,BatchSize10 (10是建议的最大并行度),您也可以根据自己的业务场景修改。

    参数详情,请参见同步接口API详情

    其中text-embedding-v4模型属于Qwen3-Embedding系列,默认产出1024维的向量。

    API Key

    填写已获取的百炼API Key。

  4. 单击确定

注册特征视图

特征视图是特征平台的核心概念,定义了特征的结构、来源和计算方式。在创建特征视图之前,需要先完成以下准备工作:创建FeatureStore项目,并创建特征实体(item)。特征实体item代表商品维度,后续的文本Embedding提取和向量生成都围绕该实体展开。

  1. 特征视图页签,单击新建特征视图,配置以下关键参数,其他未说明的参数保持默认即可。

    参数

    说明

    视图名称

    自定义名称(例:item_fea_embedding_debug_v1)。

    类型

    选择实时

    特征实体

    选择item

    特征字段

    表格添加页签,新增如下两个字段:

    • item_idINT64,勾选主键

    • review_text_embeddingARRAY<FLOAT>,单击特征生产,配置如下参数:

      • 变换类型:选择LLMEmbedding

      • 大模型调用信息:选择已注册的大模型配置名称。

      • 特征生产输入:字段名输入review_text,多模态类型勾选TEXT。字段名需填写原始字段名,在离线场景中对应MaxCompute表的特征生产输入字段,在在线场景中对应Flink SQLSDK等特征生产输入字段。

  2. 单击提交

  3. 单击已创建的特征视图名称,可查看该特征视图schemaJSON格式。

同步数据

实时同步

实时同步适用于需要持续更新特征的场景。例如在新商品上架或用户评价更新时,需要及时将最新的文本数据转换为Embedding向量写入在线数据源。实时同步的核心流程如下:

  1. 创建DataHub Topic。

    1. 登录DataHub控制台新建项目

    2. 单击已创建的项目名称,进入项目详情页面。

    3. 单击新建Topic,配置以下关键参数,其他未说明的参数保持默认即可。

      参数

      说明

      名称

      自定义名称。

      Schema详情

      新增如下两个字段:

      • item_idBIGINT,取消勾选允许为null

      • review_textSTRING,勾选允许为null

    4. 单击创建

    5. 单击已创建的Topic,进入Topic详情页面。

    6. 订阅列表页签,单击订阅,填写相关信息后,单击创建

  2. 使用Flink SQL写入FeatureStore Connector。

    Flink SQL主要包含三部分:创建临时表关联DataHub,用于接收实时数据;创建临时表关联特征平台中对应的特征视图,用于写入处理后的特征;执行INSERT语句将数据从源表同步到目标表。该Flink任务部署启动完成后,会例行从DataHub中读取实时数据写入到特征平台对应的特征视图,特征平台会自动调用已注册的大模型提取当前数据的Embedding向量,结果落入review_text_embedding字段。Flink SQL示例如下:

    CREATE TEMPORARY TABLE item_fea_embedding_debug_v1_dh
    (
        item_id      bigint
        ,review_text string
    )
    WITH (
        'connector' = 'datahub'
        ,'subId' = 'xxxxxxxxxx'
        ,'endPoint' = 'http://dh-cn-xxxxxx.aliyuncs.com' --需替换为实际的DataHub endpiont地址
        ,'project' = 'fs_test' --需替换为实际的DataHub项目名称
        ,'topic' = 'feature_store_llm_embedding_test_v1' --需替换为实际的DataHub Topic名称
        ,'accessId' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_ID}'
        ,'accessKey' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_SECRET}'
    )
    ;
    
    CREATE TEMPORARY TABLE item_fea_embedding_debug_v1
    (
        item_id          bigint
        ,review_text         string
    )
    WITH (
        'connector' = 'featurestore'
        ,'region_id' = 'cn-beijing' -- 需替换为FeatureStore实际region
        ,'project' = 'fs_demo_featuredb' -- 需替换为FeatureStore实际project名称
        ,'feature_view' = 'item_fea_embedding_debug_v1' -- 需替换为FeatureStore实际FeatureView名称
        ,'username' = '${secret_values.FEATUREDB_USERNAME}'  
        ,'password' = '${secret_values.FEATUREDB_PASSWORD}'  
        ,'aliyun_access_id' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_ID}'
        ,'aliyun_access_key' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_SECRET}'
    )
    ;
    
    INSERT INTO item_fea_embedding_debug_v1
    SELECT
        item_id
        ,review_text
    FROM item_fea_embedding_debug_v1_dh
    ;

    为保障您的信息安全,部署启动该SQL作业前,需要在变量管理中创建以下名称变量并赋值:

    ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET以及FEATUREDB_USERNAME、FEATUREDB_PASSWORD

  3. 读取在线数据源特征。

    可以通过使用FeatureStore SDK读取在线数据源的特征,验证在线写入的特征流程是否正常运行。

离线同步

离线同步适用于批量特征更新场景。例如首次上线时,需要将历史存量的商品评价文本批量转换为Embedding向量;或者在每日例行更新时,将新增的评价文本的Embedding同步到在线数据源。离线同步分为全部离线同步增量离线同步两种方式。

  1. 全部离线同步:首次上线时,可以将整个离线表经过特征生产LLM Embedding提取后,同步到线上数据源中。

    将如下代码复制到DataWorks中的PyODPS 3节点中执行。(实际执行时请参考:方式一:使用新版独享资源组+内置镜像(推荐)文档配置并运行)

    from feature_store_py.fs_client import FeatureStoreClient
    import datetime
    from feature_store_py.fs_datasource import MaxComputeDataSource
    import sys
    from odps.accounts import StsAccount
    
    cur_day = args['dt']
    print('cur_day = ', cur_day)
    
    access_key_id = o.account.access_id
    access_key_secret = o.account.secret_access_key
    sts_token = None
    endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com' #需替换实际的endpoint地址
    if isinstance(o.account, StsAccount):
        sts_token = o.account.sts_token
    fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, security_token=sts_token, endpoint=endpoint)
    cur_project_name = 'fs_demo_featuredb' #需替换实际的Featurestore项目名称
    project = fs.get_project(cur_project_name)
    
    feature_view_name = 'item_fea_embedding_debug_v1' #需替换实际的Featurestore特征视图名称
    batch_feature_view = project.get_feature_view(feature_view_name)
    task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Merge', offline_to_online=True, publish_config={'offline_datasource_id':19, 'table_name': 'item_fea_review_text_v1'})
    task.wait()
  2. 增量离线同步:首次上线完成后,后续可以只同步增量数据,避免重复处理已有商品的评价文本。增量同步的核心思路是先计算出新增的item_id集合,然后将这些增量商品的文本Embedding同步到线上数据源。

    1. 计算新的item_id集合。通过SQL查询识别出当日新增的商品数据:

      CREATE TABLE IF NOT EXISTS item_fea_review_text_new_tmp_v1 (
        item_id BIGINT COMMENT '商品ID',
        review_text STRING COMMENT '英文描述'
      )
      PARTITIONED BY (
          ds string COMMENT '业务日期'
      )
      LIFECYCLE 365
      ;
      
      -- 插入新数据
      INSERT INTO TABLE item_fea_review_text_new_tmp_v1 PARTITION (ds='${bdp.system.bizdate}')
      SELECT 
          t1.item_id,
          t1.review_text
      FROM 
          item_fea_review_text_v1 t1
      WHERE 
          t1.ds = '${bdp.system.bizdate}'
          AND NOT EXISTS (
            SELECT 1 
            FROM item_fea_review_text_v1 t2 
            WHERE t2.ds = TO_CHAR(DATEADD(TO_DATE('${bdp.system.bizdate}','yyyymmdd'), -1, 'dd'),'yyyymmdd')
            AND t1.item_id = t2.item_id
          );
    2. 将新的item_id集合经过特征生产同步到在线数据源中。

      将如下代码复制到DataWorks中的PyODPS 3节点中执行。

      from feature_store_py.fs_client import FeatureStoreClient
      import datetime
      from feature_store_py.fs_datasource import MaxComputeDataSource
      import sys
      from odps.accounts import StsAccount
      
      cur_day = args['dt'] #dt的值为表中实际的分区值
      print('cur_day = ', cur_day)
      
      access_key_id = o.account.access_id
      access_key_secret = o.account.secret_access_key
      sts_token = None
      endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com'
      if isinstance(o.account, StsAccount):
          sts_token = o.account.sts_token
      fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, security_token=sts_token, endpoint=endpoint)
      cur_project_name = 'fs_demo_featuredb'
      project = fs.get_project(cur_project_name)
      
      feature_view_name = 'item_fea_embedding_debug_v1'
      batch_feature_view = project.get_feature_view(feature_view_name)
      task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Merge', offline_to_online=True, publish_config={'offline_datasource_id':19, 'table_name': 'item_fea_review_text_new_tmp_v1'})
      task.wait()

直接同步已提取好的Embedding

在某些场景中,您可能已经在其他系统中完成了文本Embedding的提取工作,不希望通过特征平台重复调用模型进行提取。此时可以直接将已有的Embedding结果同步至线上数据源。这种方式可以节省模型调用成本,同时加快特征上线速度。使用已有提取好的Embedding直接同步时,表中需要包含review_text_embedding字段。

假设已提取的Embedding表如下所示:

create table if not exists item_fea_review_text_embedding_debug_v1(
    item_id bigint COMMENT 'item_id'
    ,review_text_embedding array<float> COMMENT 'review_text_embedding'
)
COMMENT 'item_fea_review_text_embedding_debug_v1'
PARTITIONED BY (
    ds string COMMENT '业务日期'
)
LIFECYCLE 365
;

INSERT INTO TABLE item_fea_review_text_embedding_debug_v1 PARTITION (ds='20250509')  -- 如果是分区表,默认插入最新分区
VALUES (902, cast(array(1,2,3,4,5,6,7.0,8.1,9.0) as array<float>));  -- 根据实际字段类型调整值

select *
from item_fea_review_text_embedding_debug_v1 where ds = '20250509';

将如下代码复制到DataWorks中的PyODPS 3节点中执行。

from feature_store_py.fs_client import FeatureStoreClient
import datetime
from feature_store_py.fs_datasource import MaxComputeDataSource
import sys
from odps.accounts import StsAccount

cur_day = args['dt']
print('cur_day = ', cur_day)

access_key_id = o.account.access_id
access_key_secret = o.account.secret_access_key
sts_token = None
endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com'
if isinstance(o.account, StsAccount):
    sts_token = o.account.sts_token
fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, security_token=sts_token, endpoint=endpoint)
cur_project_name = 'fs_demo_featuredb'
project = fs.get_project(cur_project_name)

feature_view_name = 'item_fea_embedding_debug_v1'
batch_feature_view = project.get_feature_view(feature_view_name)
task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Merge', offline_to_online=True, publish_config={'offline_datasource_id':19, 'table_name': 'item_fea_review_text_embedding_debug_v1', 'transform': False})
task.wait()
task.print_summary()

导出及训练

新建模型特征

模型特征定义了训练模型所使用的特征集合。具体操作,请参见配置FeatureStore项目

导出样本

导出训练数据集,用于后续模型训练。具体操作,请参见导出Training Set并训练模型

配置Feature Generator及训练

利用TorchEasyReccreate_fg_json脚本,根据TorchEasyRec Config产出fg.json,用于在线推理时的特征处理。

具体操作,请参见配置Feature Generator及训练。其中,Embedding型特征TorchEasyRec Config配置,请参见TorchEasyRec特征

上线

部署EAS服务

将训练好的模型部署为EAS(Elastic Algorithm Service)在线推理服务。具体操作,请参见创建与部署EAS模型服务

配置PAI-Rec引擎

配置PAI-Rec推荐引擎,使其能够从FeatureDB获取实时特征并调用EAS服务进行推荐推理。具体操作,请参见配置PAI-REC