如何通过MaxCompute读取数据以部署排序模型
在部署排序模型时,您可能需要访问存储在MaxCompute中的数据。为了实现这一点,我们推荐使用FeatureStore管理您的特征数据。FeatureStore不仅支持对特征的管理操作,还可以高效地与其他云服务集成,提高数据处理和模型部署的效率。更多关于FeatureStore的信息,请参见用FeatureStore管理特征
一、准备数据
同步数据表
一般对于推荐场景,需要准备三张数据表:user侧的特征表、item侧的特征表以及Label表。
为方便实践操作,我们在MaxCompute的pai_online_project项目中提前准备了模拟生成的用户表、物料表和Label表进行示例说明。其中,用户表、物料表每个分区大约有10万条数据,在MaxCompute中分别占用约70 MB;Label表每个分区约45万条数据,在MaxCompute中占用约5 MB。
您需要在DataWorks中执行SQL命令,将用户表、物料表、Label表从pai_online_project项目同步到自己的MaxCompute中。具体操作步骤如下:
登录DataWorks控制台。
在左侧导航栏单击数据建模与开发 > 数据开发。
选择已创建的DataWorks工作空间后,单击进入数据开发。
鼠标悬停至新建,选择新建节点 > MaxCompute > ODPS SQL,在弹出的页面中配置节点参数。
参数 | 取值建议 |
引擎实例 | 选择已创建的MaxCompute引擎。 |
节点类型 | ODPS SQL |
路径 | 业务流程/Workflow/MaxCompute |
名称 | 可自定义名称。 |
单击确认。
在新建节点区域运行以下SQL命令,将用户表、物料表、Label表从pai_online_project项目同步到自己的MaxCompute中。资源组选择已创建的独享资源组。
同步用户表:rec_sln_demo_user_table_preprocess_all_feature_v1
CREATE TABLE IF NOT EXISTS rec_sln_demo_user_table_preprocess_all_feature_v1
like pai_online_project.rec_sln_demo_user_table_preprocess_all_feature_v1
STORED AS ALIORC
LIFECYCLE 90;
INSERT OVERWRITE TABLE rec_sln_demo_user_table_preprocess_all_feature_v1
PARTITION(ds='${bdp.system.bizdate}')
SELECT * except(ds)
FROM pai_online_project.rec_sln_demo_user_table_preprocess_all_feature_v1
WHERE ds = '${bdp.system.bizdate}';
同步物料表:rec_sln_demo_item_table_preprocess_all_feature_v1
CREATE TABLE IF NOT EXISTS rec_sln_demo_item_table_preprocess_all_feature_v1
like pai_online_project.rec_sln_demo_item_table_preprocess_all_feature_v1
STORED AS ALIORC
LIFECYCLE 90;
INSERT OVERWRITE TABLE rec_sln_demo_item_table_preprocess_all_feature_v1
PARTITION(ds='${bdp.system.bizdate}')
SELECT * except(ds)
FROM pai_online_project.rec_sln_demo_item_table_preprocess_all_feature_v1
WHERE ds = '${bdp.system.bizdate}';
同步Label表:rec_sln_demo_label_table
CREATE TABLE IF NOT EXISTS rec_sln_demo_label_table
like pai_online_project.rec_sln_demo_label_table
STORED AS ALIORC
LIFECYCLE 90;
INSERT OVERWRITE TABLE rec_sln_demo_label_table
PARTITION(ds='${bdp.system.bizdate}')
SELECT * except(ds)
FROM pai_online_project.rec_sln_demo_label_table
WHERE ds = '${bdp.system.bizdate}';
其中,${bdp.system.bizdate}是参数,可以通过补数据执行获取以下三个分区的数据:
ds=20231022
ds=20231023
ds=20231024
对已同步的表进行补数据操作。
在DataWorks控制台界面,单击左侧导航栏的数据建模与开发 > 运维中心,在下拉框中选择对应工作空间后单击进入运维中心。
在左侧导航栏单击周期任务运维 > 周期任务,进入周期任务页面。
单击周期任务列表中的目标任务,打开该任务的DAG图。
右键单击目标节点,选择补数据 > 当前节点,选择相应的补数据模式。
业务日期选择2023-10-22至2023-10-24,单击确定。
完成上述操作后,您可以在自己的工作空间内查看用户表rec_sln_demo_user_table_preprocess_all_feature_v1、物料表rec_sln_demo_item_table_preprocess_all_feature_v1和Label表rec_sln_demo_label_table,后续的实践操作将以这三张表为例进行说明。
配置数据源
FeatureStore一般需要配置两个数据源:离线数据源(MaxCompute)和在线数据源(Hologres/GraphCompute/TableStore)。本文以MaxCompute和Hologres为例进行说明。
登录PAI控制台,在左侧导航栏单击数据准备 > 特征平台(FeatureStore)。
选择工作空间后,单击进入FeatureStore。
配置MaxCompute数据源。
单击管理数据源,进入管理数据源页面后单击新建数据源,在弹出的页面中配置MaxCompute数据源具体参数。
参数 | 取值建议 |
类型 | MaxCompute |
名称 | 可自定义名称。 |
MaxCompute项目名 | 选择已创建的MaxCompute项目。 |
请复制语句并单击立即前往,同步数据至Hologres,在DataWorks执行该语句后,即可完成授权。说明授权操作需要您的账号拥有admin权限,具体操作详情请参见通过命令管理用户权限或通过控制台(新版)管理用户权限。
完成后单击提交。
配置Hologres数据源。
单击管理数据源。进入管理数据源页面后单击新建数据源,在弹出的页面中配置Hologres数据源具体参数。
参数 | 取值建议 |
类型 | Hologres |
名称 | 可自定义名称。 |
实例ID | 选择已创建的Hologres实例名称。 |
数据库名称 | 选择已创建的实例数据库。 |
完成后单击提交。
对Hologres进行授权,具体操作详情请参见配置数据源。
二、创建并注册FeatureStore
您可以根据实际需求选择通过控制台或者SDK两种方式创建并注册FeatureStore。由于后续导出Training Set和同步数据都需要使用SDK,所以如果选择控制台操作的方式,完成控制台配置后,仍需要安装FeatureStore Python SDK。
方式一:控制台操作
方式二:FeatureStore Python SDK操作
创建FeatureStore项目
登录PAI控制台,在左侧导航栏单击数据准备 > 特征平台(FeatureStore)。
选择工作空间后,单击进入FeatureStore。
单击新建项目,在弹出的页面中配置项目参数。
参数 | 取值建议 |
名称 | 可自定义名称。本文以fs_demo为例进行说明。 |
描述 | 可自定义描述。 |
离线数据源(Offline Store) | 选择已创建的MaxCompute数据源。 |
在线数据源(Online Store) | 选择已创建的Hologres数据源。 |
单击提交,完成FeatureStore项目创建。
创建特征实体(Feature Entity)
在FeatureStore项目列表页面,单击项目名称,进入项目详情页面。
在特征实体页签中单击新建特征实体,在弹出的页面中配置user特征实体参数。
参数 | 取值建议 |
特征实体名称 | 可自定义名称。本文以user为例进行说明。 |
Join Id | user_id |
单击提交。
单击新建特征实体,在弹出的页面中配置item特征实体参数。
创建FeatureStore项目
登录PAI控制台,在左侧导航栏单击数据准备 > 特征平台(FeatureStore)。
选择工作空间后,单击进入FeatureStore。
单击新建项目,在弹出的页面中配置项目参数。
参数 | 取值建议 |
名称 | 可自定义名称。本文以fs_demo为例进行说明。 |
描述 | 可自定义描述。 |
离线数据源(Offline Store) | 选择已创建的MaxCompute数据源。 |
在线数据源(Online Store) | 选择已创建的Hologres数据源。 |
单击提交,完成FeatureStore项目创建。
创建特征实体(Feature Entity)
在FeatureStore项目列表页面,单击项目名称,进入项目详情页面。
在特征实体页签中单击新建特征实体,在弹出的页面中配置user特征实体参数。
参数 | 取值建议 |
特征实体名称 | 可自定义名称。本文以user为例进行说明。 |
Join Id | user_id |
单击提交。
单击新建特征实体,在弹出的页面中配置item特征实体参数。
参数 | 取值建议 |
特征实体名称 | 可自定义名称。本文以item为例进行说明。 |
Join Id | item_id |
单击提交,完成特征实体创建。
创建特征视图(Feature View)
在特征项目详情页面的特征视图页签,单击新建特征视图,在弹出的页面中配置user特征视图参数。
参数 | 取值建议 |
视图名称 | 可自定义名称。本文以user_table_preprocess_all_feature_v1为例进行说明。 |
类型 | 离线 |
写入方式 | 使用离线表 |
数据源 | 选择已创建的MaxCompute数据源。 |
特征表 | 选择已准备的用户表rec_sln_demo_user_table_preprocess_all_feature_v1。 |
特征字段 | 勾选user_id主键。 |
同步在线特征表 | 是 |
特征实体 | user |
特征生命周期(秒) | 保持默认。 |
单击提交。
单击新建特征视图,在弹出的页面中配置item特征视图。
参数 | 取值建议 |
视图名称 | 可自定义名称。本文以item_table_preprocess_all_feature_v1为例进行说明。 |
类型 | 离线 |
写入方式 | 使用离线表 |
数据源 | 选择已创建的MaxCompute数据源。 |
特征表 | 选择已准备的物料表rec_sln_demo_item_table_preprocess_all_feature_v1。 |
特征字段 | 勾选item_id主键。 |
同步在线特征表 | 是 |
特征实体 | item |
特征生命周期(秒) | 保持默认。 |
完成后单击提交,完成特征视图创建。
三、导出Training Set并训练模型
导出Training Set。
登录DataWorks控制台。
在左侧导航栏单击数据建模与开发 > 数据开发。
选择已创建的DataWorks工作空间后,单击进入数据开发。
鼠标悬停至新建,选择新建节点 > MaxCompute > PyODPS 3,在弹出的页面中配置节点参数。
参数 | 取值建议 |
引擎实例 | 选择已创建的MaxCompute引擎。 |
节点类型 | PyODPS 3 |
路径 | 业务流程/Workflow/MaxCompute |
名称 | 可自定义名称。 |
单击确认。
复制以下内容到脚本中。
单击右侧调度配置,在弹出的页面中配置调度参数。
参数 | 取值建议 | |
调度参数 | 参数名 | dt |
参数值 | $[yyyymmdd-1] | |
资源属性 | 调度资源组 | 选择已创建的独享资源组。 |
调度依赖 | 选择已创建的user表和item表。 |
节点配置并测试完成后,保存并提交节点配置。
执行补数据操作。操作详情请参见同步数据表。
(可选)查看导出任务。
在FeatureStore项目列表页面,单击项目名称,进入项目详情页面。
在特征实体页签中单击任务中心。
单击目标任务右侧的详情,即可查看该任务的基本信息、运行配置和任务日志。
训练模型EasyRec是一个开源的推荐系统框架,可以与FeatureStore无缝衔接,进行训练模型、导出模型、上线模型的操作。推荐您将fs_demo_fs_rank_v1_trainning_set表作为输入,使用EasyRec训练模型。
from feature_store_py.fs_client import FeatureStoreClient
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import LabelInput, MaxComputeDataSource, TrainingSetOutput
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import LabelInputConfig, PartitionConfig, FeatureViewConfig
from feature_store_py.fs_config import TrainSetOutputConfig, EASDeployConfig
import datetime
import sys
cur_day = args['dt']
print('cur_day = ', cur_day)
offset = datetime.timedelta(days=-1)
pre_day = (datetime.datetime.strptime(cur_day, "%Y%m%d") + offset).strftime('%Y%m%d')
print('pre_day = ', pre_day)
access_key_id = o.account.access_id
access_key_secret = o.account.secret_access_key
fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region='cn-beijing')
cur_project_name = 'fs_demo'
project = fs.get_project(cur_project_name)
label_partitions = PartitionConfig(name = 'ds', value = cur_day)
label_input_config = LabelInputConfig(partition_config=label_partitions)
user_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_user_config = FeatureViewConfig(name = 'user_table_preprocess_all_feature_v1',
partition_config=user_partitions)
item_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_item_config = FeatureViewConfig(name = 'item_table_preprocess_all_feature_v1',
partition_config=item_partitions)
feature_view_config_list = [feature_view_user_config, feature_view_item_config]
train_set_partitions = PartitionConfig(name = 'ds', value = cur_day)
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)
model_name = 'fs_rank_v1'
cur_model = project.get_model(model_name)
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task_summary = ", task.task_summary)
更多EasyRec相关的问题,请通过钉钉群(32260796)加入阿里云人工智能平台PAI咨询群联系我们。
四、上线模型
训练并导出模型后,可以进行部署和上线的操作。如果是自建推荐系统,FeatureStore提供FeatureStore Python/Go/Cpp/Java SDK,可以与各式各样的自建推荐系统进行衔接。您也可以通过钉钉群(32260796)联系我们咨询和商讨具体方案。如果是阿里云系列产品,可以和FeatureStore无缝衔接,快速搭建推荐系统上线。
本文以阿里云系列产品为例介绍上线模型操作。
步骤一:例行同步数据节点
上线前需要将数据同步节点例行,即例行将数据从离线数据源同步到在线数据源中,在线会实时地从在线数据源中读取。本示例需要将user特征表和item特征表提交例行,具体操作如下。
登录DataWorks控制台。
在左侧导航栏单击数据建模与开发 > 数据开发。
选择已创建的DataWorks工作空间后,单击进入数据开发。
同步例行user表。
鼠标悬停至新建,选择新建节点 > MaxCompute > PyODPS 3。
复制以下内容到脚本中,完成user_table_preprocess_all_feature_v1例行同步。
user_table_preprocess_all_feature_v1 同步例行(单击查看详情)
单击右侧调度配置,在弹出的页面中配置调度参数。
参数 | 取值建议 | |
调度参数 | 参数名 | dt |
参数值 | $[yyyymmdd-1] | |
资源属性 | 调度资源组 | 选择已创建的独享资源组。 |
调度依赖 | 选择已创建的user表。 |
节点配置并测试完成后,保存并提交节点配置。
执行补数据操作。操作详情请参见同步数据表。
同步例行item表。
鼠标悬停至新增,选择新建节点 > MaxCompute > PyODPS 3,在弹出的页面中配置节点参数。
单击确认。
复制以下内容到脚本中。
item_table_preprocess_all_feature_v1 同步例行(单击查看详情)
单击右侧调度配置,在弹出的页面中配置调度参数。
参数 | 取值建议 | |
调度参数 | 参数名 | dt |
参数值 | $[yyyymmdd-1] | |
资源属性 | 调度资源组 | 选择已创建的独享资源组。 |
调度依赖 | 选择已创建的item表。 |
节点配置并测试完成后,保存并提交节点配置。
执行补数据操作。操作详情请参见同步数据表。
同步完成后,可以在Hologres中查看最新同步的特征。
步骤二:创建与部署EAS模型服务
模型服务是为了接收推荐引擎的请求,根据请求对对应的item进行打分,返回分数。其中EasyRec Processor内部包含了FeatureStore Cpp SDK,可以实现低延时、高性能的取特征操作,EasyRec Processor从Feature Store Cpp SDK取完特征后,送入到模型进行推理,拿到打分后返回给推荐引擎。
部署模型服务的流程如下。
登录DataWorks控制台。
在左侧导航栏单击数据建模与开发 > 数据开发。
选择已创建的DataWorks工作空间后,单击进入数据开发。
鼠标悬停至新增,选择新建节点 > MaxCompute > PyODPS 3。
复制以下内容到脚本中。
单击右侧调度配置,在弹出的页面中配置调度参数。
参数 | 取值建议 | |
调度参数 | 参数名 | dt |
参数值 | $[yyyymmdd-1] | |
资源属性 | 调度资源组 | 选择已创建的独享资源组。 |
调度依赖 | 选择对应的训练任务和item_table_preprocess_all_feature_v1。 |
节点配置并测试完成后,运行查看部署情况。
部署完成后,注释掉第34行代码,并将37行取消注释,提交任务例行。
(可选)您可以在EAS模型在线服务页面的推理服务页签,查看已部署的服务。操作详情请参见服务部署:控制台。
(可选)当使用 Hologres 等只能通过指定 VPC 访问的数据源时,需要打通 EAS 和对应数据源的 VPC 网络。例如,当使用 Hologres 时,可以在 Hologres 的网络信息页面看到指定 VPC 对应的 VPC ID,交换机 ID 等信息,需要在 EAS 服务页面的右上角点击配置高速链接按钮,填入对应的 VPC ID 和交换机 ID,最后还需要填写安全组名称,可以选择已有安全组或者新建安全组,注意其放行的端口要符合 Hologres 链接的要求,一般 Hologres 链接使用80端口,因此选择的安全组需要开放80端口才能正常连接。都填写完成后,单击确定,待服务正常更新完成后即可使用。
步骤三:配置PAI-REC
PAI-REC是推荐引擎服务,其中集成了FeatureStore的Go SDK,可以与FeatureStore和EAS无缝衔接。
具体配置步骤如下。
配置FeatureStoreConfs。
RegionId:修改为产品实际所在区域,此处以cn-beijing为例。
ProjectName:已创建的FeatureStore项目名称,fs_demo。
配置FeatureConfs。
FeatureStoreName :和上一步FeatureStoreConfs中的设置pairec-fs保持一致。
FeatureStoreModelName :已创建的模型特征名称,fs_rank_v1。
FeatureStoreEntityName :已创建的特征实体名称,user。表示让PAI-REC中的FeatureStore Go SDK来取模型fs_rank_v1中特征实体为user侧的特征。
配置AlgoConfs。此配置代表告诉PAI-REC去连接哪一个EAS模型打分服务。
Name:和部署的EAS服务名称一致。
Url和Auth:EAS服务给出的信息,您可以在EAS模型在线服务页面单击服务名称,然后在服务详情页签单击查看调用信息获取URL和Token。更多详细配置可参见PAI-REC和EAS常见问题。
配置完成后,关于如何使用PAI-REC,请参见什么是推荐全链路深度定制开发平台PAI-REC。
import os
import json
config = {
"name": "fs_demo_v1",
"metadata": {
"cpu": 4,
"rpc.max_queue_size": 256,
"rpc.enable_jemalloc": 1,
"gateway": "default",
"memory": 16000
},
"model_path": f"oss://beijing0009/EasyRec/deploy/rec_sln_demo_dbmtl_v1/{args['ymd']}/export/final_with_fg", # 训练模型路径,可自定义。
"model_config": {
"access_key_id": f'{o.account.access_id}',
"access_key_secret": f'{o.account.secret_access_key}',
"region": "cn-beijing", # 请替换为PAI产品实际部署地域,本文以cn-beijing为例进行说明。
"fs_project": "fs_demo", # 请替换为您的FeatureStore项目名称,本文以fs_demo为例进行说明。
"fs_model": "fs_rank_v1", # 请替换为您的FeatureStore模型特征名称,本文以fs_rank_v1为例进行说明。
"fs_entity": "item",
"load_feature_from_offlinestore": True,
"steady_mode": True,
"period": 2880,
"outputs": "probs_is_click,y_ln_playtime,probs_is_praise",
"fg_mode": "tf",
},
"processor": "easyrec-1.9",
"processor_type": "cpp"
}
with open("echo.json", "w") as output_file:
json.dump(config, output_file)
# 第一次部署时运行下面这行
os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com create echo.json")
# 例行更新时运行下面这行
# os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com modify fs_demo_v1 -s echo.json")
"FeatureStoreConfs": {
"pairec-fs": {
"RegionId": "cn-beijing",
"AccessId": "${AccessKey}",
"AccessKey": "${AccessSecret}",
"ProjectName": "fs_demo"
}
},
"FeatureConfs": {
"recreation_rec": {
"AsynLoadFeature": true,
"FeatureLoadConfs": [
{
"FeatureDaoConf": {
"AdapterType": "featurestore",
"FeatureStoreName": "pairec-fs",
"FeatureKey": "user:uid",
"FeatureStoreModelName": "fs_rank_v1",
"FeatureStoreEntityName": "user",
"FeatureStore": "user"
}
}
]
}
},
"AlgoConfs": [
{
"Name": "fs_demo_v1",
"Type": "EAS",
"EasConf": {
"Processor": "EasyRec",
"Timeout": 300,
"ResponseFuncName": "easyrecMutValResponseFunc",
"Url": "eas_url_xxx",
"EndpointType": "DIRECT",
"Auth": "eas_token"
}
}
],