机器学习特征中常包含无法直接用于模型训练的文本字段,常用方法是通过大模型提取Embedding后再进行训练。本文档介绍在特征平台中如何通过特征平台实现离线和实时Embedding提取,模型训练后并上线的全过程。
LLM Embedding 模型
本文档介绍的功能支持任意阿里云和自托管的 LLM Embedding 模型调用。2025年6月阿里云发布了Qwen3 Embedding 系列模型,该系列模型专为文本表征、检索与排序任务设计,基于Qwen3基础模型进行训练,充分继承了Qwen3在多语言文本理解能力方面的优势。在多项基准测试中,Qwen3 Embedding系列在文本表征和排序任务中展现了卓越的性能。本文档以Qwen3 Embedding系列模型为例,介绍在特征平台中机器学习特征提取后训练上线的完整过程。
前提条件
- 说明
目前特征平台LLM Embedding仅支持使用在线数据源FeatureDB。
准备数据表
本文以下表数据为例进行说明,请将下表复制到DataWorks中执行。
其中,假设物品表的特征字段review_text
为文本字段,需要对review_text
字段进行实时处理提取Embedding,对应的MaxCompute表schema及示例数据为:
create table if not exists item_fea_review_text_v1(
item_id bigint COMMENT 'item_id'
,review_text string COMMENT 'review_text'
)
COMMENT 'item_fea_review_text_v1'
PARTITIONED BY (
ds string COMMENT '业务日期'
)
LIFECYCLE 365
;
INSERT INTO TABLE item_fea_review_text_v1 PARTITION (ds='20250509') -- 如果是分区表,默认插入最新分区
VALUES (901, 'Good Quality Dog Food,I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than most.'); -- 根据实际字段类型调整值
select *
from item_fea_review_text_v1 where ds = '20250509';
注册特征平台
注册大模型配置信息
将要调用的大模型配置信息提前注册,方便后续对特征进行提取Embedding时直接调用。本文以注册通义千问text-embedding-v4为例进行说明。
登录PAI控制台,在左侧导航栏单击数据准备 > 特征平台(FeatureStore)。
选择工作空间后,单击进入FeatureStore。
在特征生产页签,单击新建大模型调用信息,配置以下关键参数,其他未说明的参数保持默认即可。
参数
说明
名称
自定义名称。
模型名称
输入text-embedding-v4后,并发数填入30,最大输入token限制填入8192,BatchSize填入10 (10是建议的最大并行度)。
参数详情,请参见同步接口API详情。
其中text-embedding-v4模型属于Qwen3-Embedding系列,默认产出1024维的向量。
API Key
填写已获取的百炼API Key。
单击确定。
注册特征视图
在特征视图页签,单击新建特征视图,配置以下关键参数,其他未说明的参数保持默认即可。
参数
说明
视图名称
自定义名称。
类型
选择实时。
特征实体
选择item。
特征字段
在表格添加页签,新增如下两个字段:
item_id,INT64,勾选主键。
review_text,ARRAY<FLOAT>,单击特征生产,配置如下参数:
变换类型:选择LLMEmbedding。
大模型调用信息:选择已注册的大模型配置信息。
输入字段名:输入review_text。输入字段名需填写原始字段名,在离线场景中对应MaxCompute表的特征生产输入字段,在在线场景中对应Flink SQL或SDK等特征生产输入字段。
单击提交。
单击已创建的特征视图名称,可查看该特征视图schema的JSON格式。
同步数据
实时同步
创建DataHub Topic。
登录DataHub控制台,新建项目。
单击已创建的项目名称,进入项目详情页面。
单击新建Topic,配置以下关键参数,其他未说明的参数保持默认即可。
参数
说明
名称
自定义名称。
Schema详情
新增如下两个字段:
item_id,BIGINT,取消勾选允许为null。
review_text,STRING,勾选允许为null。
单击创建。
单击已创建的Topic,进入Topic详情页面。
在订阅列表页签,单击订阅,填写相关信息后,单击创建。
使用Flink SQL写入FeatureStore Connector。
Flink SQL主要包含三部分:创建临时表关联DataHub;创建临时表关联特征平台中对应的特征视图;从DataHub中读取数据写入到特征平台对应的FeatureStore Connector中。Flink SQL示例如下:
CREATE TEMPORARY TABLE item_fea_embedding_debug_v1_dh ( item_id bigint ,review_text string ) WITH ( 'connector' = 'datahub' ,'subId' = '1747297545688WFPX0' ,'endPoint' = 'http://dh-cn-beijing.aliyuncs.com' ,'project' = 'fs_test' ,'topic' = 'feature_store_llm_embedding_test_v1' ,'accessId' = 'xxx' ,'accessKey' = 'xxx' ) ; CREATE TEMPORARY TABLE item_fea_embedding_debug_v1 ( item_id bigint ,review_text string ) WITH ( 'connector' = 'featurestore' ,'region_id' = 'cn-beijing' ,'project' = 'fs_demo_featuredb' ,'feature_view' = 'item_fea_embedding_debug_v1' ,'username' = 'xxx' -- 需替换为FeatureDB用户名 ,'password' = 'xxx' -- 需替换为FeatureDB密码 ,'aliyun_access_id' = 'xxx' ,'aliyun_access_key' = 'xxx' ) ; INSERT INTO item_fea_embedding_debug_v1 SELECT item_id ,review_text FROM item_fea_embedding_debug_v1_dh ;
读取在线数据源特征。
可以通过使用FeatureStore SDK读取在线数据源的特征,验证在线写入的特征流程是否正常运行。
离线同步
全部离线同步。
首次上线时,可以将整个离线表经过特征生产LLM Embedding提取后,同步到线上数据源中。
将如下代码复制到DataWorks中的PyODPS 3节点中执行。
from feature_store_py.fs_client import FeatureStoreClient import datetime from feature_store_py.fs_datasource import MaxComputeDataSource import sys from odps.accounts import StsAccount cur_day = args['dt'] print('cur_day = ', cur_day) access_key_id = o.account.access_id access_key_secret = o.account.secret_access_key sts_token = None endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com' if isinstance(o.account, StsAccount): sts_token = o.account.sts_token fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, security_token=sts_token, endpoint=endpoint) cur_project_name = 'fs_demo_featuredb' project = fs.get_project(cur_project_name) feature_view_name = 'item_fea_embedding_debug_v1' batch_feature_view = project.get_feature_view(feature_view_name) task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Merge', offline_to_online=True, publish_config={'offline_datasource_id':19, 'table_name': 'item_fea_review_text_v1'}) task.wait()
增量离线同步。
首次上线完成后,后续可以只同步增量数据。因此需要先计算出增量的特征表,然后将增量的特征表同步到线上。
计算新的item_id集合。
CREATE TABLE IF NOT EXISTS item_fea_review_text_new_tmp_v1 ( item_id STRING COMMENT '商品ID', review_text STRING COMMENT '英文描述' ) PARTITIONED BY ( ds string COMMENT '业务日期' ) LIFECYCLE 365 ; -- 插入新数据 INSERT INTO TABLE item_fea_review_text_new_tmp_v1 PARTITION (ds='${bdp.system.bizdate}') SELECT t1.item_id, t1.review_text FROM item_fea_review_text_v1 t1 WHERE t1.ds = '${bdp.system.bizdate}' AND NOT EXISTS ( SELECT 1 FROM item_fea_review_text_v1 t2 WHERE t2.ds = TO_CHAR(DATEADD(TO_DATE('${bdp.system.bizdate}','yyyymmdd'), -1, 'dd'),'yyyymmdd') AND t1.item_id = t2.item_id );
将新的item_id集合经过特征生产同步到在线数据源中。
将如下代码复制到DataWorks中的PyODPS 3节点中执行。
from feature_store_py.fs_client import FeatureStoreClient import datetime from feature_store_py.fs_datasource import MaxComputeDataSource import sys from odps.accounts import StsAccount cur_day = args['dt'] print('cur_day = ', cur_day) access_key_id = o.account.access_id access_key_secret = o.account.secret_access_key sts_token = None endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com' if isinstance(o.account, StsAccount): sts_token = o.account.sts_token fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, security_token=sts_token, endpoint=endpoint) cur_project_name = 'fs_demo_featuredb' project = fs.get_project(cur_project_name) feature_view_name = 'item_fea_embedding_debug_v1' batch_feature_view = project.get_feature_view(feature_view_name) task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Merge', offline_to_online=True, publish_config={'offline_datasource_id':19, 'table_name': 'item_fea_review_text_new_tmp_v1'}) task.wait()
直接同步已提取好的Embedding
如果已提取Embedding,不希望通过特征平台重复提取,可直接将Embedding同步至线上。使用已有提取好的Embedding直接同步时,表中要有review_text_embedding
字段。
假设已提取的Embedding表如下所示:
create table if not exists item_fea_review_text_embedding_debug_v1(
item_id bigint COMMENT 'item_id'
,review_text_embedding array<float> COMMENT 'review_text_embedding'
)
COMMENT 'item_fea_review_text_embedding_debug_v1'
PARTITIONED BY (
ds string COMMENT '业务日期'
)
LIFECYCLE 365
;
INSERT INTO TABLE item_fea_review_text_embedding_debug_v1 PARTITION (ds='20250509') -- 如果是分区表,默认插入最新分区
VALUES (902, cast(array(1,2,3,4,5,6,7.0,8.1,9.0) as array<float>)); -- 根据实际字段类型调整值
select *
from item_fea_review_text_embedding_debug_v1 where ds = '20250509';
将如下代码复制到DataWorks中的PyODPS 3节点中执行。
from feature_store_py.fs_client import FeatureStoreClient
import datetime
from feature_store_py.fs_datasource import MaxComputeDataSource
import sys
from odps.accounts import StsAccount
cur_day = args['dt']
print('cur_day = ', cur_day)
access_key_id = o.account.access_id
access_key_secret = o.account.secret_access_key
sts_token = None
endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com'
if isinstance(o.account, StsAccount):
sts_token = o.account.sts_token
fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, security_token=sts_token, endpoint=endpoint)
cur_project_name = 'fs_demo_featuredb'
project = fs.get_project(cur_project_name)
feature_view_name = 'item_fea_embedding_debug_v1'
batch_feature_view = project.get_feature_view(feature_view_name)
task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Merge', offline_to_online=True, publish_config={'offline_datasource_id':19, 'table_name': 'item_fea_review_text_embedding_debug_v1', 'transform': False})
task.wait()
task.print_summary()
导出及训练
新建模型特征
具体操作,请参见配置FeatureStore项目。
导出样本
具体操作,请参见导出Training Set并训练模型。
配置Feature Generator及训练
利用TorchEasyRec的create_fg_json脚本,根据TorchEasyRec Config产出fg.json。
具体操作,请参见配置Feature Generator及训练。其中,Embedding型特征TorchEasyRec Config配置,请参见TorchEasyRec特征。
上线
部署EAS服务
具体操作,请参见创建与部署EAS模型服务。
配置PAI-Rec引擎
具体操作,请参见配置PAI-REC。