Use the FeatureStore Python SDK to register feature views, write feature data to offline and online stores, and generate training datasets for model training.
This guide uses the open-source Moviedata dataset as a working example. The Movie, User, and Rating tables map to the item, user, and label tables in a typical recommendation pipeline.
In this guide, you will:
Install the SDK and configure your project
Define feature entities and feature views
Write data to the offline store and publish it to the online store
Retrieve online features and define a feature selector
Create a training dataset and export it for model training
Prerequisites
Before you begin, make sure you have:
Obtained the AccessKey ID and AccessKey Secret for your Alibaba Cloud account
Configured data sources in FeatureStore (both offline and online)
Run the code in this guide on a DSW instance for the best experience.
Step 1: Install the SDK and configure your project
Install the SDK
Run the following command in a Python 3 environment:
pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.3.1-py3-none-any.whl
Set environment variables
Store your credentials as environment variables to avoid hardcoding sensitive information. In your DSW Notebook, click Terminal from the top menu bar, then run:
echo "export AccessKeyID='<your-access-key-id>'" >> ~/.bashrc
echo "export AccessKeySecret='<your-access-key-secret>'" >> ~/.bashrc
source ~/.bashrc
Replace <your-access-key-id> and <your-access-key-secret> with your actual AccessKey ID and AccessKey Secret.
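Before constructing the client, it can help to fail fast when either variable is missing. The following is a minimal sketch, assuming the variable names `AccessKeyID` and `AccessKeySecret` used in the commands above. The helper `load_credentials` is a hypothetical name for illustration and is not part of the SDK.

```python
import os


def load_credentials(env=None):
    """Return (access_key_id, access_key_secret) or raise if either is unset.

    `load_credentials` is an illustrative helper, not an SDK function.
    """
    env = os.environ if env is None else env
    missing = [name for name in ("AccessKeyID", "AccessKeySecret") if not env.get(name)]
    if missing:
        raise EnvironmentError("Missing environment variables: " + ", ".join(missing))
    return env["AccessKeyID"], env["AccessKeySecret"]


if __name__ == "__main__":
    try:
        access_id, access_ak = load_credentials()
        print("Credentials loaded.")
    except EnvironmentError as err:
        print(err)
```

Passing a plain dictionary as `env` makes the check easy to exercise without touching the real environment.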
Import modules
import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import OpenSchema, OpenField
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import EASDeployConfig, LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
Connect to a FeatureStore project
Initialize the client and connect to your project. You can create multiple independent projects in FeatureStore. This guide uses a project named fs_movie.
# Load credentials from environment variables
access_id = os.getenv("AccessKeyID")
access_ak = os.getenv("AccessKeySecret")
# Set the region where your FeatureStore is activated
region = 'cn-hangzhou'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
cur_project_name = "fs_movie"
project = fs.get_project(cur_project_name)
if project is None:
    raise ValueError("Project not found. Create the project first: fs_movie")
To verify the connection, print the project details:
project = fs.get_project(cur_project_name)
print(project)
Step 2: Define feature entities
A feature entity groups semantically related features. Each entity has a join ID that links features across multiple feature views. Feature views can use different primary key names, but they join on this shared entity ID.
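The role of the join ID can be sketched in plain Python, independent of the SDK. In this toy example, two "feature views" use different primary key column names but belong to the same entity, so their rows are merged under the shared join ID. All view names, column names, and values here are invented for illustration.

```python
# Two toy "feature views" whose primary key columns have different names.
# Both belong to one entity whose join ID is 'user_md5'.
profile_view = {
    "primary_key": "user_md5",
    "rows": [{"user_md5": "u1", "user_nickname": "alice"},
             {"user_md5": "u2", "user_nickname": "bob"}],
}
stats_view = {
    "primary_key": "uid",
    "rows": [{"uid": "u1", "rating_count": 12},
             {"uid": "u3", "rating_count": 4}],
}


def join_on_entity(join_id, *views):
    """Merge rows from several views, keyed by the shared entity join ID."""
    merged = {}
    for view in views:
        pk = view["primary_key"]
        for row in view["rows"]:
            key = row[pk]
            features = {k: v for k, v in row.items() if k != pk}
            merged.setdefault(key, {join_id: key}).update(features)
    return merged


joined = join_on_entity("user_md5", profile_view, stats_view)
print(joined["u1"])
```

Here `u1` ends up with features from both views, while `u2` and `u3` carry only the features of the single view in which they appear.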
The Moviedata example uses three entities:
# Movie entity
cur_entity_name_movie = "movie_data"
entity_movie = project.get_entity(cur_entity_name_movie)
if entity_movie is None:
    entity_movie = project.create_entity(name=cur_entity_name_movie, join_id='movie_id')
entity_movie.print_summary()
# User entity
cur_entity_name_user = "user_data"
entity_user = project.get_entity(cur_entity_name_user)
if entity_user is None:
    entity_user = project.create_entity(name=cur_entity_name_user, join_id='user_md5')
entity_user.print_summary()
# Rating entity
cur_entity_name_ratings = "rating_data"
entity_ratings = project.get_entity(cur_entity_name_ratings)
if entity_ratings is None:
    entity_ratings = project.create_entity(name=cur_entity_name_ratings, join_id='rating_id')
entity_ratings.print_summary()
Step 3: Create feature views and write data
Feature views define how external data enters FeatureStore: the data source, schema, storage location, and feature metadata. FeatureStore supports three feature view types:
| Type | Use case | Write path |
|---|---|---|
| BatchFeatureView | Offline or T-1 day features | Write to offline store → publish to online store |
| StreamFeatureView | Real-time features | Write directly to online store |
| Sequence FeatureView | User behavior sequences | Write offline → read online in real time |
Time to live (TTL) controls how long the online store retains data. A value of -1 retains all data; a positive value keeps only data within the specified period.
BatchFeatureView
BatchFeatureView handles offline or T-1 day features. Data flows through two distinct operations:
`write_table()` — writes data to the MaxCompute offline store. This registers data in the offline store only; it is not yet available for online queries.
`publish_table()` — syncs data from the offline store to the online store, making it available for real-time retrieval.
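The separation between the two operations can be illustrated with a small in-memory model. This is only a conceptual sketch: `ToyBatchView` is a hypothetical stand-in, not the SDK class, and it ignores tasks, schemas, and real storage.

```python
class ToyBatchView:
    """In-memory stand-in for a batch feature view (not the SDK class)."""

    def __init__(self, primary_key):
        self.primary_key = primary_key
        self.offline = {}   # partition value -> list of rows
        self.online = {}    # primary key value -> latest row

    def write_table(self, rows, partition):
        # Step 1: data lands in the offline store only.
        self.offline.setdefault(partition, []).extend(rows)

    def publish_table(self, partition):
        # Step 2: copy one offline partition into the online store.
        for row in self.offline.get(partition, []):
            self.online[row[self.primary_key]] = row

    def get_online(self, key):
        return self.online.get(key)


view = ToyBatchView(primary_key="movie_id")
view.write_table([{"movie_id": "m1", "name": "Example"}], partition="20220830")
before = view.get_online("m1")   # None: written but not yet published
view.publish_table("20220830")
after = view.get_online("m1")    # the row is now served online
print(before, after)
```

The lookup before publishing returns nothing, which mirrors why a `write_table()` call alone does not make features available for online queries.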
Movie feature view
# Load the movie CSV from a public URL
path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
ds = UrlDataSource(path, delimiter=',', omit_header=True)
# Define the schema
movie_schema = OpenSchema(
    OpenField(name='movie_id', type='STRING'),
    OpenField(name='name', type='STRING'),
    OpenField(name='alias', type='STRING'),
    OpenField(name='actors', type='STRING'),
    OpenField(name='cover', type='STRING'),
    OpenField(name='directors', type='STRING'),
    OpenField(name='double_score', type='STRING'),
    OpenField(name='double_votes', type='STRING'),
    OpenField(name='genres', type='STRING'),
    OpenField(name='imdb_id', type='STRING'),
    OpenField(name='languages', type='STRING'),
    OpenField(name='mins', type='STRING'),
    OpenField(name='official_site', type='STRING'),
    OpenField(name='regions', type='STRING'),
    OpenField(name='release_date', type='STRING'),
    OpenField(name='slug', type='STRING'),
    OpenField(name='story', type='STRING'),
    OpenField(name='tags', type='STRING'),
    OpenField(name='year', type='STRING'),
    OpenField(name='actor_ids', type='STRING'),
    OpenField(name='director_ids', type='STRING'),
    OpenField(name='dt', type='STRING')
)
# Create the feature view (registers metadata only; data is not written yet)
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
    batch_feature_view = project.create_batch_feature_view(
        name=feature_view_movie_name,
        schema=movie_schema,
        online=True,
        entity=cur_entity_name_movie,
        primary_key='movie_id',
        partitions=['dt'],
        ttl=-1
    )
# Write data to the offline store
cur_task = batch_feature_view.write_table(ds, partitions={'dt': '20220830'})
cur_task.wait()
print(cur_task.task_summary)
# Publish to the online store
cur_task = batch_feature_view.publish_table({'dt': '20220830'})
cur_task.wait()
print(cur_task.task_summary)
# Verify
batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()
User and rating feature views
The user and rating tables follow the same pattern — load the source, define the schema, create the feature view, write, and publish:
# User feature view
users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter=',', omit_header=True)
user_schema = OpenSchema(
    OpenField(name='user_md5', type='STRING'),
    OpenField(name='user_nickname', type='STRING'),
    OpenField(name='ds', type='STRING')
)
feature_view_user_name = "feature_view_users"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
    batch_feature_view = project.create_batch_feature_view(
        name=feature_view_user_name,
        schema=user_schema,
        online=True,
        entity=cur_entity_name_user,
        primary_key='user_md5',
        partitions=['ds'],
        ttl=-1
    )
write_table_task = batch_feature_view.write_table(ds, {'ds': '20220830'})
write_table_task.wait()
print(write_table_task.task_summary)
cur_task = batch_feature_view.publish_table({'ds': '20220830'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()
# Rating feature view
ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter=',', omit_header=True)
ratings_schema = OpenSchema(
    OpenField(name='rating_id', type='STRING'),
    OpenField(name='user_md5', type='STRING'),
    OpenField(name='movie_id', type='STRING'),
    OpenField(name='rating', type='STRING'),
    OpenField(name='rating_time', type='STRING'),
    OpenField(name='dt', type='STRING')
)
feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
    batch_feature_view = project.create_batch_feature_view(
        name=feature_view_rating_name,
        schema=ratings_schema,
        online=True,
        entity=cur_entity_name_ratings,
        primary_key='rating_id',
        event_time='rating_time',
        partitions=['dt']
    )
cur_task = batch_feature_view.write_table(ds, {'dt': '20220831'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()
StreamFeatureView
StreamFeatureView handles real-time features. Data is written directly to the online store.
First, create the test data table in MaxCompute or DataWorks:
CREATE TABLE IF NOT EXISTS online_stream_test_t1 (
id STRING COMMENT 'ID',
count_value BIGINT COMMENT 'Count value',
metric_value DOUBLE COMMENT 'Metric value'
)
PARTITIONED BY (
ds string COMMENT 'Data timestamp'
)
LIFECYCLE 365;
INSERT INTO TABLE online_stream_test_t1 PARTITION (ds='20250815')
SELECT
CONCAT('str_', CAST(id AS STRING)) AS id,
CAST(FLOOR(RAND() * 1000000) AS BIGINT) AS count_value,
ROUND(RAND() * 1000, 2) AS metric_value
FROM (
SELECT SEQUENCE(1, 1000) AS id_list
) tmp
LATERAL VIEW EXPLODE(id_list) table_tmp AS id;
After the SQL runs successfully, the online_stream_test_t1 table is created with data in the ds=20250815 partition.
Then create and publish the StreamFeatureView:
online_schema = OpenSchema(
    OpenField(name='id', type='STRING'),
    OpenField(name='count_value', type='INT64'),
    OpenField(name='metric_value', type='DOUBLE')
)
feature_view_rating_name_stream = "feature_view_online_stream"
stream_feature_view = project.get_feature_view(feature_view_rating_name_stream)
if stream_feature_view is None:
    stream_feature_view = project.create_stream_feature_view(
        name=feature_view_rating_name_stream,
        schema=online_schema,
        online=True,
        entity=cur_entity_name_user,
        primary_key='id',
        event_time='count_value'
    )
stream_feature_view = project.get_feature_view(feature_view_rating_name_stream)
stream_feature_view.print_summary()
In a StreamFeatureView, the event_time field is used to clean up expired data when configured. For details, see Real-time feature lifecycle.
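As a rough illustration of how an event-time field and a TTL could combine to expire rows, consider the sketch below. It is not the FeatureStore implementation: the helper names `is_expired` and `purge` are hypothetical, and it assumes that event times are Unix timestamps in seconds and that the TTL is a duration in seconds. The only behavior taken from this guide is that a TTL of -1 retains all data and a positive TTL keeps only data within the specified period.

```python
import time


def is_expired(event_time, ttl, now=None):
    """Return True if a row should be dropped under the given TTL.

    Assumptions (not confirmed by the SDK): event_time and now are Unix
    timestamps in seconds, and ttl is a duration in seconds.
    """
    if ttl == -1:          # -1 means retain all data
        return False
    now = time.time() if now is None else now
    return now - event_time > ttl


def purge(rows, ttl, now, event_time_field="event_time"):
    """Keep only rows whose event time is still within the TTL window."""
    return [r for r in rows if not is_expired(r[event_time_field], ttl, now)]


rows = [
    {"id": "str_1", "event_time": 1_000},
    {"id": "str_2", "event_time": 1_900},
]
recent = purge(rows, ttl=500, now=2_000)     # only str_2 is within 500 s of now
everything = purge(rows, ttl=-1, now=2_000)  # both rows are retained
print(recent)
print(everything)
```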
Sync data to the online store:
# Replace offline_datasource_id with your FeatureStore project's offline store ID.
# table_name is the offline feature table to push to the online store.
stream_task = stream_feature_view.publish_table(
    partitions={'ds': '20250815'},
    mode='Merge',
    offline_to_online=True,
    publish_config={
        'offline_datasource_id': project.offline_datasource_id,
        'table_name': 'online_stream_test_t1'
    }
)
stream_task.wait()
print(stream_task.task_summary)
Sequence FeatureView
Sequence FeatureView stores user behavior sequences for offline training and online real-time retrieval.
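Conceptually, a behavior sequence is one user's events of a given type, deduplicated and capped at a maximum length. The sketch below mirrors the `seq_event`, `seq_len`, and `deduplication_method` options that appear in the configuration later in this section. It is a plain-Python illustration, not the SDK's logic: `build_sequence` is a hypothetical helper, and it assumes that the most recent event wins on duplicates and that truncation keeps the most recent items.

```python
def build_sequence(events, user_id, seq_event, seq_len, deduplication_method=1):
    """Return item IDs for one user's behavior sequence, most recent first.

    Assumptions: on duplicates the latest event is kept, and truncation
    keeps the most recent `seq_len` items.
    """
    selected = [e for e in events if e["user_id"] == user_id and e["event"] == seq_event]
    selected.sort(key=lambda e: e["event_time"], reverse=True)
    seen, sequence = set(), []
    for e in selected:
        # Method 1 keys on (user, item, event); method 2 also includes the time.
        key = (e["user_id"], e["item_id"], e["event"])
        if deduplication_method == 2:
            key += (e["event_time"],)
        if key in seen:
            continue
        seen.add(key)
        sequence.append(e["item_id"])
    return sequence[:seq_len]


events = [
    {"user_id": "u1", "item_id": "a", "event": "click", "event_time": 1},
    {"user_id": "u1", "item_id": "b", "event": "click", "event_time": 2},
    {"user_id": "u1", "item_id": "a", "event": "click", "event_time": 3},
    {"user_id": "u1", "item_id": "c", "event": "buy", "event_time": 4},
]
print(build_sequence(events, "u1", "click", seq_len=50))
print(build_sequence(events, "u1", "click", seq_len=50, deduplication_method=2))
print(build_sequence(events, "u1", "click", seq_len=1))
```

With method 1 the repeated click on item `a` collapses into a single entry; with method 2 both clicks survive because their event times differ.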
First, copy the source sequence data from pai_online_project (public read access) into your own project:
CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
LIKE pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
STORED AS ALIORC
LIFECYCLE 90;
INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 PARTITION(ds)
SELECT *
FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
WHERE ds >= '20231022' AND ds <= '20231024';
After the SQL runs successfully, the sequence feature table is created with data from partitions ds=20231022, ds=20231023, and ds=20231024.
Create the sequence feature view:
user_entity_name = "user"
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_view = project.get_feature_view(seq_feature_view_name)
if seq_feature_view is None:
    seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3"
    behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3'
    ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name)
    event_time = 'event_unix_time'  # Event time field in the behavior table
    item_id = 'item_id'  # Item ID field in the behavior table
    event = 'event'  # Event type field in the behavior table
    # deduplication_method=1: deduplicates on ['user_id', 'item_id', 'event']
    # deduplication_method=2: deduplicates on ['user_id', 'item_id', 'event', 'event_time']
    sequence_feature_config_list = [
        SequenceFeatureConfig(
            offline_seq_name='click_seq_50_seq',  # Field name in the offline sequence table
            seq_event='click',  # Event type to filter
            online_seq_name='click_seq_50',  # Name exposed to the online Go SDK
            seq_len=50  # Maximum sequence length; longer sequences are truncated
        )
    ]
    seq_table_config = SequenceTableConfig(
        table_name=seq_table_name,
        primary_key='user_id',
        event_time='event_unix_time'
    )
    seq_feature_view = project.create_sequence_feature_view(
        seq_feature_view_name,
        datasource=ds,
        event_time=event_time,
        item_id=item_id,
        event=event,
        deduplication_method=1,
        sequence_feature_config=sequence_feature_config_list,
        sequence_table_config=seq_table_config,
        entity=user_entity_name
    )
seq_feature_view.print_summary()
Sync data to the online store:
seq_task = seq_feature_view.publish_table({'ds': '20231023'}, days_to_load=30)
seq_task.wait()
seq_task.print_summary()
Register the label table for future training set generation:
label_table_name = 'fs_movie_feature_view_ratings_offline'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
    label_table = project.create_label_table(datasource=ds, event_time='rating_time')
Step 4: Retrieve online features and define feature selectors
Retrieve online features
Retrieve features directly from a feature view. FeatureStore currently prioritizes FeatureDB for online feature retrieval.
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
# Retrieve features for a single item
ret_features_1 = batch_feature_view.list_feature_view_online_features(join_ids=['26357307'])
print("ret_features1 = ", ret_features_1)
# Retrieve features for multiple items in one call
ret_features_2 = batch_feature_view.list_feature_view_online_features(join_ids=['30444960', '3317352'])
print("ret_features2 = ", ret_features_2)
Define a feature selector
A FeatureSelector specifies which features to pull from a feature view when generating a training dataset or running inference. Three selection patterns are supported:
feature_view_name = 'feature_view_movie'
# Select specific features by name
feature_selector = FeatureSelector(feature_view_name, ['site_id', 'site_category'])
# Select all features
feature_selector = FeatureSelector(feature_view_name, '*')
# Select specific features and apply an alias
feature_selector = FeatureSelector(
    feature_view='user1',
    features=['f1', 'f2', 'f3'],
    alias={"f1": "f1_1"}  # Expose f1 as f1_1 in the output
)
Step 5: Create and export a training dataset
Create a training dataset
A training dataset (sample table) combines a label table with features from one or more feature views, joined on primary keys using point-in-time joins to prevent data leakage.
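The idea behind a point-in-time join can be shown with a small, SDK-independent sketch: for each label row, only the latest feature row whose timestamp is at or before the label's event time is attached, so values from the future never reach the training sample. The function `point_in_time_join`, the field names, and the data are all hypothetical, and how FeatureStore performs the join internally is not covered here.

```python
def point_in_time_join(labels, feature_rows, key, label_time="event_time",
                       feature_time="event_time"):
    """For each label row, attach the latest feature row at or before its time."""
    by_key = {}
    for row in feature_rows:
        by_key.setdefault(row[key], []).append(row)
    for rows in by_key.values():
        rows.sort(key=lambda r: r[feature_time])

    joined = []
    for label in labels:
        match = None
        for row in by_key.get(label[key], []):
            if row[feature_time] <= label[label_time]:
                match = row          # keep overwriting: the last valid row wins
            else:
                break                # later rows would leak future data
        features = {k: v for k, v in (match or {}).items()
                    if k not in (key, feature_time)}
        joined.append({**label, **features})
    return joined


labels = [{"user_md5": "u1", "event_time": 10, "rating": 5}]
features = [
    {"user_md5": "u1", "event_time": 5, "avg_rating": 3.0},
    {"user_md5": "u1", "event_time": 20, "avg_rating": 4.5},
]
result = point_in_time_join(labels, features, key="user_md5")
print(result)
```

The label at time 10 picks up `avg_rating` 3.0 from time 5; the value 4.5 recorded at time 20 is ignored because it was not yet known when the label event occurred.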
label_table_name = 'fs_movie_feature_view_ratings_offline'
output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)
# Select features from movie and user views
feature_movie_selector = FeatureSelector('feature_view_movie', ['name', 'actors', 'regions', 'tags'])
feature_user_selector = FeatureSelector('feature_view_users', ['user_nickname'])
train_set = project.create_training_set(
    label_table_name=label_table_name,
    train_set_output=train_set_output,
    feature_selectors=[feature_movie_selector, feature_user_selector]
)
print("train_set = ", train_set)
Register a model
Create a model entry that links to the training dataset:
model_name = "fs_rank_v1"
cur_model = project.get_model(model_name)
if cur_model is None:
    cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)
Export the training dataset
Specify the partitions and event time for the label table and each feature view, then run the export:
# Label table configuration
label_partitions = PartitionConfig(name='dt', value='20220831')
label_input_config = LabelInputConfig(
    partition_config=label_partitions,
    event_time='1999-01-00 00:00:00'
)
# Feature view configurations
movie_partitions = PartitionConfig(name='dt', value='20220830')
feature_view_movie_config = FeatureViewConfig(name='feature_view_movie', partition_config=movie_partitions)
user_partitions = PartitionConfig(name='ds', value='20220830')
feature_view_user_config = FeatureViewConfig(name='feature_view_users', partition_config=user_partitions)
feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
# Output partition configuration
train_set_partitions = PartitionConfig(name='dt', value='20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)
# Run the export
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print(task.task_summary)
Key concepts
Offline store
The offline store is a data warehouse for storing historical features. Features are written to MaxCompute or Hadoop Distributed File System (HDFS) using Apache Spark. The offline store serves two purposes: generating training datasets for model training and providing features for batch predictions.
Online store
The online store is a data warehouse for real-time features, providing low-latency access for online inference. FeatureStore supports FeatureDB, Hologres, and Tablestore as online stores.
Feature view types
| Type | Description |
|---|---|
| BatchFeatureView | Offline or T-1 day features. Offline data is written to the offline store and can be published to the online store for real-time queries. |
| StreamFeatureView | Real-time features. Data is written directly to the online store. |
| Sequence FeatureView | User behavior sequence features, supporting offline writes and online real-time reads. |
What's next
FeatureDB overview — learn about the online store backing real-time feature retrieval
Configure FeatureStore items — create and manage multiple projects
EasyRec on GitHub — integrate FeatureStore with EasyRec for feature generation (FG) and model training
FeatureGenerator on GitHub — generate features for recommendation models
For issues encountered while using FeatureStore, join the DingTalk support group (group ID: 32260796).