如何使用FeatureStore Python SDK

本文为您介绍通过FeatureStore整合数据特征并进行模型离线训练,以及后续的上线服务操作流程。

背景信息

特征平台是用来生产、共享和管理机器学习模型特征的存储库,可以方便地向多人、多团队共享特征,保证离线在线的一致性,并提供高效的在线特征访问。

特征平台基本适用于所有需要特征的场景,典型场景如推荐场景。特征表注册在特征平台中,特征平台可以自动完成在线和离线表的构建,保证在线和离线的一致性,同时保证特征表只存一份的情况下,能够向多人共享特征,减少资源成本。特征平台还可以节省时间成本,原本需要复杂的SQL操作,比如导出训练表,数据导入到Hologres表中等操作,在特征平台中都可以通过一行代码完成。

目前特征平台封装了整个特征到模型的流程,离线支持MaxCompute平台,在线支持Hologres、GraphComputeTableStore等平台,开发者或者算法工程师无需深入了解各个平台的细节,所有的操作在特征平台中都可以通过网页手动操作或者Python SDK完成,提升团队工作效率,同时也会避免一些可能存在的问题,比如推荐场景中比较常见的离线在线不一致的问题。

目前特征平台已经与EasyRec深度集成,可以非常方便高效地进行FG和模型训练,并且能够直接部署到线上,可以做到在短时间内搭建起一套前沿的推荐系统,并且可以取FG得优良的效果。

如果您在使用过程中有任何问题,请通过搜索钉钉群(32260796)进群咨询。

前提条件

已开通PAI服务并创建PAI工作空间,操作详情请参见开通PAI并创建默认工作空间

准备工作

安装特征平台Python SDK, 要求在Python3环境下运行。本文的代码建议在DSW中运行。

! pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.3.1-py3-none-any.whl

FeatureStoreClient的建立需要传入您阿里云账户的access_key_idaccess_key_secret,我们建议您通过环境变量方式传入,以降低泄漏风险。进入DSW实例后,您可以单击上方的Terminal,进入终端界面,并运行以下命令:

对于access_key_id,您需要将您的AccessKeyID替换YOUR_AccessKey_ID

echo "export AccessKeyID='YOUR_AccessKey_ID'" >> ~/.bashrc
source ~/.bashrc

对于access_key_secret,您需要将您的AccessKeySecret替换YOUR_Access_Key_Secret

echo "export AccessKeySecret='YOUR_Access_Key_Secret'" >> ~/.bashrc
source ~/.bashrc

导入需要的功能模块。

import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient, build_feature_store_client
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import OpenSchema, OpenField
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import EASDeployConfig, LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

数据集介绍

数据集示例开源电影数据集Moviedata-10M,其中主要使用的是Movie、UserRating这三份数据,分别对应推荐流程中的物料表、用户表和label表。

配置特征项目

您可以通过特征平台创建多个项目空间,每个项目空间是独立的。具体操作,请参见配置FeatureStore项目。运行notebook需要FeatureStore服务端配合运行,开通特征平台后需要配置数据源,具体操作请参见配置数据源

其中,offline_datasource_id指的是离线数据源ID,online_datasource_id指的是在线数据源ID。

此处以项目名称是fs_movie为例进行说明。

# 输入您阿里云账户的access_key_id
access_id = os.getenv("AccessKeyID")
# 输入您阿里云账户的access_key_secret
access_ak = os.getenv("AccessKeySecret")
# 输入您开通特征平台所在地域,此处以华东1(杭州)为例
region = 'cn-hangzhou'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
# 输入您特征平台的项目名,此处以fs_movie为例
cur_project_name = "fs_movie"
project = fs.get_project(cur_project_name)
if project is None:
    raise ValueError("Need to create project : fs_movie")

运行以下代码获取当前的project并打印其信息。

project = fs.get_project(cur_project_name)
project.print_summary()

配置特征实体(FeatureEntity)

特征实体描述了一组相关的特征集合。多个特征视图可以关联一个特征实体。每个实体都会有一个JoinId,通过JoinId可以关联多个特征视图特征。每一个特征视图都有一个主键(索引键)来获取它的特征数据,但是索引键可以和JoinId定义的名称不一样。

参考如下示例,创建Movie、UserRating三个实体。

cur_entity_name_movie = "movie_data"
join_id = 'movie_id'
entity_movie = project.get_entity(cur_entity_name_movie)
if entity_movie is None:
	entity_movie = project.create_entity(name = cur_entity_name_movie, join_id=join_id)
entity_movie.print_summary()
cur_entity_name_user = "user_data"
join_id = 'user_md5'
entity_user = project.get_entity(cur_entity_name_user)
if entity_user is None:
  entity_user = project.create_entity(name = cur_entity_name_user, join_id=join_id)
entity_user.print_summary()
cur_entity_name_ratings = "rating_data"
join_id = 'rating_id'
entity_ratings = project.get_entity(cur_entity_name_ratings)
if entity_ratings is None:
  entity_ratings = project.create_entity(name = cur_entity_name_ratings, join_id=join_id)
entity_ratings.print_summary()

配置特征视图(FeatureView

FeatureStore是一个专门用来管理和组织特征数据的平台,外部数据需要通过特征视图进入FeatureStore。特征视图定义了数据从哪里来(DataSource)、需要进行哪些预处理或转换操作(如特征工程/Transformation)、特征的数据结构(包含特征名称和类型在内的特征schema)、数据存储的位置(OnlineStore/OfflineStore),并提供特征元信息管理,如主键、事件时间、分区键、特征实体以及有效期设定ttl(默认-1表示永久有效,正数则表示在线查询时会取ttl内的最新特征数据)。

特征视图分为三种类型:

  • BatchFeatureView:离线特征,或者T-1天特征。将离线数据注入到FeatureStoreOfflineStore中,并可以根据需求同步至OnlineStore以支持实时查询。一般的离线特征,或者T-1天的特征。

  • StreamFeatureView:实时特征。将数据直接写入OnlineStore,并同时同步到OfflineStore。

  • Sequence FeatureView:序列特征。支持离线写入序列特征,以及查询和读取实时序列特征。

BatchFeatureView

如果数据存在于CSV文件中,通过URL下载写入到MaxCompute,定义的FeatureViewschema需要手动创建。

path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
delimiter = ','
omit_header = True
ds = UrlDataSource(path, delimiter, omit_header)
print(ds)

schema定义了字段的名称和类型。

movie_schema = OpenSchema(
    OpenField(name='movie_id', type='STRING'),
    OpenField(name='name', type='STRING'),
    OpenField(name='alias', type='STRING'),
    OpenField(name='actores', type='STRING'),
    OpenField(name='cover', type='STRING'),
    OpenField(name='directors', type='STRING'),
    OpenField(name='double_score', type='STRING'),
    OpenField(name='double_votes', type='STRING'),
    OpenField(name='genres', type='STRING'),
    OpenField(name='imdb_id', type='STRING'),
    OpenField(name='languages', type='STRING'),
    OpenField(name='mins', type='STRING'),
    OpenField(name='official_site', type='STRING'),
    OpenField(name='regions', type='STRING'),
    OpenField(name='release_data', type='STRING'),
    OpenField(name='slug', type='STRING'),
    OpenField(name='story', type='STRING'),
    OpenField(name='tags', type='STRING'),
    OpenField(name='year', type='STRING'),
    OpenField(name='actor_ids', type='STRING'),
    OpenField(name='director_ids', type='STRING'),
    OpenField(name='dt', type='STRING')
)
print(movie_schema)

新建batch_feature_view。

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_movie_name, schema=movie_schema, online = True, entity= cur_entity_name_movie, primary_key='movie_id', partitions=['dt'], ttl=-1)
batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()

数据写入MaxCompute表。

cur_task = batch_feature_view.write_table(ds, partitions={'dt':'20220830'})
cur_task.wait()

查看当前task的信息。

print(cur_task.task_summary)

数据同步到OnlineStore中。

cur_task = batch_feature_view.publish_table({'dt':'20220830'})
cur_task.wait()
print(cur_task.task_summary)

获取对应的FeatureView。

batch_feature_view = project.get_feature_view(feature_view_movie_name)

打印该FeatureView的信息。

batch_feature_view.print_summary()

我们按此顺序,依次导入users表,ratings表。

users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter, omit_header)
print(ds)
user_schema = OpenSchema(
  OpenField(name='user_md5', type='STRING'),
  OpenField(name='user_nickname', type='STRING'),
  OpenField(name='ds', type='STRING')
)
print(user_schema)
feature_view_user_name = "feature_view_users"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_user_name, schema=user_schema, online = True, entity= cur_entity_name_user, primary_key='user_md5',ttl=-1, partitions=['ds'])
write_table_task = batch_feature_view.write_table(ds, {'ds':'20220830'})
write_table_task.wait()
print(write_table_task.task_summary)
cur_task = batch_feature_view.publish_table({'ds':'20220830'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()
ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter, omit_header)
print(ds)
ratings_schema = OpenSchema(
  OpenField(name='rating_id', type='STRING'),
  OpenField(name='user_md5', type='STRING'),
  OpenField(name='movie_id', type='STRING'),
  OpenField(name='rating', type='STRING'),
  OpenField(name='rating_time', type='STRING'),
  OpenField(name='dt', type='STRING')
)
feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_rating_name, schema=ratings_schema, online = True, entity= cur_entity_name_ratings, primary_key='rating_id', event_time='rating_time', partitions=['dt'])
cur_task = batch_feature_view.write_table(ds, {'dt':'20220831'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()

label表注册。

label_table_name = 'fs_movie_feature_view_ratings_offline'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
  label_table = project.create_label_table(datasource=ds, event_time='rating_time')

配置离线数据源Offlinestore

离线特征数据存储的数据仓库,在MaxComputeDS上的HDFS,通过Spark进行数据写入。通过离线数据源可以生成样本数据TrainingSet,用于模型训练;也可以生成batch prediction数据,用于批量预测。

配置在线数据源(Onlinestore)

在线预测时,需要低延迟获取特征数据,在线数据源提供在线特征数据的存储。目前优先支持Hologres、TablestoreGraphcompute。

获取在线特征

从特征视图的角度获取在线特征,目前优先支持Hologres。

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_1 = batch_feature_view.get_online_features(join_ids={'movie_id':['26357307']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_1)
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_2 = batch_feature_view.get_online_features(join_ids={'movie_id':['30444960', '3317352']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_2)

配置FeatureSelector

从离线数据源或在线数据源获取特征时,需要明确指出应该获取哪些特征。可以从特征视图的角度选择特征。

feature_view_name = 'feature_view_movie'
# 选择部分特征
feature_selector = FeatureSelector(feature_view_name, ['site_id', 'site_category'])

#选择全部特征
feature_selector = FeatureSelector(feature_view_name, '*')

# 支持别名
feature_selector = FeatureSelector(
    feature_view='user1',
    features = ['f1','f2', 'f3'],
    alias={"f1":"f1_1"} # 字段别名,最终会产出 f1_1 的字段名称 
)

配置样本表(TrainingSet

训练模型时,首先要构造样本表,样本表由Label数据和特征数据组成。在与FeatureStore交互时,Label数据需要由客户提供,并且需要定义要获取的特征名称,然后根据主键进行point-in-time join(存在event_time的情况下)。

label_table_name = 'fs_movie_feature_view_ratings_offline'
output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)
feature_view_movie_name = "feature_view_movie"
feature_movie_selector = FeatureSelector(feature_view_movie_name, ['name', 'actores', 'regions','tags'])
feature_view_user_name = 'feature_view_users'
feature_user_selector = FeatureSelector(feature_view_user_name, ['user_nickname'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output= train_set_output, feature_selectors=[feature_movie_selector, feature_user_selector])
print("train_set = ", train_set)

训练模型(Model

训练模型并部署成服务后,进行业务预测。其中,训练样本可以从上文的train_set获得。

model_name = "fs_rank_v1"
cur_model = project.get_model(model_name)
if cur_model is None:
  cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)

导出样本表

实际训练时,需要导出样本表。指定Label表以及各个特征视图的分区、event_time。

label_partitions = PartitionConfig(name = 'dt', value = '20220831')
label_input_config = LabelInputConfig(partition_config=label_partitions, event_time='1999-01-00 00:00:00')

movie_partitions = PartitionConfig(name = 'dt', value = '20220830')
feature_view_movie_config = FeatureViewConfig(name = 'feature_view_movie', partition_config=movie_partitions)

user_partitions = PartitionConfig(name = 'ds', value = '20220830')
feature_view_user_config = FeatureViewConfig(name = 'feature_view_users', partition_config=user_partitions)
feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
train_set_partitions = PartitionConfig(name = 'dt', value = '20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)

根据指定的条件,导出样本表。

task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print(task.summary)