模拟实时统计特征的配置和原理

本文介绍实时统计特征如何构建,有哪些注意点。

背景信息

推荐系统的召回和排序,越能实时、准确地根据用户的操作感知到用户的偏好行为,根据操作行为来召回候选集和排序候选的物品,推荐的效果就越好。但是我们在优化一个推荐场景的召回或者排序模型的时候,往往还没有准备好用户和物品的实时特征,无法在模型打分前1秒获取到用户实时特征和物品实时特征。

如果我们先建设一个实时统计特征、写入到系统中、再根据曝光、点击等事件来关联正负样本,这个流程非常长,还可能开发了一些没有用的实时特征从而浪费开发资源和时间。我们根据长期的推荐系统实践,提出利用离线历史行为数据、用户和物品特征数据来模拟计算出实时特征,得到特征和训练样本,在线上服务的时候用Flink统计真实的实时特征供模型打分使用,充分使用历史数据跳过实时特征积累的过程,能够离线先验证模型效果,同时也大大缩短了项目时间(先用离线模拟的实时特征来训练模型,用真实实时特征来做线上服务,最终再用真实实时特征来训练模型)。

准备数据

1.离线和实时的用户行为日志表

除了离线的行为日志表之外,还需要一张实时行为日志表。实时行为日志表首先存在DataHub中,其余消息中间件也可以,只需要能被Flink SQL消费即可。注意:实时行为日志表的字段是离线行为表字段的子集(即DataHub和MaxCompute中的表一些字段名称是一致的),但至少需要以下字段。

字段

类型

是否必须非空

描述

user_id

string

用户id

item_id

string

商品id

request_id

string

请求id

event

string

行为类型,该字段的值一般是英文字符串:expr,click,buy等

event_time

bigint

行为发生的时间,必须是10位的时间戳,单位是秒

event_value

double

行为对应的值,如观看时长,购买金额等;如果没有则设置为0

scene

string

用来区分日志场景,如首页猜你喜欢推荐、搜索场景、详情页相关推荐、排行榜等等

字段名称不一定和以上描述完全一样,可以增加冗余字段。

我们准备如下用户行为日志表。MaxCompute离线行为日志表结构如下(其中playtime对应上面表格中的event_value字段,page对应上面的scene字段):

image

离线数据表参考《推荐算法定制最佳实践文档》

DataHub实时行为日志表结构如下:

image

可以看到实时行为表字段和离线行为表完全一样,且字段是离线行为表的子集。

PAI-Rec推荐方案定制->数据注册->DataHub表中进行实时行为表的注册,同时在MaxCompute表中对离线行为表注册

image

2.用户属性表

MaxCompute准备如下表结构:

image并在PAI-Rec中的推荐算法定制中进行MaxCompute进行数据注册

image

3.物品表

在MaxCompute中准备如下物品表,同上面一样在PAI-Rec的推荐算法定制中进行数据注册

imageimage

推荐算法定制

我们在PAI-Rec控制台的推荐算法定制的特征工程功能模块中集成了如何配置实时统计特征的功能,配置实时统计特征之后,会产出MaxCompute SQL离线模拟产出推荐请求那一刻需要的实时统计特征。

1.数据表配置

行为日志表配置

image

image

用户表配置

image

物品表配置

image

实时行为日志表配置

image

image

2.离线模拟实时统计特征

在算法定制的特征配置中,对用户和物品实时特征如下配置。尽可能实时特征不要配置曝光行为,否则会导致Flink的资源消耗过大,统计点击以上的行为基本上能取得同样的收益,特殊场景除外。

用户实时特征

注意:防穿越时间设置5秒,表示只统计在某次推荐请求5秒之前的行为。防止离线可以统计到,但是在线服务中因为系统延迟、转发等逻辑并不能统计到这个特征。

image

物品实时特征

image

离线模拟实时特征SQL如下:

image

当把推荐算法定制生成的代码部署到DataWorks上面之后,可以在DataWorks上的“数据开发-业务流程中”看到如上节点,分别构造了用户的实时特征和物品的实时特征。

首先构造了ali_rec_wide_table_for_rt的表,该表为模拟实时特征准备。在模拟一天数据的实时特征过程中至少需要前一天和当天的数据,因为当天凌晨的一些用户和物品的实时特征需要利用前一天的数据来模拟统计出实时特征,如上图所示ali_rec_user_id_rt_feature,ali_rec_item_id_rt_feature节点是根据离线日志模拟实时特征的核心

以ali_rec_user_id_rt_feature为例,做以下讲解

REATE TABLE IF NOT EXISTS ali_rec_user_id_rt_feature 
(
    user_id string
    ,request_id string
    ,event_unix_time bigint
    ,user__cnt_click_all_scene_rt5m bigint
    ,user__kv_author_click_all_scene_rt5m string
    ,user__kv_category_click_all_scene_rt5m string
    ,user__cnt_praise_all_scene_rt5m bigint
    ,user__kv_author_praise_all_scene_rt5m string
    ,user__kv_category_praise_all_scene_rt5m string
    ,user__cnt_click_all_scene_rt1h bigint
    ,user__kv_author_click_all_scene_rt1h string
    ,user__kv_category_click_all_scene_rt1h string
    ,user__cnt_praise_all_scene_rt1h bigint
    ,user__kv_author_praise_all_scene_rt1h string
    ,user__kv_category_praise_all_scene_rt1h string
    ,user__cnt_click_all_scene_rt12h bigint
    ,user__kv_author_click_all_scene_rt12h string
    ,user__kv_category_click_all_scene_rt12h string
    ,user__cnt_praise_all_scene_rt12h bigint
    ,user__kv_author_praise_all_scene_rt12h string
    ,user__kv_category_praise_all_scene_rt12h string
)
PARTITIONED BY 
(
    ds string
)
LIFECYCLE 90
;
set odps.stage.mapper.split.size=8;
set odps.stage.reducer.num=200;
set odps.namespace.schema=false;
--@resource_reference{"feature-generate-mr-v1.16.jar"}
jar -resources feature-generate-mr-v1.16.jar
    -classpath feature-generate-mr-v1.16.jar
    com.aliyun.pai.feature_generate_mr.RealTimeFeature
    -pid "user_id"
    -pid_type "string"
    -time_stamp "event_unix_time"
    -time_stamp_type "bigint"
    -event "event"
    -valid_events "click,praise"
    -valid_event_selections "click,praise"
    -category_fields "author,category"
    -default_v ""
    -lag_time_values "300,3600,43200"
    -pre_seconds 5
    -topk 100
    -output_date "${bdp.system.bizdate}"
    -input_table "ali_rec_wide_table_for_rt/ds='${bdp.system.bizdate}'"
    -output_table "ali_rec_user_id_rt_feature/ds='${bdp.system.bizdate}'"
    -requestid "request_id"
;

该jar包是根据离线日志模拟实时特征,参数介绍如下:

-pid                        一般是userid,表示统计谁的实时特征,也可以是itemid
-pid_type                   pid的数据类型,string或者bigint
-requestid                  行为表的请求id字段,可以不配置,则每秒输出一次结果,如果有,则每个用户,每个requestid计算输出一次实时特征
-time_stamp                 行为表的行为发生时间,10位时间戳,单位是秒
-time_stamp_type            行为表的时间戳的数据类型,类型是bigint
-event                      行为表的事件类型字段
-valid_events               行为表对事件类型的别名
-valid_event_selections     行为表需要被统计的事件类型,一个事件类型可以包含多个具体行为名,例如 click 可以包含 discover_click 和 popular_click, 输入时以 "|" 分隔
-category_fields            需要被统计的category或者tags的字段
-default_v                  被统计的category或者tags的字段空值填充
-lag_time_values            窗口值,单位秒,例如"1800,3600,10800"
-pre_seconds                多少秒之前发生的行为才可以统计,防止穿越用的:因为线上日志有延迟,离线模拟不能过于及时。建议至少设置5秒。
-topk                       一般表示kv里面k的数量
-output_date                "${bdp.system.bizdate}",表示输出的开始时间,一般输出当前每个时刻的实时特征
-input_table                "dwd_rt_wide_table/dt='${bdp.system.bizdate}'"
-output_table               "dwd_user_rt_feature/dt='${bdp.system.bizdate}'"
-scene                      场景字段,配合valid_scenes使用
-valid_scenes               需要单独统计场景字段的值全部场景统计,单独场景统计

生成的特征规律如下,除了userid,requestid,以及event time字段,以下输出是生成特征的规律

for sc in [all]+valid_scenes: # 遍历场景,默认是全场景,all标识全场景
  for win in windows: # 遍历窗口
    for event in valid_events: # 遍历行为类型
      print(cnt_event) # 行为次数统计
      for c in enumurate(category_fields): #遍历类目字段
        print(kv_c_event) # 类目c在行为event下统计的kv字段

实时特征范围

1.行为计数统计,周期内可以统计不同行为发生的不同次数,如曝光多少次,点击多少次,都是数值特征

2.行为转换率统计,如点击次数/曝光次数,此种是在第一种结果上做了两两相除

3.偏好属性行为计数,如周期内统计不同行为,对应的属性偏好次数,如周期内,用户对不同类目的点击次数,统计得到kv特征

4.偏好属性行为转换率,这也是在得到偏好属性行为计数后做两两相除得到

样本根据userid和requestid join到实时特征便可以进行训练。物品侧同理

实时统计特征在线统计,供给推理使用

实时特征统计实现:

实时特征实现主要依据2个关键函数得实现.

MessageDelay:日志立即输出一次,当水位到达日志在对应的窗口后再各输出一次;如以下讲解中有3个窗口,会直接输出一次,当水位达到当前日志发生时间的300s之后再输出一次,3600s之后再输出一次,43200s之后再输出一次。

SWCountCatesKVS:根据以上输出日志统计实时特征。

以此实现实时特征秒级变动。

配置依赖

如果在该算法定制的环境配置中配置有flink数据源,则会生成flink统计当前实时特征的sql

image

打开flink对应空间,在作业运维中可以看到实时特征统计的sql脚本

image

脚本sql如下

-- please register flink function from https://easyrec.oss-cn-beijing.aliyuncs.com/deploy/real_time_seq_feature/feature-generate-flink-udf-v1.3.jar

CREATE TEMPORARY TABLE rec_sln_demo_behavior_table_dh
(
  user_id     string
  ,item_id    string
  ,event_time bigint
  ,event      string
  ,playtime   double
  ,page       string
)
WITH (
  'connector' = 'datahub'
  ,'subId' = '{topic_subid}'
  ,'endPoint' = '{datahub_endpoint}'
  ,'project' = '{datahub_project}'
  ,'topic' = 'rec_sln_demo_behavior_table_dh'
  ,'accessId' = '{accessId}'
  ,'accessKey' = '{accessKey}'
)
;

CREATE TEMPORARY TABLE ali_rec_rec_sln_demo_user_table_preprocess
(
  user_id                 string
  ,gender                 string
  ,age                    bigint
  ,city                   string
  ,item_cnt               bigint
  ,follow_cnt             bigint
  ,follower_cnt           bigint
  ,tags                   string
  ,user_register_time     bigint
  ,user_register_time_bin string
  ,PRIMARY KEY (user_id)
)
WITH (
  'connector' = 'odps'
  ,'endPoint' = '{odps_endpoint}'
  ,'project' = 'pairec_demo'
  ,'accessId' = '{accessId}'
  ,'accessKey' = '{accessKey}'
  ,'tableName' = 'ali_rec_rec_sln_demo_user_table_preprocess'
  ,'partition' = 'max_pt_with_done()'
  ,'cache' = 'ALL'
  ,'cacheSize' = '10000000'
  ,'cacheTTLMs' = '3600000'
)
;

CREATE TEMPORARY TABLE ali_rec_rec_sln_demo_item_table_preprocess
(
  item_id            string
  ,duration          double
  ,category          string
  ,author            bigint
  ,click_count       bigint
  ,praise_count      bigint
  ,item_pub_time     bigint
  ,item_pub_time_bin string
  ,PRIMARY KEY (item_id)
)
WITH (
  'connector' = 'odps'
  ,'endPoint' = '{odps_endpoint}'
  ,'project' = 'pairec_demo'
  ,'accessId' = '{accessId}'
  ,'accessKey' = '{accessKey}'
  ,'tableName' = 'ali_rec_rec_sln_demo_item_table_preprocess'
  ,'partition' = 'max_pt_with_done()'
  ,'cache' = 'ALL'
  ,'cacheSize' = '10000000'
  ,'cacheTTLMs' = '3600000'
)
;

CREATE TEMPORARY VIEW rec_sln_demo_behavior_table_dh_wide
AS SELECT
  user_id
  ,item_id
  ,event_time
  ,event
  ,playtime
  ,page
  ,gender
  ,age
  ,city
  ,item_cnt
  ,follow_cnt
  ,follower_cnt
  ,tags
  ,user_register_time
  ,user_register_time_bin
  ,duration
  ,category
  ,author
  ,click_count
  ,praise_count
  ,item_pub_time
  ,item_pub_time_bin
  ,HashBucket(user_id, 2) group_key
FROM rec_sln_demo_behavior_table_dh t1
  LEFT JOIN ali_rec_rec_sln_demo_user_table_preprocess FOR SYSTEM_TIME AS OF PROCTIME() t2
    ON t1.user_id = t2.user_id
  LEFT JOIN ali_rec_rec_sln_demo_item_table_preprocess FOR SYSTEM_TIME AS OF PROCTIME() t3
    ON t1.item_id = t3.item_id
;

CREATE TEMPORARY VIEW rec_sln_demo_behavior_table_dh_wide_delay
AS SELECT
  delay_info[1] user_id
  ,delay_info[2] item_id
  ,delay_info[3] event
  ,delay_info[4] page
  ,delay_info[5] gender
  ,delay_info[6] city
  ,delay_info[7] tags
  ,delay_info[8] author
  ,delay_info[9] category
  ,CAST(delay_info[10] AS BIGINT) event_time
  ,CAST(delay_info[13] AS BIGINT) sys_time
  ,CAST(delay_info[14] AS INT) delay_flag
FROM (
  SELECT
    MessageDelay(
      ARRAY[user_id,item_id,event,page,gender,city,tags,CAST(author AS STRING),category]
      ,event_time
      ,ARRAY[300,3600,43200]
      ,group_key
      ,IF(event IN ('click','praise'), true, false)
      ,ARRAY['']
    ) delay_info
  FROM rec_sln_demo_behavior_table_dh_wide
  GROUP BY
    group_key
) sq0
;

CREATE TEMPORARY VIEW dwd_item_id_rt_feature
AS SELECT
  item_id
  ,sys_time
  ,CAST(features[1] AS BIGINT) item__cnt_click_rt5m
  ,features[2] item__kv_gender_click_rt5m
  ,features[3] item__kv_city_click_rt5m
  ,features[4] item__kv_tags_click_rt5m
  ,CAST(features[5] AS BIGINT) item__cnt_praise_rt5m
  ,features[6] item__kv_gender_praise_rt5m
  ,features[7] item__kv_city_praise_rt5m
  ,features[8] item__kv_tags_praise_rt5m
  ,CAST(features[9] AS BIGINT) item__cnt_click_rt1h
  ,features[10] item__kv_gender_click_rt1h
  ,features[11] item__kv_city_click_rt1h
  ,features[12] item__kv_tags_click_rt1h
  ,CAST(features[13] AS BIGINT) item__cnt_praise_rt1h
  ,features[14] item__kv_gender_praise_rt1h
  ,features[15] item__kv_city_praise_rt1h
  ,features[16] item__kv_tags_praise_rt1h
  ,CAST(features[17] AS BIGINT) item__cnt_click_rt12h
  ,features[18] item__kv_gender_click_rt12h
  ,features[19] item__kv_city_click_rt12h
  ,features[20] item__kv_tags_click_rt12h
  ,CAST(features[21] AS BIGINT) item__cnt_praise_rt12h
  ,features[22] item__kv_gender_praise_rt12h
  ,features[23] item__kv_city_praise_rt12h
  ,features[24] item__kv_tags_praise_rt12h
FROM (
  SELECT
    item_id
    ,UNIX_TIMESTAMP() sys_time
    ,SWCountCatesKVS(
      ARRAY[gender,city,tags]
      ,1
      ,event
      ,ARRAY['click','praise']
      ,delay_flag
      ,ARRAY[300,3600,43200]
      ,page
      ,ARRAY['']
      ,100
      ,item_id
      ,ARRAY['']
    ) features
  FROM rec_sln_demo_behavior_table_dh_wide_delay
  GROUP BY
    item_id
) sq0
;

CREATE TEMPORARY TABLE ali_rec_item_id_rt_statistic_feat
(
  item_id                       string
  ,sys_time                     bigint
  ,item__cnt_click_rt5m         bigint
  ,item__cnt_praise_rt5m        bigint
  ,item__cnt_click_rt1h         bigint
  ,item__cnt_praise_rt1h        bigint
  ,item__cnt_click_rt12h        bigint
  ,item__cnt_praise_rt12h       bigint
  ,item__kv_gender_click_rt5m   string
  ,item__kv_city_click_rt5m     string
  ,item__kv_tags_click_rt5m     string
  ,item__kv_gender_praise_rt5m  string
  ,item__kv_city_praise_rt5m    string
  ,item__kv_tags_praise_rt5m    string
  ,item__kv_gender_click_rt1h   string
  ,item__kv_city_click_rt1h     string
  ,item__kv_tags_click_rt1h     string
  ,item__kv_gender_praise_rt1h  string
  ,item__kv_city_praise_rt1h    string
  ,item__kv_tags_praise_rt1h    string
  ,item__kv_gender_click_rt12h  string
  ,item__kv_city_click_rt12h    string
  ,item__kv_tags_click_rt12h    string
  ,item__kv_gender_praise_rt12h string
  ,item__kv_city_praise_rt12h   string
  ,item__kv_tags_praise_rt12h   string
)
WITH (
  'connector' = 'hologres'
  ,'endPoint' = '{hologres_endpoint}'
  ,'dbname' = 'mydb'
  ,'tablename' = 'ali_rec_item_id_rt_statistic_feat'
  ,'username' = '{accessId}'
  ,'password' = '{accessKey}'
  ,'jdbcWriteFlushInterval' = '500'
  ,'mutatetype' = 'insertorreplace'
)
;

CREATE TEMPORARY VIEW dwd_user_id_rt_feature
AS SELECT
  user_id
  ,sys_time
  ,CAST(features[1] AS BIGINT) user__cnt_click_rt5m
  ,features[2] user__kv_author_click_rt5m
  ,features[3] user__kv_category_click_rt5m
  ,CAST(features[4] AS BIGINT) user__cnt_praise_rt5m
  ,features[5] user__kv_author_praise_rt5m
  ,features[6] user__kv_category_praise_rt5m
  ,CAST(features[7] AS BIGINT) user__cnt_click_rt1h
  ,features[8] user__kv_author_click_rt1h
  ,features[9] user__kv_category_click_rt1h
  ,CAST(features[10] AS BIGINT) user__cnt_praise_rt1h
  ,features[11] user__kv_author_praise_rt1h
  ,features[12] user__kv_category_praise_rt1h
  ,CAST(features[13] AS BIGINT) user__cnt_click_rt12h
  ,features[14] user__kv_author_click_rt12h
  ,features[15] user__kv_category_click_rt12h
  ,CAST(features[16] AS BIGINT) user__cnt_praise_rt12h
  ,features[17] user__kv_author_praise_rt12h
  ,features[18] user__kv_category_praise_rt12h
FROM (
  SELECT
    user_id
    ,UNIX_TIMESTAMP() sys_time
    ,SWCountCatesKVS(
      ARRAY[author,category]
      ,1
      ,event
      ,ARRAY['click','praise']
      ,delay_flag
      ,ARRAY[300,3600,43200]
      ,page
      ,ARRAY['']
      ,100
      ,user_id
      ,ARRAY['']
    ) features
  FROM rec_sln_demo_behavior_table_dh_wide_delay
  GROUP BY
    user_id
) sq0
;

CREATE TEMPORARY TABLE ali_rec_user_id_rt_statistic_feat
(
  user_id                         string
  ,sys_time                       bigint
  ,user__cnt_click_rt5m           bigint
  ,user__cnt_praise_rt5m          bigint
  ,user__cnt_click_rt1h           bigint
  ,user__cnt_praise_rt1h          bigint
  ,user__cnt_click_rt12h          bigint
  ,user__cnt_praise_rt12h         bigint
  ,user__kv_author_click_rt5m     string
  ,user__kv_category_click_rt5m   string
  ,user__kv_author_praise_rt5m    string
  ,user__kv_category_praise_rt5m  string
  ,user__kv_author_click_rt1h     string
  ,user__kv_category_click_rt1h   string
  ,user__kv_author_praise_rt1h    string
  ,user__kv_category_praise_rt1h  string
  ,user__kv_author_click_rt12h    string
  ,user__kv_category_click_rt12h  string
  ,user__kv_author_praise_rt12h   string
  ,user__kv_category_praise_rt12h string
)
WITH (
  'connector' = 'hologres'
  ,'endPoint' = '{hologres_endpoint}'
  ,'dbname' = 'mydb'
  ,'tablename' = 'ali_rec_user_id_rt_statistic_feat'
  ,'username' = '{accessId}'
  ,'password' = '{accessKey}'
  ,'jdbcWriteFlushInterval' = '500'
  ,'mutatetype' = 'insertorreplace'
)
;

BEGIN STATEMENT SET
;

INSERT INTO ali_rec_item_id_rt_statistic_feat
SELECT
  item_id
  ,sys_time
  ,item__cnt_click_rt5m
  ,item__cnt_praise_rt5m
  ,item__cnt_click_rt1h
  ,item__cnt_praise_rt1h
  ,item__cnt_click_rt12h
  ,item__cnt_praise_rt12h
  ,item__kv_gender_click_rt5m
  ,item__kv_city_click_rt5m
  ,item__kv_tags_click_rt5m
  ,item__kv_gender_praise_rt5m
  ,item__kv_city_praise_rt5m
  ,item__kv_tags_praise_rt5m
  ,item__kv_gender_click_rt1h
  ,item__kv_city_click_rt1h
  ,item__kv_tags_click_rt1h
  ,item__kv_gender_praise_rt1h
  ,item__kv_city_praise_rt1h
  ,item__kv_tags_praise_rt1h
  ,item__kv_gender_click_rt12h
  ,item__kv_city_click_rt12h
  ,item__kv_tags_click_rt12h
  ,item__kv_gender_praise_rt12h
  ,item__kv_city_praise_rt12h
  ,item__kv_tags_praise_rt12h
FROM dwd_item_id_rt_feature
;

INSERT INTO ali_rec_user_id_rt_statistic_feat
SELECT
  user_id
  ,sys_time
  ,user__cnt_click_rt5m
  ,user__cnt_praise_rt5m
  ,user__cnt_click_rt1h
  ,user__cnt_praise_rt1h
  ,user__cnt_click_rt12h
  ,user__cnt_praise_rt12h
  ,user__kv_author_click_rt5m
  ,user__kv_category_click_rt5m
  ,user__kv_author_praise_rt5m
  ,user__kv_category_praise_rt5m
  ,user__kv_author_click_rt1h
  ,user__kv_category_click_rt1h
  ,user__kv_author_praise_rt1h
  ,user__kv_category_praise_rt1h
  ,user__kv_author_click_rt12h
  ,user__kv_category_click_rt12h
  ,user__kv_author_praise_rt12h
  ,user__kv_category_praise_rt12h
FROM dwd_user_id_rt_feature
;

end
;
  1. 首先根据第一行下载对应的jar包,在Flink中注册自定义函数。

  2. 第1段SQL是注册DataHub的行为数据流。

  3. 第2,3段SQL是加载mc的user和item的静态特征维表。

  4. 第4段SQL是行为数据流join用户和物品维表信息,形成宽表。同时对userid进行了hash,该hash size需要和下游运行的MessageDelay函数搭配起来用。如果该size过大,会影响数据稀疏:比如一天内,在一些特定的hash后的key中,存在一些秒时间上是不存在数据,会导致数据延迟下发;如果size过小,则会出现数据在worker上倾斜,造成一部分worker计算量和内存消耗过大。

  5. 第5段SQL,是对数据进行滞后发送。当前数据进来后会立即下发一次,标delay_flag记0; 当数据流水位到达该数据下一个窗口,则该数据会再次下发一次,delay_flag为窗口。

MessageDelay函数是一个非常重要函数,具体介绍如下:

MessageDelay(
      ARRAY[user_id,item_id,event,page,gender,city,tags,CAST(author AS STRING),category]
      ,event_time
      ,ARRAY[300,3600,43200]
      ,group_key
      ,IF(event IN ('click','praise'), true, false)
      ,ARRAY['']
    ) delay_info

# 参数介绍
第一个参数是array<string>表示要滞后发送哪些字段
第二个event_time是10位的时间戳,表示行为时间,也是水位时间
第三个array<bigint>,滞后窗口, 是该消息滞后多久会再次被下发,单位是秒
第四个group_key是当前数据hash后的key
第五个是boolen的参数,如果是true,则该消息会根据滞后窗口被再次下发;如果是false,则触发别的数据下发
第六个参数是debug使用,用于统计计算效率,会追加在聚合特征的后面输出

# 输出数据介绍
SELECT
  delay_info[1] user_id
  ,delay_info[2] item_id
  ,delay_info[3] event
  ,delay_info[4] page
  ,delay_info[5] gender
  ,delay_info[6] city
  ,delay_info[7] tags
  ,delay_info[8] author
  ,delay_info[9] category    # 1-9都是messagdelay函数的第一个入参,表示要滞后的字段
  ,CAST(delay_info[10] AS BIGINT) event_time  # 当前日志真实的发生时间,10位时间戳
  ,CAST(delay_info[11] AS BIGINT) water_time  # 被触发输出水位时间
  ,CAST(delay_info[12] AS BIGINT) reader_water_time  # 触发日志开始处理时间
  ,CAST(delay_info[13] AS BIGINT) sys_time    # 当前系统时间,13位时间戳
  ,CAST(delay_info[14] AS INT) delay_flag     # 该日志被滞后的窗口
  ,delay_info[....]  # 如果MessageDelay最后一个入参不为空则会追加在后面,比如可以追加读取到数据时间,则可以计算MessageDelay的效率如何
  1. 第6、8段SQL是统计实时特征的SQL

依赖的主要函数是SWCountCatesKVS,在下面参数中介绍了相关参数,并且展示了生成特征的规律。

"""
SWCountCatesKVS(
ARRAY[gender,city,tags]    # category_fields,要被统计的category_fields
,1                         # cnt_event,每出现一次日志的累加/减量
,event                     # event,当前日志的行为类型
,ARRAY['click','praise']   # valid_events,要统计特征的行为类型
,delay_flag                # messagedelay出入的日志延迟标识,根据次标识进行累加或者累减
,ARRAY[300,3600,43200]     # windows,要统计的日志窗口,单位是秒
,page                      # valid_scenes,场景信息字段
,ARRAY['']                 # 要单独统计的日志场景,如果没有单独统计的场景,给个空字符串,和场景字段配合使用
,100                       # kv字段保留的最多的类目值得数量
,item_id                   # 当前聚合key,item_id或者user_id
,ARRAY['']                 # debug使用的信息,会追加在特征后面,不用可以给个空字符串
)
"""

# 生成的特征规律如下
for sc in ['all']+list(valid_scenes):  # 遍历场景,默认用all来标识全场景
  for win in windows:          # 遍历窗口
    for event in valid_events:  # 遍历行为类型
      print(cnt_event)   # 行为次数统计
      for c in enumurate(category_fields): #遍历类目字段
        print(kv_c_event)  # 类目c在行为event下统计的kv字段

数据检查

以上2种方式,我们可以得到离线训练使用的实时特征和推理使用的实时特征。但是,日志数据在数据链路中可能存在延迟、丢失、重复等问题,这会导致模拟的实时特征和真实统计的实时特征之间存在差异。我们将每一时刻存储到Hologres的特征同步存到MaxCompute,可以和离线模拟的实时特征进行一致性的对比,需要注意模拟实时特征该参数pre_seconds必须设置为0。

select ,avg(if(COMP_RT_FEA(user__kv_brand_id_click_rt_15m_offline,user__kv_brand_id_click_rt_15m_online)>0,0,1))  acc_user__kv_brand_id_click_rt_15m
from(
    select t1.user_id
      ,t1.request_id
      ,FROM_UNIXTIME(event_unix_time) tt
      ,t1.event_unix_time
      ,t2.request_time
      ,t2.sys_time
      ,t1.user__kv_brand_id_click_rt_15m as user__kv_brand_id_click_rt_15m_offline
      ,t2.user__kv_brand_id_click_rt_15m as user__kv_brand_id_click_rt_15m_online
    from(
        select *
        from(
            SELECT  *,ROW_NUMBER() OVER(PARTITION by user_id,request_id order by event_unix_time asc) rnk
            from dwd_user_rt_feature_offline
            where dt='${bdp.system.bizdate}'
            and LENGTH(request_id)>0
        )t where rnk=1
    )t1 join (
        select *
        from(
            SELECT *,ROW_NUMBER() OVER(PARTITION by user_id,request_id order by request_time asc) rnk
            from user_rt_feature_online
            where dt='${bdp.system.bizdate}'
        ) t where rnk=1
    )t2 on t1.user_id=t2.user_id and t1.request_id=t2.request_id and t1.event_unix_time=t2.request_time
)

检查函数如下:

# Copyright (c) Alibaba, Inc. and its affiliates.
from odps.udf import annotate


@annotate('string, string->bigint')
class CompRTFea():
    """Compare 2 rt feature."""

    def evaluate(self, fea1, fea2):
        def _parse_fea(fea):
            if fea is None:
                return {}
            toks = [x for x in fea.split(chr(29)) if ':' in x]
            # toks = [ x for x in fea.split() if ':' in x ]
            kvs = {}
            for kv_s in toks:
                k, v = kv_s.split(':')
                kvs[k] = float(v)
            return kvs
        fea1_kvs = _parse_fea(fea1)
        fea2_kvs = _parse_fea(fea2)
        all_keys = set(fea1_kvs.keys())
        all_keys.update(fea2_kvs.keys())
        err_cnt = 0
        for k in all_keys:
            if k in fea1_kvs and k in fea2_kvs:
                if fea1_kvs[k] != fea2_kvs[k]:
                    err_cnt += 1
            else:
                err_cnt += 1
        return err_cnt