目前在特征平台(FeatureStore)中支持的特征生产功能在推荐、广告、风控以及机器学习等领域都有广泛的应用。该功能旨在降低特征生产的复杂度,通过将特征生产中通用常见的功能固定下来,通过配置的方式即可实现特征生产。本文为您介绍特征生产的详细过程。
前提条件
在开始执行操作前,请确认您已完成以下准备工作:
依赖产品 | 具体操作 |
人工智能平台PAI |
|
云原生大数据计算服务MaxCompute |
|
大数据开发治理平台DataWorks |
|
一、准备工作
准备原始数据
本文需要用到的四张原始表分别为:
用户表(rec_sln_demo_user_table_preprocess_v1):包含一些基础的用户特征,例如性别、年龄、城市和关注数等。
行为表(rec_sln_demo_behavior_table_preprocess_v1):包含一些行为特征,例如某时用户点击某个物品等。
物品表(rec_sln_demo_item_table_preprocess_v1):包含一些基础的物品特征,例如类别、作者、累计点击数和累计点赞数等。
行为宽表(rec_sln_demo_behavior_table_preprocess_wide_v1):该表由前三张表连接而成。
数据表存放在有公开读取权限的pai_online_project中,其数据均为模拟数据生成。您需要在DataWorks中执行SQL命令,将上表数据从pai_online_project项目同步到您的MaxCompute项目中。具体操作步骤如下:
登录DataWorks控制台。
在左侧导航栏单击数据开发与治理 > 数据开发。
选择已创建的DataWorks工作空间后,单击进入数据开发。
鼠标悬停至新建,选择新建节点 > MaxCompute > ODPS SQL,在弹出的页面中配置节点参数。
参数
取值建议
引擎实例
选择已创建的MaxCompute引擎。
节点类型
ODPS SQL
路径
业务流程/Workflow/MaxCompute
名称
可自定义名称。
单击确认。
在新建节点区域运行以下SQL命令,将用户表、物品表、行为表以及行为宽表数据从pai_online_project项目同步到您的MaxCompute项目中。资源组选择已创建的独享资源组。
同步用户表:rec_sln_demo_user_table_preprocess_v1
CREATE TABLE IF NOT EXISTS rec_sln_demo_user_table_preprocess_v1 like pai_online_project.rec_sln_demo_user_table_preprocess_v1 STORED AS ALIORC LIFECYCLE 90; INSERT OVERWRITE TABLE rec_sln_demo_user_table_preprocess_v1 PARTITION(ds) SELECT * FROM pai_online_project.rec_sln_demo_user_table_preprocess_v1 WHERE ds >= '20240530' and ds <='20240605';
同步行为表:rec_sln_demo_behavior_table_preprocess_v1
CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_v1 like pai_online_project.rec_sln_demo_behavior_table_preprocess_v1 STORED AS ALIORC LIFECYCLE 90; INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_v1 PARTITION(ds) SELECT * FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_v1 WHERE ds >= '20240530' and ds <='20240605';
同步物品表:rec_sln_demo_item_table_preprocess_v1
CREATE TABLE IF NOT EXISTS rec_sln_demo_item_table_preprocess_v1 like pai_online_project.rec_sln_demo_item_table_preprocess_v1 STORED AS ALIORC LIFECYCLE 90; INSERT OVERWRITE TABLE rec_sln_demo_item_table_preprocess_v1 PARTITION(ds) SELECT * FROM pai_online_project.rec_sln_demo_item_table_preprocess_v1 WHERE ds >= '20240530' and ds <='20240605';
同步行为宽表:rec_sln_demo_behavior_table_preprocess_wide_v1
CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_wide_v1 like pai_online_project.rec_sln_demo_behavior_table_preprocess_wide_v1 STORED AS ALIORC LIFECYCLE 90; INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_wide_v1 PARTITION(ds) SELECT * FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_wide_v1 WHERE ds >= '20240530' and ds <='20240605';
安装FeatureStore Python SDK
以下代码均建议在Jupyter Notebook环境下运行。
安装特征平台Python SDK,要求在Python3环境下运行。
%pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.8.0-py3-none-any.whl
导入需要的功能模块:
import os from feature_store_py import FeatureStoreClient from feature_store_py.fs_datasource import MaxComputeDataSource from feature_store_py.feature_engineering import TableTransform, Condition, DayOf, ComboTransform, Feature, AggregationTransform, auto_count_feature_transform, WindowTransform, auto_window_feature_transform
二、表变换和特征变换操作流程
以下代码均建议在Jupyter Notebook环境下运行。
定义表变换。
初始化Client。
access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID") # 填入您的Access Key ID access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET") # 填入您的Access Key Secret project = 'project_name' # 填入您的项目名 region = 'cn-hangzhou' # 填入您的项目所在区域,例如华东1(杭州)为cn-hangzhou fs_client = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region=region)
指定数据源。
input_table_name = "rec_sln_demo_behavior_table_preprocess_v1" ds = MaxComputeDataSource(table=input_table_name, project=project)
指定变换后的输出表名称。
output_table_name = "rec_sln_demo_v1_fs_test_v1"
定义表变换。
trans_name = "drop_duplicates" # 表变换名称 keys = ["user_id", "item_id"] # 去重字段 sort_keys = ["event_unix_time"] # 排序字段 sort_order = ["desc"] # 顺序定义 tran_i = TableTransform(trans_name, keys, sort_keys, sort_order)
定义特征变换。
feature1 = Feature( name="page_net_type", input=['page', 'net_type'], transform=ComboTransform( separator='_' ) ) feature2 = Feature( name="trim_playtime", type="double", transform="playtime/10" )
生成pipeline流程。
pipeline = fs_client.create_pipeline(ds, output_table_name).add_table_transform(tran_i).add_feature_transform([feature1, feature2], keep_input_columns=True)
生成与执行变换。
execute_date = '20240605' output_table = pipeline.execute(execute_date, drop_table=True)
执行上述代码共分以下两个步骤:
生成变换配置。该配置除了指定SQL外,还指定了变换所需的各种信息,包括输入输出、参数和依赖等。
执行。根据第一个步骤生成的配置执行,将变换后的结果存放在输出表中。
查看结果。
查看生成表的结果,该结果直接以pandas.DataFrame的形式呈现。
pd_ret = output_table.to_pandas(execute_date, limit=20)
展示pd_ret的内容。
pd_ret
查看生成的配置,该配置包含输入表定义、变换SQL、依赖、参数以及输出表定义等。该配置既适用于Debug调试,也适用于保存后进行后续的上线例行任务等。
transform_info = output_table.transform_info
查看transform_info的内容。
transform_info
查看第一阶段的输入配置。
pipeline_config = pipeline.pipeline_config
查看pipeline_config的内容。
pipeline_config
三、统计类型特征变换
统计特征普遍存在于特征生产场景中,是机器学习和数据分析中常见的一种数据预处理方法,通常用于生成更具代表性和更容易解释的特征。这些变换通过对原始数据进行汇总、计算和抽取,使得模型能够更好地理解数据的时间趋势、周期性和异常情况。具体优势如下:
捕捉时间趋势:在用户行为数据中,最近一段时间的行为往往对当前状态有更大的影响。
降低噪音:原始数据中可能包含大量噪音。通过统计变换,您可以通过聚合操作来减少这些噪音的影响。
丰富特征:统计变换可以生成新特征,增加模型的表达能力。
提高模型性能:通过引入统计特征,您可以显著提高模型的预测性能。
增强解释性:统计特征更加易于解释和理解,使得问题的诊断和分析更为方便。
数据压缩:在某些情况下,统计特征可以有效减少数据维度。
虽然统计特征的具体实现过程较为复杂,但通过如下介绍的简单的统计特征定义,就可以生产出大量的统计特征。
单个统计类型特征变化定义与运行
定义输入和输出表名。
input_agg_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_agg = MaxComputeDataSource(table=input_agg_table_name, project=project) output_agg_table_name = "rec_sln_demo_behavior_test_agg_v1"
定义一个统计类型特征。
feature_agg1 = Feature( name="user_avg_praise_count_1d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", # 聚合函数名称,可选的有 'avg', 'sum', 'min', 'max' condition=Condition(field="event", value="expr", operator="<>"), # 设置条件判断,表示当字段 "event" 的值不等于 "expr" 时成立。具体逻辑可结合生成的SQL语句来进一步理解。 group_by_keys="user_id", # group by 对应的 key window_size=DayOf(1), # 窗口大小,这里是 1 天 ), )
创建pipeline,运行统计类型特征变换。
agg_pipeline = fs_client.create_pipeline(ds_agg, output_agg_table_name).add_feature_transform([feature_agg1])
生产与执行变换。
execute_date = '20240605' print("transform_info = ", agg_pipeline.transform_info) output_agg_table = agg_pipeline.execute(execute_date, drop_table=True)
查看transform_info内容。
transform_info_agg = output_agg_table.transform_info transform_info_agg
查看结果。
pd_ret = output_agg_table.to_pandas(execute_date, limit=20) pd_ret
多个不同窗口统计类型特征变换自动JOIN
定义输出表名。
output_agg_table_name_2 = "rec_sln_demo_behavior_test_agg_v2"
定义多个不同窗口的统计类型特征,定义的窗口大小分为别1、3、7、15和30。
feature_agg1 = Feature( name="user_avg_praise_count_1d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(1), ), ) feature_agg2 = Feature( name="user_avg_praise_count_3d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(3), ), ) feature_agg3 = Feature( name="user_avg_praise_count_7d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(7), ), ) feature_agg4 = Feature( name="user_avg_praise_count_15d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(15), ), ) feature_agg5 = Feature( name="user_avg_praise_count_30d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(30), ), )
创建pipeline。
agg_pipeline_2 = fs_client.create_pipeline(ds_agg, output_agg_table_name_2).add_feature_transform([feature_agg1, feature_agg2, feature_agg3, feature_agg4, feature_agg5])
执行pipeline生产过程。
execute_date = '20240605' output_agg_table_2 = agg_pipeline_2.execute(execute_date, drop_table=True)
查看变换结果。
transform_info_agg_2 = output_agg_table_2.transform_info transform_info_agg_2
查看表运行结果。
pd_ret_2 = output_agg_table_2.to_pandas(execute_date, limit=20) pd_ret_2
多个统计特征变换过程支持自动归并和类型自动推导
为了优化计算过程,多个特征同窗口大小时会自动归并,在同一个group窗口块中计算完成。 整个计算过程会涉及类型的改变,例如avg会将bigint类型变换为double类型,而且输入特征的类型众多比较难记住。因此在整个统计特征的变换过程中,会支持类型自动推导,即不需要预先指定类型,特征定义过程中会自动推导出结果特征的类型。
定义输出表名。
output_agg_table_name_3 = "rec_sln_demo_behavior_test_agg_v3"
定义更多不同类型的特征。
feature_agg6 = Feature( name="user_expr_cnt_1d", transform=AggregationTransform( agg_func="count", condition=Condition(field="event", value="expr", operator="="), group_by_keys="user_id", window_size=DayOf(1), ) ) feature_agg7 = Feature( name="user_expr_item_id_dcnt_1d", input=['item_id'], transform=AggregationTransform( agg_func="count", condition=Condition(field="event", value="expr", operator="="), group_by_keys="user_id", window_size=DayOf(1), ), ) feature_agg8 = Feature( name="user_sum_praise_count_1d", input=["praise_count"], transform=AggregationTransform( agg_func="sum", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(1), ), ) feature_agg9 = Feature( name="user_sum_praise_count_3d", input=["praise_count"], transform=AggregationTransform( agg_func="sum", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(3), ), )
创建pipeline。
agg_pipeline_3 = fs_client.create_pipeline(ds_agg, output_agg_table_name_3).add_feature_transform([feature_agg1, feature_agg2, feature_agg3, feature_agg4, feature_agg5, feature_agg6, feature_agg7, feature_agg8, feature_agg9])
执行pipeline生产过程。
execute_date = '20240605' output_agg_table_3 = agg_pipeline_3.execute(execute_date, drop_table=True)
查看变换结果。
transform_info_agg_3 = output_agg_table_3.transform_info transform_info_agg_3
查看表运行结果。
pd_ret_3 = output_agg_table_3.to_pandas(execute_date, limit=20) pd_ret_3
内置自动扩展函数,支持对统计特征变换自动扩展
因统计特征较多,包括不同的窗口大小和众多聚合函数计算组合,手动实现每个特征比较复杂。系统内置了自动扩展函数,仅需指定要统计的输入特征,即可自动生成并完成数百种统计特征的定义。
指定要统计的输入特征。
name_prefix = "user_" input_list = ["playtime", "duration", "click_count", "praise_count"] event_name = 'event' event_type = 'expr' group_by_key = "user_id" count_feature_list = auto_count_feature_transform(name_prefix, input_list, event_name, event_type, group_by_key) print("len_count_feature_list = ", len(count_feature_list)) print("count_feature_list = ", count_feature_list)
定义输出表名,并创建pipeline。
output_agg_table_name_4 = "rec_sln_demo_behavior_test_agg_v4" agg_pipeline_4 =fs_client.create_pipeline(ds_agg, output_agg_table_name_4).add_feature_transform(count_feature_list)
执行pipeline生产过程。
execute_date = '20240605' output_agg_table_4 = agg_pipeline_4.execute(execute_date, drop_table=True)
查看变换结果。
transform_info_agg_4 = output_agg_table_4.transform_info transform_info_agg_4
查看表运行结果。
pd_ret_4 = output_agg_table_4.to_pandas(execute_date, limit=20) pd_ret_4
支持不同的group key同时变换
以上内容阐述了当所有group key均相同时的处理方法,此外,系统还支持对不同group key进行转换操作。具体示例如下:
定义输出表名。
input_agg_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_agg = MaxComputeDataSource(table=input_agg_table_name, project=project) output_agg_table_name_5 = "rec_sln_demo_behavior_test_agg_v5"
定义不同group key的特征。
feature_agg1 = Feature( name="item__sum_follow_cnt_15d", input=['follow_cnt'], transform=AggregationTransform( agg_func="sum", condition=Condition(field="event", value="expr", operator="="), group_by_keys="item_id", window_size=DayOf(1), ) ) feature_agg2 = Feature( name="author__max_follow_cnt_15d", input=['follow_cnt'], transform=AggregationTransform( agg_func="max", condition=Condition(field="event", value="expr", operator="="), group_by_keys="author", window_size=DayOf(15), ), )
创建pipeline。
agg_pipeline_5 = fs_client.create_pipeline(ds_agg, output_agg_table_name_5).add_feature_transform([feature_agg1, feature_agg2])
执行pipeline生产过程。
execute_date = '20240605' output_agg_table_5 = agg_pipeline_5.execute(execute_date, drop_table=True)
查看变换结果。
transform_info_agg_5 = output_agg_table_5.transform_info transform_info_agg_5
四、WindowTransform窗口变换特征
上述统计类型特征变换基本能满足常规特征生产场景。但在某些大规模推荐等场景中,还会面临更高层次要求。FeatureStore支持WindowTransform窗口变换特征,您可以方便地得到KV特征,还可以利用天级中间表的形式来优化计算过程,进而达到降低特征计算时间、节省计算成本的目的。具体优势如下:
捕捉复杂非线性交互:简单的特征(如用户的年龄、性别等)难以表达用户的复杂偏好。特征交叉可以帮助捕捉用户和物品之间更复杂的非线性交互关系。
提升预测准确性:交叉特征可以显著提升推荐系统和广告系统的性能。
减少存储空间:对于大规模的用户和物品集合,直接存储每一对用户和物品的交互特征是不现实的。特征抽取和特征转换能有效减少需要存储的特征数量。
提升推理效率:通过预先计算并存储交叉特征,在实时推理时可以快速查找并利用这些特征,从而提升系统响应速度。
按照以下几个维度为您介绍WindowTransform窗口变换特征的实现过程:
简单聚合函数计算过程
简单聚合函数包括count、sum、max和min 。这些聚合函数的计算过程比较直接,即通过天级别汇总后,再进一步进行多天的汇总即可得到最终的结果。此外,本文还将介绍天级中间表,以及引入了UDF(MaxCompute UDF概述)以得到最终的计算结果。在实际的计算过程中,用户执行特征生产的过程还是和上述常规特征变换一样,Feature Store Python SDK会自动处理天级中间表的创建、UDF的生成、资源上传以及函数自动注册等,用户无需感知这些细节即可实现特征生产过程。
定义输入和输出表名。
input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_window_1 = MaxComputeDataSource(table=input_agg_table_name, project=project) output_window_table_name_1 = "rec_sln_demo_behavior_test_window_v1"
定义WindowTransform特征。
win_feature1 = Feature( name="item__kv_gender_click_7d", input=["gender"], transform=WindowTransform( agg_func="count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), ) win_feature2 = Feature( name="item__kv_gender_click_cnt_7d", input=["gender"], transform=WindowTransform( agg_func="sum", # 聚合函数,可选的有 'sum', 'avg', 'max', 'min' agg_field="click_count", # 对该特征进行聚合函数的计算 condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), )
创建pipeline,运行WindowTransform类型特征变换。
window_pipeline_1 = fs_client.create_pipeline(ds_window_1, output_window_table_name_1).add_feature_transform([win_feature1, win_feature2], keep_input_columns=True)
生产与执行变换。
该生产过程有天级临时中间表生成,此时的drop table只会删掉最终的结果,不会删除中间的临时表。
execute_date = '20240605' print("transform_info = ", window_pipeline_1.transform_info) output_window_table_1 = window_pipeline_1.execute(execute_date, drop_table=True)
另外,因涉及多天的统计,例如上述示例是统计7天的数据,中间的临时表一般只会计算最新分区的数据。为此系统在执行特征生产过程中提供了一个参数 backfill_partitions,当第一次执行时可以将其设置为True,系统会自动补全依赖的天数。例如本次统计是涉及7天的数据,系统会自动将7天的数据补全,这样在后面例行运行时,还可以再设置为False,只需要补全最新一天分区的数据即可。
execute_date = '20240506' output_window_table_1 = window_pipeline_1.execute(execute_date, backfill_partitions=True)
当设置backfill_partitions参数为True时,系统会自动补全临时中间表的依赖天数的数据,建议在第一次例行运行时运行。
当统计天数较多时,运行上述代码的时间较长。
查看结果表数据。
window_ret_1 = output_window_table_1.to_pandas(execute_date, limit=50) window_ret_1
查看实际的计算过程。
window_pipeline_1.transform_info
通过计算过程可以看出,系统会生成天级别的中间临时表 rec_sln_demo_behavior_table_preprocess_wide_v1_tmp_daily ,该表会将天级的结果汇总存储在固定的分区中,这样会避免重复计算。
另外在计算最终结果时,还会使用到一个UDF:count_kv,该UDF会自动将统计结果分类汇总到结果map中,以String的形式存在,方便后续进行离线和在线结果推理。
上述内容介绍了简单聚合函数计算过程,以count、sum为例执行了整个生产过程,该生产过程虽然涉及天级的中间临时表、UDF等概念,但核心流程和常规的数据变换操作一样,未增加操作复杂度。其它简单聚合函数max和min同理。
avg聚合函数计算过程
因为在一天中计算avg结果后,再通过多天来汇总结果,会导致计算结果不准确,因此单独介绍avg聚合函数计算过程。正确的计算方法是先计算多天的 sum_v 累加值和 count_v 计数结果,然后再通过sum_v/count_v
得出多天的avg值。
尽管该聚合函数被单独介绍,其复杂的计算细节已被封装在transform_info
中。因此,用户无须深入了解这些细节,而只需像处理常规特征那样进行操作,即可顺利生成最终结果。
定义输入和输出表名。
input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project) output_window_table_name_2 = "rec_sln_demo_behavior_test_window_v2"
定义WindowTransform特征。
win_feature1 = Feature( name="item__kv_gender_click_avg_7d", input=["gender"], transform=WindowTransform( agg_func="avg", agg_field="click_count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), ) win_feature2 = Feature( name="item__kv_gender_click_avg_15d", input=["gender"], transform=WindowTransform( agg_func="avg", agg_field="click_count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(15), ), )
创建pipeline,运行WindowTransform类型特征变换。
window_pipeline_2 = fs_client.create_pipeline(ds_window_1, output_window_table_name_2).add_feature_transform([win_feature1, win_feature2])
生产与执行变换。
execute_date = '20240605' print("transform_info = ", window_pipeline_2.transform_info) output_window_table_2 = window_pipeline_2.execute(execute_date, drop_table=True)
查看结果表数据。
window_ret_2 = output_window_table_2.to_pandas(execute_date, limit=50) window_ret_2
多种group key的函数计算过程
同样,WindowTransform支持多种group key来同时计算,结果最终left join在输入表中。具体示例如下所示:
定义输入和输出表名。
input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project) output_window_table_name_3 = "rec_sln_demo_behavior_test_window_v3"
定义WindowTransform特征。
win_feature1 = Feature( name="item__kv_gender_click_7d", input=["gender"], transform=WindowTransform( agg_func="count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), ) win_feature2 = Feature( name="item__kv_gender_click_cnt_7d", input=["gender"], transform=WindowTransform( agg_func="sum", agg_field="click_count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), ) win_feature3 = Feature( name="author__kv_gender_click_15d", input=["gender"], transform=WindowTransform( agg_func="count", condition=Condition(field="event", value="click", operator="="), group_by_keys="author", window_size=DayOf(7), ), ) win_feature4 = Feature( name="author__kv_gender_click_cnt_15d", input=["gender"], transform=WindowTransform( agg_func="sum", agg_field="click_count", condition=Condition(field="event", value="click", operator="="), group_by_keys="author", window_size=DayOf(7), ), )
创建pipeline,运行WindowTransform类型特征变换。
window_pipeline_3 = fs_client.create_pipeline(ds_window_1, output_window_table_name_3).add_feature_transform([win_feature1, win_feature2, win_feature3, win_feature4])
生产与执行变换。
execute_date = '20240605' print("transform_info = ", window_pipeline_3.transform_info) output_window_table_3 = window_pipeline_3.execute(execute_date, drop_table=True)
查看结果表数据。
window_ret_3 = output_window_table_3.to_pandas(execute_date, limit=50) window_ret_3
内置自动扩展函数,支持对WindowTransform特征自动扩展
和统计类型特征变换类似,WindowTransform统计特征较多,包括不同的窗口大小和众多聚合函数计算组合,手动实现每个特征比较复杂。系统内置了自动扩展函数,仅需指定要统计的输入特征,即可自动生成并完成数百种统计特征的定义。
指定要统计的输入特征。
name_prefix = "item" input_list = ['gender'] agg_field = ["click_count"] event_name = 'event' event_type = 'click' group_by_key = "item_id" window_size = [7, 15, 30, 45] window_transform_feature_list = auto_window_feature_transform(name_prefix, input_list, agg_field, event_name, event_type, group_by_key, window_size) print("len_window_transform_feature_list = ", len(window_transform_feature_list)) print("window_transform_feature_list = ", window_transform_feature_list)
定义输出表名,创建pipeline。
input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project) output_window_table_name_4 = "rec_sln_demo_behavior_test_window_v4" window_pipeline_4 =fs_client.create_pipeline(ds_window_1, output_window_table_name_4).add_feature_transform(window_transform_feature_list)
生产与执行变换。
execute_date = '20240605' print("transform_info = ", window_pipeline_4.transform_info) output_window_table_4 = window_pipeline_4.execute(execute_date, drop_table=True)
JoinTransform联接转换
上述特征生产过程,尤其是AggregationTransform和WindowTransform的输入均为行为表,输出结果也是存放于一个行为表中。而在大部分情况下,最终上线需要的不是一个行为表,而是要和别的表进行连接,生成特征表,例如user表或item表。
因此引入joinTransform,支持将AggregationTransform和WindowTransform的特征和已有的user表或item 表连接起来。
JoinTransform和WindowTransform进行关联
定义WindowTransform输入表。
window_table_name = 'rec_sln_demo_behavior_table_preprocess_wide_v1' ds_window_1 = MaxComputeDataSource(table=window_table_name, project=project)
定义WindowTransform特征。
win_fea1 = Feature( name="item__kv_gender_click_7d", input=["gender"], transform=WindowTransform( agg_func="count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ) )
创建pipeline。
说明因后续需要join其他表,此处未指定输出表。
win_pipeline_1 = fs_client.create_pipeline(ds_window_1).add_feature_transform([win_fea1])
定义JoinTransform输入和输出表。
item_table_name = 'rec_sln_demo_item_table_preprocess_v1' ds_join_1 = MaxComputeDataSource(table=item_table_name, project=project) output_table_name = 'rec_sln_demo_item_table_v1_fs_window_debug_v1'
创建JoinTransform pipeline,和Window Transform pipeline连接在一起。
join_pipeline_1 = fs_client.create_pipeline(ds_join_1, output_table_name).merge(win_pipeline_1)
生产和执行变换。
execute_date = '20240605' output_join_table_1 = join_pipeline_1.execute(execute_date, drop_table=True)
查看结果表数据。
join_ret_1 = output_join_table_1.to_pandas(execute_date, limit = 50) join_ret_1
查看实际的计算过程。
output_join_table_1.transform_info
JoinTransform和AggregationTransform进行关联
定义AggregationTransform输入表。
agg_table_name = 'rec_sln_demo_behavior_table_preprocess_wide_v1' ds_agg_1 = MaxComputeDataSource(table=agg_table_name, project=project)
定义AggregationTransform特征。
agg_fea1 = Feature( name="user_avg_praise_count_1d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(1), ), )
创建pipeline。
说明因后续需要join其他表,此处未指定输出表。
agg_pipeline_1 = fs_client.create_pipeline(ds_agg_1).add_feature_transform([agg_fea1])
定义JoinTransform输入和输出表。
user_table_name = 'rec_sln_demo_user_table_preprocess_v1' ds_join_2 = MaxComputeDataSource(table=user_table_name, project=project) output_table_name_2 = 'rec_sln_demo_user_table_v1_fs_window_debug_v1'
创建JoinTransform pipeline,和AggregationTransform连接在一起。
join_pipeline_2 = fs_client.create_pipeline(ds_join_2, output_table_name_2).merge(agg_pipeline_1, keep_input_columns=False)
生产和执行变换。
execute_date = '20240605' output_join_table_2 = join_pipeline_2.execute(execute_date, drop_table=True)
查看结果表数据。
join_ret_2 = output_join_table_2.to_pandas(execute_date, limit = 50) join_ret_2
查看实际的计算过程。
output_join_table_2.transform_info
相关文档
参考特征生产最佳实践,了解实际应用场景。
阿里云特征平台(FeatureStore)基本适用于所有需要特征的场景,比如推荐场景、金融风控场景、用户增长场景。同时,FeatureStore与阿里云常用数据源引擎、推荐服务引擎完成对接,可为您提供端到端高效便捷的一站式从特征注册管理到模型开发应用的全流程操作平台。更多关于FeatureStore的信息,请参见FeatureStore概述。
如果您在配置或使用过程中有任何问题,可以搜索钉钉群号:34415007523,进入答疑群联系技术人员进行咨询。