This tutorial shows you how to create and deploy an end-to-end recommendation system on PAI-FeatureStore using the Python SDK.
Prerequisites
Ensure that the following prerequisites are met.
Service | Actions
Platform for AI (PAI) |
MaxCompute |
FeatureDB |
DataWorks |
Step 1: Prepare data
Synchronize data tables
A recommendation scenario typically requires the following data tables: a user feature table, an item table, a label table, a sequence feature table, and a behavior table.
This hands-on tutorial uses simulated data tables prepared in the pai_online_project MaxCompute project. To use them, run SQL commands in DataWorks to synchronize the tables to your own MaxCompute project. Follow these steps:
Log on to the DataWorks console.
In the navigation pane on the left, click Data Development & O&M > Data Development.
Select your DataWorks workspace and click Enter Data Development.
Hover over New, and then choose New Node > MaxCompute > ODPS SQL. Configure the node parameters as follows:
Parameter | Recommended value
Engine instance | Select the MaxCompute engine that you created.
Node type | ODPS SQL
Path | Business Flow/Workflow/MaxCompute
Name | Enter a custom name.
Click Confirm.
In the node editor, run the following SQL commands to synchronize the data tables to your MaxCompute project. For Resource Group, select your exclusive resource group.
User table: rec_sln_demo_user_table_preprocess_all_feature_v1
Item table: rec_sln_demo_item_table_preprocess_all_feature_v1
Label table: rec_sln_demo_label_table
Sequence feature table: rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
Behavior table: rec_sln_demo_behavior_table_preprocess_v3
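If you need to write the copy statements yourself, the following minimal Python sketch generates one plausible form of them. It assumes that each demo table is partitioned by ds and uses a hypothetical date range of 20231018 to 20231024. `build_copy_sql` and `DEMO_TABLES` are illustrative names, not part of any SDK. Review the generated SQL before you run it in the ODPS SQL node.

```python
# Hypothetical helper: builds MaxCompute SQL that copies one demo table from
# pai_online_project into your own project. Assumes every table is partitioned
# by ds; the date range used below is an assumption, not a required value.
SOURCE_PROJECT = "pai_online_project"

DEMO_TABLES = [
    "rec_sln_demo_user_table_preprocess_all_feature_v1",
    "rec_sln_demo_item_table_preprocess_all_feature_v1",
    "rec_sln_demo_label_table",
    "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3",
    "rec_sln_demo_behavior_table_preprocess_v3",
]


def build_copy_sql(table: str, start_ds: str, end_ds: str) -> str:
    """Return SQL that creates `table` with the source schema and copies a ds range."""
    return (
        f"CREATE TABLE IF NOT EXISTS {table} LIKE {SOURCE_PROJECT}.{table};\n"
        # Dynamic partition insert: ds is taken from the selected rows.
        f"INSERT OVERWRITE TABLE {table} PARTITION (ds)\n"
        f"SELECT * FROM {SOURCE_PROJECT}.{table}\n"
        f"WHERE ds >= '{start_ds}' AND ds <= '{end_ds}';"
    )


if __name__ == "__main__":
    # Print the statements so that you can paste them into the node editor.
    for name in DEMO_TABLES:
        print(build_copy_sql(name, "20231018", "20231024"))
        print()
```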
You can now view the synchronized tables in your workspace. The rest of this tutorial uses these five tables.
Configure data sources
FeatureStore typically requires two types of data sources: an offline data source, such as MaxCompute, and an online data source, such as FeatureDB, Hologres, or Tablestore. This tutorial uses MaxCompute and FeatureDB as examples.
Log on to the PAI console. In the navigation pane on the left, choose Data Preparation > FeatureStore.
Select a workspace and click Enter FeatureStore.
Configure a MaxCompute data source.
On the Store tab, click Create Store, and configure the parameters for the MaxCompute data source on the page that appears.
Parameter | Recommended value
Type | MaxCompute
Name | Enter a custom name.
MaxCompute Project Name | Select the MaxCompute project that you created.
Click Submit.
Configure a FeatureDB data source.
If you have already created a FeatureDB data source, you can skip this step.
On the Store tab, click Create Store and configure the parameters for the FeatureDB data source in the dialog box.
Parameter | Recommended value
Type | FeatureDB (If this is your first time using FeatureDB, follow the on-screen instructions to activate it.)
Name | Custom names are not supported. The default name is feature_db.
Username | Set a username.
Password | Set a password.
VPC network high-speed connection (optional) | Lets you use the FeatureStore SDK to access FeatureDB directly over a private network from within a VPC, which improves data read/write performance and reduces access latency.
VPC | Select the VPC that contains your online FeatureStore services.
Zone and vSwitch | Select a zone and vSwitch. Make sure that the vSwitch is in the same zone as your online services. For high availability, we recommend that you select vSwitches in at least two zones.
Click Submit.
Step 2: Create resources with the FeatureStore Python SDK
Install the FeatureStore Python SDK, which requires a Python 3 environment. We recommend running all the following code in a Jupyter Notebook.
!pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-2.2.0-py3-none-any.whl
Import the required modules.
import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import OpenSchema, OpenField
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
FeatureStore project
You can create multiple projects in FeatureStore, and each project is an isolated namespace. For detailed instructions, see Configure a FeatureStore project. To run this notebook, you need an activated FeatureStore service with configured data sources. For more information about how to configure a data source, see Create a data source.
The offline_datasource_id parameter specifies the ID of the offline data source, and online_datasource_id specifies the ID of the online data source.
This example uses a project named fs_demo.
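The client code below leaves the AccessKey pair as empty strings to fill in. To avoid hard-coding credentials in a notebook, you might read them from environment variables instead. This is a minimal sketch: `load_access_key` is a hypothetical helper, and the variable names ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET follow a common Alibaba Cloud convention but are an assumption here.

```python
import os
from typing import Tuple


def load_access_key(id_var: str = "ALIBABA_CLOUD_ACCESS_KEY_ID",
                    secret_var: str = "ALIBABA_CLOUD_ACCESS_KEY_SECRET") -> Tuple[str, str]:
    """Read the AccessKey pair from environment variables (names are assumptions)."""
    access_id = os.environ.get(id_var, "")
    access_ak = os.environ.get(secret_var, "")
    if not access_id or not access_ak:
        raise RuntimeError(f"Set {id_var} and {secret_var} before creating the client.")
    return access_id, access_ak
```

You would then call `access_id, access_ak = load_access_key()` and pass the values to `FeatureStoreClient` as shown in the next snippet.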
access_id = ''  # Enter your AccessKey ID.
access_ak = ''  # Enter your AccessKey secret.
region = 'cn-beijing'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
cur_project_name = "fs_demo"
project = fs.get_project(cur_project_name)
if project is None:
    raise ValueError(f"The project '{cur_project_name}' does not exist. You must create it first.")
Run the following code to retrieve the current project and print its information.
project = fs.get_project(cur_project_name)
print(project)
Feature Entity
A feature entity describes a collection of related features. Multiple feature views can be associated with a single feature entity. Each entity has a join_id, which joins features from different feature views. Each feature view has a primary key for retrieving its feature data, and the name of the primary key can be different from the join_id.
In recommendation systems, features are typically associated with either a user or an item entity. This example demonstrates how to create both a user entity and an item entity.
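The role of join_id can be illustrated without the SDK. The following conceptual sketch (plain Python, not FeatureStore code) shows two hypothetical feature views of the user entity whose primary keys are named uid and user_key. Each view is re-keyed to the entity's join_id, user_id, and the views are then merged on that value. The data and helper names are made up for illustration.

```python
from typing import Dict, List

# Conceptual illustration only: two feature views of the "user" entity whose
# primary key columns have different names (hypothetical data).
user_profile_rows = [{"uid": "u1", "age": 30}, {"uid": "u2", "age": 25}]
user_stats_rows = [{"user_key": "u1", "click_cnt_7d": 12}]


def index_by_join_id(rows: List[dict], primary_key: str, join_id: str) -> Dict[str, dict]:
    """Re-key a feature view's rows from its own primary key to the entity join_id."""
    indexed = {}
    for row in rows:
        features = {k: v for k, v in row.items() if k != primary_key}
        indexed[row[primary_key]] = {join_id: row[primary_key], **features}
    return indexed


def join_on_entity(*views: Dict[str, dict]) -> Dict[str, dict]:
    """Merge feature views that belong to the same entity by their join_id value."""
    merged: Dict[str, dict] = {}
    for view in views:
        for key, features in view.items():
            merged.setdefault(key, {}).update(features)
    return merged


joined = join_on_entity(
    index_by_join_id(user_profile_rows, "uid", "user_id"),
    index_by_join_id(user_stats_rows, "user_key", "user_id"),
)
print(joined["u1"])  # {'user_id': 'u1', 'age': 30, 'click_cnt_7d': 12}
```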
Create user entity
user_entity_name = "user"
user_join_id = 'user_id'
user_entity = project.get_entity(user_entity_name)
if user_entity is None:
    user_entity = project.create_entity(name=user_entity_name, join_id=user_join_id)
user_entity.print_summary()
Create item entity
item_entity_name = "item"
join_id = 'item_id'
item_entity = project.get_entity(item_entity_name)
if item_entity is None:
    item_entity = project.create_entity(name=item_entity_name, join_id=join_id)
item_entity.print_summary()
Feature View
A feature view is a core object in FeatureStore that ingests external data. It defines where the data comes from (the data source), any preprocessing or transformation operations, the data structure (the feature schema, including feature names and types), and the storage location (online store and/or offline store). A feature view also manages metadata, such as the primary key, event time, partition key, associated feature entity, and TTL (Time to Live). A default TTL of -1 indicates that the feature is permanently valid. A positive TTL value means that online queries will retrieve only the latest feature data within the specified time window.
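The TTL rule above can be modeled in a few lines. This is a conceptual sketch, not FeatureStore's implementation: it assumes TTL is measured in seconds and that each feature record carries an event time. With ttl=-1 nothing expires; with a positive ttl, only records inside the window are eligible, and the newest one wins.

```python
import time
from typing import List, Optional, Tuple

# Conceptual model of the TTL rule; not FeatureStore code.
# Each record is (event_time_in_seconds, feature_value). TTL in seconds is an assumption.


def latest_within_ttl(records: List[Tuple[int, str]], ttl: int,
                      now: Optional[int] = None) -> Optional[str]:
    """Return the newest feature value, ignoring records older than `ttl` seconds.

    ttl == -1 means that features never expire.
    """
    now = int(time.time()) if now is None else now
    valid = [r for r in records if ttl == -1 or now - r[0] <= ttl]
    if not valid:
        return None
    return max(valid, key=lambda r: r[0])[1]


history = [(1_000, "price=10"), (5_000, "price=12")]
print(latest_within_ttl(history, ttl=-1, now=6_000))     # price=12
print(latest_within_ttl(history, ttl=500, now=6_000))    # None
print(latest_within_ttl(history, ttl=2_000, now=6_000))  # price=12
```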
Feature views are available in three types: Batch FeatureView (for offline or T-1 day features), Stream FeatureView (for real-time features), and Sequence FeatureView (for sequence features).
Batch FeatureView
A Batch FeatureView injects offline data into the FeatureStore offline store and can optionally synchronize it to the online store to support real-time queries. Use this feature view for offline or T-1 day features.
Register the user offline feature table
Register the rec_sln_demo_user_table_preprocess_all_feature_v1 table in FeatureStore.
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_table_name = "rec_sln_demo_user_table_preprocess_all_feature_v1"
user_feature_view = project.get_feature_view(user_feature_view_name)
if user_feature_view is None:
    ds = MaxComputeDataSource(project.offline_datasource_id, user_table_name)
    user_feature_view = project.create_batch_feature_view(name=user_feature_view_name, datasource=ds, online=True, entity=user_entity_name, primary_key='user_id', register=True)
print(user_feature_view)
Synchronize data in the 20231023 partition of the rec_sln_demo_user_table_preprocess_all_feature_v1 table from the offline data source to the online data source.
user_task = user_feature_view.publish_table({'ds': '20231023'})
user_task.wait()
Check the task status.
user_task.print_summary()
Register the item offline feature table
Register the rec_sln_demo_item_table_preprocess_all_feature_v1 table in FeatureStore.
item_feature_view_name = "item_table_preprocess_all_feature_v1"
item_table_name = "rec_sln_demo_item_table_preprocess_all_feature_v1"
item_feature_view = project.get_feature_view(item_feature_view_name)
if item_feature_view is None:
    ds = MaxComputeDataSource(project.offline_datasource_id, item_table_name)
    item_feature_view = project.create_batch_feature_view(name=item_feature_view_name, datasource=ds, online=True, entity=item_entity_name, primary_key='item_id', register=True)
print(item_feature_view)
Synchronize data in the 20231023 partition of the rec_sln_demo_item_table_preprocess_all_feature_v1 table from the offline data source to the online data source.
item_task = item_feature_view.publish_table({'ds': '20231023'})
item_task.wait()
Check the task status.
item_task.print_summary()
Sequence FeatureView
A Sequence FeatureView supports writing sequence features offline and reading real-time sequence features online. In recommendation scenarios, an offline sequence feature table (F1) is initially generated through simulation and can later be replaced by online logs. During online queries for real-time sequences, data is retrieved from two online behavior tables: a T-1 day behavior table (B1) and a current-day (T-day) real-time behavior table (B2). The B2 table contains features updated by real-time computation. FeatureStore queries both B1 and B2, constructs the user's feature sequence, and then combines it with other features for model scoring.
The online T-1 day behavior table (B1) is typically synchronized from an offline T-1 day behavior table (A1), and FeatureStore automatically handles operations such as deduplication. You currently need to write data to the current-day (T-day) online behavior table (B2) yourself, by using an API or another Alibaba Cloud product such as Flink.
Therefore, when you register a Sequence FeatureView, FeatureStore manages four tables on your behalf: the offline sequence table (F1), the offline T-1 day behavior table (A1), the online T-1 day behavior table (B1), and the online T-day behavior table (B2).
During registration, you only provide the offline sequence table (F1) and the offline T-1 day behavior table (A1). FeatureStore creates the online tables and handles synchronization and deduplication.
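The online query flow described above can be sketched conceptually. The following plain-Python example merges one user's rows from B1 and B2, keeps only the configured event (seq_event), orders by event time with the newest first, deduplicates according to the two deduplication_method options used in the registration code, and truncates to seq_len. It is an illustration under those assumptions, not FeatureStore's actual implementation, and the row layout is hypothetical.

```python
from typing import Dict, List

# Conceptual sketch: assemble one user's online item_id sequence from the
# T-1 table (B1) and the T-day table (B2). Row layout is hypothetical.


def build_sequence(b1_rows: List[Dict], b2_rows: List[Dict], user_id: str,
                   seq_event: str, seq_len: int, deduplication_method: int = 1) -> List[str]:
    if deduplication_method == 1:
        key_fields = ("user_id", "item_id", "event")
    elif deduplication_method == 2:
        key_fields = ("user_id", "item_id", "event", "event_time")
    else:
        raise ValueError("deduplication_method must be 1 or 2")

    rows = [r for r in b1_rows + b2_rows
            if r["user_id"] == user_id and r["event"] == seq_event]
    # Newest first, so deduplication keeps the most recent occurrence.
    rows.sort(key=lambda r: r["event_time"], reverse=True)

    seen = set()
    sequence = []
    for r in rows:
        key = tuple(r[f] for f in key_fields)
        if key in seen:
            continue
        seen.add(key)
        sequence.append(r["item_id"])
    return sequence[:seq_len]  # Sequences longer than seq_len are truncated.


b1 = [{"user_id": "u1", "item_id": "i1", "event": "click", "event_time": 100},
      {"user_id": "u1", "item_id": "i2", "event": "click", "event_time": 200}]
b2 = [{"user_id": "u1", "item_id": "i1", "event": "click", "event_time": 300},
      {"user_id": "u1", "item_id": "i3", "event": "buy", "event_time": 310}]
print(build_sequence(b1, b2, "u1", seq_event="click", seq_len=50))  # ['i1', 'i2']
```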
Register the Sequence FeatureView.
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_view = project.get_feature_view(seq_feature_view_name)
if seq_feature_view is None:
    seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3"
    behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3'
    ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name)
    event_time = 'event_unix_time'  # Field name for event time in the behavior table.
    item_id = 'item_id'  # Field name for item_id in the behavior table.
    event = 'event'  # Field name for the event in the behavior table.
    # deduplication_method = 1: Deduplicates by ['user_id', 'item_id', 'event'].
    # deduplication_method = 2: Deduplicates by ['user_id', 'item_id', 'event', 'event_time'].
    sequence_feature_config_list = [SequenceFeatureConfig(offline_seq_name='click_seq_50_seq', seq_event='click', online_seq_name='click_seq_50', seq_len=50)]
    # offline_seq_name: Column name of the sequence feature in the offline sequence table.
    # seq_event: The event value (for example, 'click') used to filter behaviors.
    # online_seq_name: The name under which the online Go SDK returns the user's item_id sequence.
    # seq_len: The sequence length. Sequences longer than this value are truncated.
    seq_table_config = SequenceTableConfig(table_name=seq_table_name, primary_key='user_id', event_time='event_unix_time')
    seq_feature_view = project.create_sequence_feature_view(seq_feature_view_name, datasource=ds, event_time=event_time, item_id=item_id, event=event, deduplication_method=1, sequence_feature_config=sequence_feature_config_list, sequence_table_config=seq_table_config, entity=user_entity_name)
# seq_feature_view.print_summary()
print(seq_feature_view)
Synchronize data in the 20231023 partition of the rec_sln_demo_behavior_table_preprocess_v3 table from the offline data source to the online data source. During synchronization, the system automatically checks whether data from the previous N days exists and backfills any missing data. You can specify N by using the days_to_load parameter, which defaults to 30. We recommend that you use the default value in most cases.
seq_task = seq_feature_view.publish_table({'ds': '20231023'}, days_to_load=30)
seq_task.wait()
Check the task status.
seq_task.print_summary()
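The days_to_load backfill check can be pictured as finding which daily partitions in a look-back window are absent. The following sketch is conceptual only: `missing_partitions` is a hypothetical helper, and whether FeatureStore's window includes the published partition itself is an assumption made here for illustration.

```python
from datetime import datetime, timedelta
from typing import Iterable, List

# Conceptual sketch of the backfill check: which of the last `days_to_load`
# daily partitions (ending at ds, inclusive by assumption) are missing?


def missing_partitions(ds: str, existing: Iterable[str], days_to_load: int = 30) -> List[str]:
    end = datetime.strptime(ds, "%Y%m%d")
    present = set(existing)
    window = [(end - timedelta(days=i)).strftime("%Y%m%d") for i in range(days_to_load)]
    return sorted(p for p in window if p not in present)


print(missing_partitions("20231023", ["20231023", "20231021"], days_to_load=3))
# ['20231022']
```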
Stream FeatureView
A Stream FeatureView writes data directly to the online store and simultaneously syncs it to the offline store. Use it for scenarios that require real-time feature updates, such as updating item prices or sales volumes.
Register the label table
label_table_name = 'rec_sln_demo_label_table'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
    label_table = project.create_label_table(datasource=ds, event_time='event_unix_time')
print(label_table)
Retrieve online features
You can retrieve features from the online store for purposes such as debugging offline-online consistency and data analysis. This feature is currently optimized for Hologres.
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_view = project.get_feature_view(user_feature_view_name)
ret_features_1 = user_feature_view.list_feature_view_online_features(join_ids=['169898460', '148811946'])
print("ret_features = ", ret_features_1)
Training set
To train a model, first construct a training set that combines label data with feature data. You provide the label table and specify which features to retrieve, and FeatureStore joins the features to the labels by primary key. If an event time is defined, FeatureStore performs a point-in-time join, so that each label row is matched only with feature values that were available at that row's event time.
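To make the point-in-time semantics concrete, here is a minimal pure-Python sketch. It is not FeatureStore's implementation: `point_in_time_join` is a hypothetical helper, the feature history is made-up data, and including snapshots whose time equals the label time is a design choice of this sketch. For each label row, it picks the latest feature snapshot at or before the label's event_unix_time, which prevents feature values from the future leaking into training.

```python
import bisect
from typing import Dict, List, Optional, Tuple

# Minimal point-in-time join sketch. feature_history maps a primary key to a
# list of (event_time, features) pairs sorted by event_time.


def point_in_time_join(labels: List[Dict], feature_history: Dict[str, List[Tuple[int, Dict]]],
                       key: str, event_time: str = "event_unix_time") -> List[Dict]:
    joined = []
    for label in labels:
        history = feature_history.get(label[key], [])
        times = [t for t, _ in history]
        # Index of the last snapshot at or before the label's event time.
        idx = bisect.bisect_right(times, label[event_time]) - 1
        features: Optional[Dict] = history[idx][1] if idx >= 0 else None
        joined.append({**label, **(features or {})})
    return joined


history = {"u1": [(100, {"age": 30}), (200, {"age": 31})]}
labels = [{"user_id": "u1", "event_unix_time": 150, "is_click": 1},
          {"user_id": "u1", "event_unix_time": 250, "is_click": 0},
          {"user_id": "u1", "event_unix_time": 50, "is_click": 0}]
for row in point_in_time_join(labels, history, key="user_id"):
    print(row)
```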
# Specify the label table
label_table_name = 'rec_sln_demo_label_table'
output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_selector = FeatureSelector(user_feature_view_name, '*') # '*' selects all features.
item_feature_view_name = "item_table_preprocess_all_feature_v1"
item_feature_selector = FeatureSelector(item_feature_view_name, '*')
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_selector = FeatureSelector(seq_feature_view_name, ['click_seq_50_seq'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output= train_set_output, feature_selectors=[user_feature_selector, item_feature_selector, seq_feature_selector])
print("train_set = ", train_set)
Model features
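A model feature records which features a model uses, based on the training set definition. After you train the model and deploy it as a service, online services can look up the model feature by name to retrieve the same features for prediction. The training set definition comes from the train_set object created in the previous step.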
model_name = "fs_rank_v2"
cur_model = project.get_model(model_name)
if cur_model is None:
    cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)
Step 3: Export training set and train model
To train the model, you must export a training set.
Export training set
Specify the partition and event_time for the label table and each feature view.
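The export code that follows pairs labels from cur_day with user and item features from pre_day, the day before (T-1). If you schedule this export, you might derive pre_day from cur_day instead of hard-coding both values. `previous_day` is a hypothetical helper shown as a sketch.

```python
from datetime import datetime, timedelta


def previous_day(ds: str) -> str:
    """Return the partition value one day before `ds` (format yyyymmdd)."""
    return (datetime.strptime(ds, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")


cur_day = '20231024'
pre_day = previous_day(cur_day)  # '20231023'
```

Using datetime arithmetic handles month, year, and leap-day boundaries that string manipulation would get wrong.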
cur_day = '20231024'
pre_day = '20231023'
label_partitions = PartitionConfig(name = 'ds', value = cur_day)
label_input_config = LabelInputConfig(partition_config=label_partitions)
user_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_user_config = FeatureViewConfig(name = 'user_table_preprocess_all_feature_v1',
partition_config=user_partitions)
item_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_item_config = FeatureViewConfig(name = 'item_table_preprocess_all_feature_v1',
partition_config=item_partitions)
seq_partitions = PartitionConfig(name = 'ds', value = cur_day)
feature_view_seq_config = FeatureViewConfig(name = 'wide_seq_feature_v3', partition_config=seq_partitions, event_time='event_unix_time', equal=True)
feature_view_config_list = [feature_view_user_config, feature_view_item_config, feature_view_seq_config]
train_set_partitions = PartitionConfig(name = 'ds', value = cur_day)
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)
model_name = 'fs_rank_v2'
cur_model = project.get_model(model_name)
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task_summary = ", task.task_summary)
Train model
EasyRec is an open-source recommendation algorithm framework that integrates with PAI-FeatureStore, so you can use it to train, export, and deploy models. Use the fs_demo_fs_rank_v2_training_set table as input to train your model with EasyRec.
For the EasyRec source code, see EasyRec.
For the EasyRec documentation, see Introduction to EasyRec.
For documentation about training, see Training with EasyRec.
For further questions about EasyRec, contact us in the Machine Learning Platform for AI (PAI) support group on DingTalk (ID: 32260796).
Step 4: Deploy the model
After you train and export a model, you can deploy it for online serving. If you have a self-built recommendation system, FeatureStore provides SDKs for Python, Go, C++, and Java to simplify integration. You can also contact us through the DingTalk group (ID: 32260796) to discuss specific solutions. If you use Alibaba Cloud services, you can seamlessly integrate them with FeatureStore to quickly build and deploy a recommendation system.
This topic uses Alibaba Cloud services as an example to show how to deploy the model.
Schedule data synchronization jobs
Before you deploy the model, schedule data synchronization jobs. These jobs periodically publish data from an offline data source to an online data source to keep features current.
Log on to the DataWorks console.
In the left-side navigation pane, choose Data Development & O&M > Data Development.
Select your DataWorks workspace and click Enter Data Development.
Schedule the user table synchronization.
Hover over New and choose New Node > MaxCompute > PyODPS 3.
In the dialog box that appears, configure the node parameters and click Confirm.
Copy the following script into the editor to schedule synchronization for user_table_preprocess_all_feature_v1.
In the right-side pane, click Scheduling configuration and configure the scheduling parameters.
Parameter | Value
Scheduling parameters | Parameter name: dt; Parameter value: $[yyyymmdd-1]
Resource properties | Resource group for scheduling: Select the exclusive resource group that you created.
Scheduling dependencies | Select the user table that you created.
After configuring and testing the node, save and submit it.
Run a data backfill. For more information, see synchronize data tables.
Schedule the item table synchronization.
Hover over New and choose New Node > MaxCompute > PyODPS 3.
In the dialog box that appears, configure the node parameters and click Confirm.
Copy the following script to the editor.
In the right-side pane, click Scheduling configuration and configure the scheduling parameters.
Parameter | Value
Scheduling parameters | Parameter name: dt; Parameter value: $[yyyymmdd-1]
Resource properties | Resource group for scheduling: Select the exclusive resource group that you created.
Scheduling dependencies | Select the item table that you created.
After configuring and testing the node, save and submit it.
Run a data backfill. For more information, see synchronize data tables.
Schedule the real-time sequence behavior table synchronization.
Hover over New and choose New Node > MaxCompute > PyODPS 3.
In the dialog box that appears, configure the node parameters and click Confirm.
Copy the following script to the editor.
In the right-side pane, click Scheduling configuration and configure the scheduling parameters.
Parameter | Value
Scheduling parameters | Parameter name: dt; Parameter value: $[yyyymmdd-1]
Resource properties | Resource group for scheduling: Select the exclusive resource group that you created.
Scheduling dependencies | Select the behavior table that you created.
After configuring and testing the node, save and submit it.
Run a data backfill. For more information, see synchronize data tables.
After the synchronization is complete, you can view the latest features in the online data source (FeatureDB in this tutorial).
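Each PyODPS 3 node above publishes one daily partition of a feature view to the online store, and the scheduling parameter dt ($[yyyymmdd-1]) supplies the previous day's date. The following is a minimal sketch of such a node script, assuming that the FeatureStore Python SDK is available in the resource group. `publish_partition` is a hypothetical helper; the calls it makes (get_feature_view, publish_table, wait, print_summary) are the ones used earlier in this tutorial. The commented lines show how the node might call it, using the DataWorks `args` dictionary and the `o` entry object in the same way as the EAS deployment script.

```python
# Sketch of a PyODPS 3 node body; publish_partition is a hypothetical helper name.


def publish_partition(project, feature_view_name: str, ds: str, **publish_kwargs):
    """Publish one ds partition of a feature view from offline to online storage."""
    feature_view = project.get_feature_view(feature_view_name)
    if feature_view is None:
        raise ValueError(f"Feature view '{feature_view_name}' does not exist.")
    task = feature_view.publish_table({'ds': ds}, **publish_kwargs)
    task.wait()
    task.print_summary()
    return task


# In the DataWorks node, for example:
# from feature_store_py.fs_client import FeatureStoreClient
# fs = FeatureStoreClient(access_key_id=o.account.access_id,
#                         access_key_secret=o.account.secret_access_key,
#                         region='cn-beijing')
# project = fs.get_project('fs_demo')
# publish_partition(project, 'user_table_preprocess_all_feature_v1', args['dt'])
# publish_partition(project, 'item_table_preprocess_all_feature_v1', args['dt'])
# publish_partition(project, 'wide_seq_feature_v3', args['dt'], days_to_load=30)
```

Passing the project object in, rather than creating the client inside the helper, keeps one node script reusable for the user, item, and sequence feature views.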
Create and deploy EAS model service
The model service receives requests from the recommendation engine, scores a candidate item set based on the request, and returns the scores. The EasyRec processor includes the FeatureStore C++ SDK for low-latency, high-performance feature retrieval. The processor uses the SDK to retrieve features, sends them to the model for inference, and returns the resulting scores to the recommendation engine.
Follow these steps to deploy the model service.
Log on to the DataWorks console.
In the left-side navigation pane, choose Data Development & O&M > Data Development.
Select your DataWorks workspace and click Enter Data Development.
Hover over New and choose New Node > MaxCompute > PyODPS 3.
In the dialog box that appears, configure the node parameters and click Confirm.
Copy the following script to the editor.
import os
import json

config = {
    "name": "fs_demo_v1",
    "metadata": {
        "cpu": 4,
        "rpc.max_queue_size": 256,
        "rpc.enable_jemalloc": 1,
        "gateway": "default",
        "memory": 16000
    },
    # args['dt'] is the scheduling parameter dt configured for this node.
    "model_path": f"oss://beijing0009/EasyRec/deploy/rec_sln_demo_dbmtl_v1/{args['dt']}/export/final_with_fg",
    "model_config": {
        "access_key_id": f'{o.account.access_id}',
        "access_key_secret": f'{o.account.secret_access_key}',
        "region": "cn-beijing",
        "fs_project": "fs_demo",
        "fs_model": "fs_rank_v2",
        "fs_entity": "item",
        "load_feature_from_offlinestore": True,
        "steady_mode": True,
        "period": 2880,
        "outputs": "probs_is_click,y_ln_playtime,probs_is_praise",
        "fg_mode": "tf"
    },
    "processor": "easyrec-1.8",
    "processor_type": "cpp"
}

with open("echo.json", "w") as output_file:
    json.dump(config, output_file)

# Run this line for the first deployment only.
os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com create echo.json")
# For scheduled updates, run the following line.
# os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com modify fs_demo_v1 -s echo.json")
In the right-side pane, click Scheduling configuration and configure the scheduling parameters.
Parameter | Value
Scheduling parameters | Parameter name: dt; Parameter value: $[yyyymmdd-1]
Resource properties | Resource group for scheduling: Select the exclusive resource group that you created.
Scheduling dependencies | Select the corresponding training task and item_table_preprocess_all_feature_v1.
After you configure and test the node, run it to verify the deployment.
After the deployment is complete, comment out the eascmd create command (used for the first deployment only), uncomment the eascmd modify command, and then submit the node for scheduled execution.
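Instead of toggling comments between the create and modify commands, you could build the eascmd arguments from a flag. The following is a sketch under that assumption: `build_eascmd_args` is a hypothetical helper, and the command path, endpoint, and subcommands are copied from the deployment script.

```python
from typing import List

EASCMD = "/home/admin/usertools/tools/eascmd"
ENDPOINT = "pai-eas.cn-beijing.aliyuncs.com"


def build_eascmd_args(access_id: str, access_secret: str, service_name: str,
                      config_path: str = "echo.json", first_deployment: bool = False) -> List[str]:
    """Return eascmd arguments to create the service (first run) or modify it (later runs)."""
    base = [EASCMD, "-i", access_id, "-k", access_secret, "-e", ENDPOINT]
    if first_deployment:
        return base + ["create", config_path]
    return base + ["modify", service_name, "-s", config_path]
```

In the node, you might run `subprocess.run(build_eascmd_args(o.account.access_id, o.account.secret_access_key, "fs_demo_v1"), check=True)`. Passing a list avoids shell quoting issues, and `check=True` raises an error on a non-zero exit code, which `os.system` only returns silently.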
(Optional) You can view the deployed service on the Inference Service tab of the Elastic Algorithm Service (EAS) page. For more information, see Deploy a custom service.
Configure PAI-Rec
PAI-Rec is a recommendation engine service that includes the FeatureStore Go SDK and seamlessly integrates with both FeatureStore and EAS.
To configure PAI-Rec, follow these steps:
Configure FeatureStoreConfs.
RegionId: Set this parameter to the region where your service is deployed. This example uses cn-beijing.
ProjectName: The name of the FeatureStore project that you created, which is fs_demo in this example.
"FeatureStoreConfs": {
    "pairec-fs": {
        "RegionId": "cn-beijing",
        "AccessId": "${AccessKey}",
        "AccessKey": "${AccessSecret}",
        "ProjectName": "fs_demo"
    }
},
Configure FeatureConfs.
FeatureStoreName: Make sure that this value matches pairec-fs in the FeatureStoreConfs configuration.
FeatureStoreModelName: Set this parameter to the name of the model feature, which is fs_rank_v2 in this example.
FeatureStoreEntityName: Set this parameter to the name of the feature entity, which is user in this example. This instructs the PAI-Rec engine to use the FeatureStore Go SDK to retrieve features for the user entity from the fs_rank_v2 model feature.
"FeatureConfs": {
    "recreation_rec": {
        "AsynLoadFeature": true,
        "FeatureLoadConfs": [
            {
                "FeatureDaoConf": {
                    "AdapterType": "featurestore",
                    "FeatureStoreName": "pairec-fs",
                    "FeatureKey": "user:uid",
                    "FeatureStoreModelName": "fs_rank_v2",
                    "FeatureStoreEntityName": "user",
                    "FeatureStore": "user"
                }
            }
        ]
    }
},
Configure AlgoConfs.
This configuration specifies which EAS model scoring service PAI-Rec calls.
Name: Make sure that this value matches the name of the deployed EAS service.
Url and Auth: These values are provided by the EAS service. In the EAS console, click the service name, and then on the Overview tab, click View Endpoint Information to get the URL and token. For more configuration details, see PAI-Rec and EAS FAQ.
"AlgoConfs": [
    {
        "Name": "fs_demo_v1",
        "Type": "EAS",
        "EasConf": {
            "Processor": "EasyRec",
            "Timeout": 300,
            "ResponseFuncName": "easyrecMutValResponseFunc",
            "Url": "eas_url_xxx",
            "EndpointType": "DIRECT",
            "Auth": "eas_token"
        }
    }
],
After you complete the configuration, see What is PAI-Rec to learn how to use it.
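Because several PAI-Rec settings must match names defined elsewhere (the FeatureStoreConfs key, the model feature name, and the EAS service name), a quick consistency check can catch typos before you deploy. This is a hypothetical helper sketched under the assumption that your full PAI-Rec configuration is a JSON object containing the three fragments above; it checks only those cross-references, not whether the values are otherwise valid.

```python
from typing import List


def check_pairec_config(config: dict, eas_service_name: str, fs_model_name: str) -> List[str]:
    """Return a list of mismatches between the PAI-Rec fragments (empty if consistent)."""
    problems = []
    store_names = set(config.get("FeatureStoreConfs", {}))
    for scene, scene_conf in config.get("FeatureConfs", {}).items():
        for load_conf in scene_conf.get("FeatureLoadConfs", []):
            dao = load_conf.get("FeatureDaoConf", {})
            if dao.get("AdapterType") != "featurestore":
                continue
            if dao.get("FeatureStoreName") not in store_names:
                problems.append(f"{scene}: unknown FeatureStoreName {dao.get('FeatureStoreName')!r}")
            if dao.get("FeatureStoreModelName") != fs_model_name:
                problems.append(f"{scene}: FeatureStoreModelName is not {fs_model_name!r}")
    algo_names = [algo.get("Name") for algo in config.get("AlgoConfs", [])]
    if eas_service_name not in algo_names:
        problems.append(f"AlgoConfs has no entry named {eas_service_name!r}")
    return problems
```

For example, load your configuration file with `json.load` and call `check_pairec_config(config, "fs_demo_v1", "fs_rank_v2")`; an empty list means the names agree.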