模拟实时统计特征的配置和原理

本文介绍实时统计特征如何构建,有哪些注意点。

背景信息

推荐系统的召回和排序,越能实时、准确地根据用户的操作感知到用户的偏好行为,根据操作行为来召回候选集和排序候选的物品,推荐的效果就越好。但是我们在优化一个推荐场景的召回或者排序模型的时候,往往还没有准备好用户和物品的实时特征,无法在模型打分前1秒获取到用户实时特征和物品实时特征。

如果我们先建设一个实时统计特征、写入到系统中、再根据曝光、点击等事件来关联正负样本,这个流程非常长,还可能开发了一些没有用的实时特征从而浪费开发资源和时间。我们根据长期的推荐系统实践,提出利用离线历史行为数据、用户和物品特征数据来模拟计算出实时特征,得到特征和训练样本,在线上服务的时候用Flink统计真实的实时特征供模型打分使用,充分使用历史数据跳过实时特征积累的过程,能够离线先验证模型效果,同时也大大缩短了项目时间(先用离线模拟的实时特征来训练模型,用真实实时特征来做线上服务,最终再用真实实时特征来训练模型)。

准备数据

1.离线和实时的用户行为日志表

除了离线的行为日志表之外,还需要一张实时行为日志表。实时行为日志表首先存在DataHub中,其余消息中间件也可以,只需要能被Flink SQL消费即可。注意:实时行为日志表的字段是离线行为表字段的子集(即DataHubMaxCompute中的表一些字段名称是一致的),但至少需要以下字段。

字段

类型

是否必须非空

描述

user_id

string

用户id

item_id

string

商品id

request_id

string

请求id

event

string

行为类型,该字段的值一般是英文字符串:expr,click,buy

event_time

bigint

行为发生的时间,必须是10位的时间戳,单位是秒

event_value

double

行为对应的值,如观看时长,购买金额等;如果没有则设置为0

scene

string

用来区分日志场景,如首页猜你喜欢推荐、搜索场景、详情页相关推荐、排行榜等等

字段名称不一定和以上描述完全一样,可以增加冗余字段。

我们准备如下用户行为日志表。MaxCompute离线行为日志表结构如下(其中playtime对应上面表格中的event_value字段,page对应上面的scene字段):

image

离线数据表参考《推荐算法定制最佳实践文档》

DataHub实时行为日志表结构如下:

image

可以看到实时行为表字段和离线行为表完全一样,且字段是离线行为表的子集。

PAI-Rec推荐方案定制->数据注册->DataHub表中进行实时行为表的注册,同时在MaxCompute表中对离线行为表注册

image

2.用户属性表

MaxCompute准备如下表结构:

image并在PAI-Rec中的推荐算法定制中进行MaxCompute进行数据注册

image

3.物品表

MaxCompute中准备如下物品表,同上面一样在PAI-Rec的推荐算法定制中进行数据注册

imageimage

推荐算法定制

我们在PAI-Rec控制台的推荐算法定制的特征工程功能模块中集成了如何配置实时统计特征的功能,配置实时统计特征之后,会产出MaxCompute SQL离线模拟产出推荐请求那一刻需要的实时统计特征。

1.数据表配置

行为日志表配置

image

image

用户表配置

image

物品表配置

image

实时行为日志表配置

image

image

2.离线模拟实时统计特征

在算法定制的特征配置中,对用户和物品实时特征如下配置。尽可能实时特征不要配置曝光行为,否则会导致Flink的资源消耗过大,统计点击以上的行为基本上能取得同样的收益,特殊场景除外。

这里主要依据2个关键函数来实现:

MessageDelay:把数据扩充到窗口数+1份,比如下面有3个窗口,会产生4份数据。

SWCountCatesKVS:根据以上输出日志统计实时特征。

(后文有命名类似的函数MessageDelaySWCountCatesKVS,请注意区分。)

用户实时特征

注意:防穿越时间设置5秒,表示只统计在某次推荐请求5秒之前的行为。防止离线可以统计到,但是在线服务中因为系统延迟、转发等逻辑并不能统计到这个特征。

image

物品实时特征

image

离线模拟实时特征SQL如下:

image

当把推荐算法定制生成的代码部署到DataWorks上面之后,可以在DataWorks上的“数据开发-业务流程中”看到如上节点,分别构造了用户的实时特征和物品的实时特征。

首先构造了ali_rec_wide_table_for_rt的表,该表为模拟实时特征准备。在模拟一天数据的实时特征过程中至少需要前一天和当天的数据,因为当天凌晨的一些用户和物品的实时特征需要利用前一天的数据来模拟统计出实时特征,如上图所示ali_rec_user_id_rt_feature,ali_rec_item_id_rt_feature节点是根据离线日志模拟实时特征的核心。

ali_rec_user_id_rt_feature为例,做以下讲解:

CREATE TABLE IF NOT EXISTS ali_rec_item_id_rt_feature 
(
    item_id string
    ,event_unix_time bigint
    ,request_id string
    ,item__cnt_click_rt5m bigint
    ,item__kv_gender_click_rt5m string
    ,item__kv_city_click_rt5m string
    ,item__kv_tags_click_rt5m string
    ,item__cnt_praise_rt5m bigint
    ,item__kv_gender_praise_rt5m string
    ,item__kv_city_praise_rt5m string
    ,item__kv_tags_praise_rt5m string
    ,item__cnt_click_rt1h bigint
    ,item__kv_gender_click_rt1h string
    ,item__kv_city_click_rt1h string
    ,item__kv_tags_click_rt1h string
    ,item__cnt_praise_rt1h bigint
    ,item__kv_gender_praise_rt1h string
    ,item__kv_city_praise_rt1h string
    ,item__kv_tags_praise_rt1h string
    ,item__cnt_click_rt12h bigint
    ,item__kv_gender_click_rt12h string
    ,item__kv_city_click_rt12h string
    ,item__kv_tags_click_rt12h string
    ,item__cnt_praise_rt12h bigint
    ,item__kv_gender_praise_rt12h string
    ,item__kv_city_praise_rt12h string
    ,item__kv_tags_praise_rt12h string
)
PARTITIONED BY 
(
    ds string
)
LIFECYCLE 90
;
INSERT OVERWRITE TABLE ali_rec_item_id_rt_feature PARTITION(ds='${bdp.system.bizdate}')
SELECT  features['pid'] item_id
        ,CAST(features['event_time'] AS BIGINT) event_unix_time
        ,features['request_id'] request_id
        ,CAST(features['item__cnt_click_rt5m'] AS BIGINT) item__cnt_click_rt5m
        ,features['item__kv_gender_click_rt5m'] item__kv_gender_click_rt5m
        ,features['item__kv_city_click_rt5m'] item__kv_city_click_rt5m
        ,features['item__kv_tags_click_rt5m'] item__kv_tags_click_rt5m
        ,CAST(features['item__cnt_praise_rt5m'] AS BIGINT) item__cnt_praise_rt5m
        ,features['item__kv_gender_praise_rt5m'] item__kv_gender_praise_rt5m
        ,features['item__kv_city_praise_rt5m'] item__kv_city_praise_rt5m
        ,features['item__kv_tags_praise_rt5m'] item__kv_tags_praise_rt5m
        ,CAST(features['item__cnt_click_rt1h'] AS BIGINT) item__cnt_click_rt1h
        ,features['item__kv_gender_click_rt1h'] item__kv_gender_click_rt1h
        ,features['item__kv_city_click_rt1h'] item__kv_city_click_rt1h
        ,features['item__kv_tags_click_rt1h'] item__kv_tags_click_rt1h
        ,CAST(features['item__cnt_praise_rt1h'] AS BIGINT) item__cnt_praise_rt1h
        ,features['item__kv_gender_praise_rt1h'] item__kv_gender_praise_rt1h
        ,features['item__kv_city_praise_rt1h'] item__kv_city_praise_rt1h
        ,features['item__kv_tags_praise_rt1h'] item__kv_tags_praise_rt1h
        ,CAST(features['item__cnt_click_rt12h'] AS BIGINT) item__cnt_click_rt12h
        ,features['item__kv_gender_click_rt12h'] item__kv_gender_click_rt12h
        ,features['item__kv_city_click_rt12h'] item__kv_city_click_rt12h
        ,features['item__kv_tags_click_rt12h'] item__kv_tags_click_rt12h
        ,CAST(features['item__cnt_praise_rt12h'] AS BIGINT) item__cnt_praise_rt12h
        ,features['item__kv_gender_praise_rt12h'] item__kv_gender_praise_rt12h
        ,features['item__kv_city_praise_rt12h'] item__kv_city_praise_rt12h
        ,features['item__kv_tags_praise_rt12h'] item__kv_tags_praise_rt12h
FROM    (
            SELECT  SW_COUNT_CATES_KVS(
                        item_id
                        ,ARRAY(gender,city,tags)
                        ,ARRAY('gender','city','tags')
                        ,1
                        ,event
                        ,ARRAY('click','praise')
                        ,delay_flag
                        ,ARRAY(300,3600,43200)
                        ,100
                        ,5
                        ,event_unix_time
                        ,'${bdp.system.bizdate}'
                        ,'item'
                        ,request_id
                    ) features
            FROM    (
                        SELECT  *
                        FROM    (
                                    SELECT  features[0] request_id
                                            ,features[1] user_id
                                            ,features[2] net_type
                                            ,features[3] item_id
                                            ,features[4] event
                                            ,features[5] day_h
                                            ,features[6] week_day
                                            ,features[7] gender
                                            ,features[8] city
                                            ,features[9] tags
                                            ,features[10] user_register_time_bin
                                            ,features[11] category
                                            ,features[12] author
                                            ,features[13] item_pub_time_bin
                                            ,CAST(features[14] AS BIGINT) event_unix_time
                                            ,CAST(features[15] AS INT) delay_flag
                                    FROM    ali_rec_wide_table_for_rt
                                    LATERAL VIEW MESSAGE_DELAY(ARRAY(request_id,user_id,net_type,item_id,event,day_h,week_day,gender,city,tags,user_register_time_bin,category,author,item_pub_time_bin),event_unix_time,ARRAY(300,3600,43200),IF(event IN ('click','praise'),true,false)) subview0 AS features
                                    WHERE   ds = '${bdp.system.bizdate}'
                                ) sq0
                        DISTRIBUTE BY item_id
                        SORT BY item_id
                                ,event_unix_time
                    ) sq1
        ) sq2
;

MESSAGE_DELAY函数,具体介绍如下。

MESSAGE_DELAY(
      ARRAY[request_id,user_id,net_type,item_id,event,day_h,week_day,gender,city,tags,user_register_time_bin,category,author,item_pub_time_bin]
      ,event_time
      ,ARRAY[300,3600,43200]
      ,IF(event IN ('click','praise'), true, false)
    ) features

# 参数介绍
第一个参数是array<string>表示要滞后发送哪些字段
第二个event_time是10位的时间戳,表示行为时间,也是水位时间
第三个array<bigint>,滞后窗口, 是该消息滞后多久会再次被下发,单位是秒
第四个是boolean参数,表示该条数据是否会根据滞后窗口再次被下发

# 输出数据介绍
SELECT
   features[0] request_id
  ,features[1] user_id
  ,features[2] net_type
  ,features[3] item_id
  ,features[4] event
  ,features[5] day_h
  ,features[6] week_day
  ,features[7] gender
  ,features[8] city
  ,features[9] tags
  ,features[10] user_register_time_bin
  ,features[11] category
  ,features[12] author
  ,features[13] item_pub_time_bin  # 0-13都是MESSAGE_DELAY函数的第一个入参,表示要滞后的字段
  ,CAST(features[14] AS BIGINT) event_time  # 当前日志行为真实的发生时间,10位时间戳
  ,CAST(features[15] AS INT) delay_flag  # 该日志被滞后的窗口

SW_COUNT_CATES_KVS函数,在下面参数中介绍了相关参数,并且展示了生成特征的规律。

"""
SW_COUNT_CATES_KVS(
item_id                        # 当前聚合key,item_id或者user_id
,ARRAY(gender,city,tags)       # category_fields,要被统计的category_fields
,ARRAY('gender','city','tags') # 每个category_field的名称
,1                             # 固定值1
,event                         # 行为字段
,ARRAY('click','praise')       # 需要统计的行为类型
,delay_flag                    # messagedelay出入的日志延迟标识,根据次标识进行累加或者累减
,ARRAY(300,3600,43200)         # windows,要统计的日志窗口,单位是秒
,100                           # 表示topk,kv字段k的截取最近的key数量
,5                             # 防穿越间隔时间,表示当前的模拟的实时统计特征不包含最近X秒发生的行为
,event_time                    # 日志行为时间,即行为发生的时间戳
,'${bdp.system.bizdate}'       # 调度日期当天,即调度发生的时间戳
,'item'                        # 聚合id所属侧,一般为user侧或者item侧
,request_id                    # 请求id,每次推荐请求的唯一标识符,可选,存在时添加该字段
,'page',                       # 场景字段,可选,场景字段和场景的值字段同时存在时,才添加该字段
ARRAY('shouye')                # 场景的值字段,可选,场景字段和场景的值字段同时存在时,才添加该字段
) features
"""

# 生成的特征规律如下
for sc in ['']+list(valid_scenes):  # 遍历场景,默认用''来标识全场景
  for win in windows:           # 遍历窗口
    for event in valid_events:  # 遍历行为类型
      print(cnt_c_event)  # 类目c在行为event下统计的cnt字段,由'{prefix}__cnt_{event}_{sc}_rt{win}'拼成,prefix为聚合id所属侧,一般为user或者item,event为被遍历的窗口,sc为被遍历的场景,win为被遍历的窗口
      for c in enumurate(category_fields): #遍历类目字段、
        print(kv_c_event)   # 类目c在行为event下统计的kv字段,由'{prefix}__kv_{c}_{event}_{sc}_rt{win}'拼成

实时特征范围

1.行为计数统计,周期内可以统计不同行为发生的不同次数,如曝光多少次,点击多少次,都是数值特征

2.行为转换率统计,如点击次数/曝光次数,此种是在第一种结果上做了两两相除

3.偏好属性行为计数,如周期内统计不同行为,对应的属性偏好次数,如周期内,用户对不同类目的点击次数,统计得到kv特征

4.偏好属性行为转换率,这也是在得到偏好属性行为计数后做两两相除得到

样本根据useridrequestid join到实时特征便可以进行训练。物品侧同理

3.实时统计特征在线统计,供给推理使用

实时特征统计实现:

实时特征实现主要依据2个关键函数得实现.

MessageDelay:日志立即输出一次,当水位到达日志在对应的窗口后再各输出一次;如以下讲解中有3个窗口,会直接输出一次,当水位达到当前日志发生时间的300s之后再输出一次,3600s之后再输出一次,43200s之后再输出一次。

SWCountCatesKVS:根据以上输出日志统计实时特征。

以此实现实时特征秒级变动。

配置依赖

如果在该算法定制的环境配置中配置有flink数据源,则会生成flink统计当前实时特征的sql

image

打开flink对应空间,在作业运维中可以看到实时特征统计的sql脚本

image

脚本sql如下

-- please register flink function from https://easyrec.oss-cn-beijing.aliyuncs.com/deploy/real_time_seq_feature/feature-generate-flink-udf-v1.3.jar

CREATE TEMPORARY TABLE rec_sln_demo_behavior_table_dh
(
  user_id     string
  ,item_id    string
  ,event_time bigint
  ,event      string
  ,playtime   double
  ,page       string
)
WITH (
  'connector' = 'datahub'
  ,'subId' = '{topic_subid}'
  ,'endPoint' = '{datahub_endpoint}'
  ,'project' = '{datahub_project}'
  ,'topic' = 'rec_sln_demo_behavior_table_dh'
  ,'accessId' = '{accessId}'
  ,'accessKey' = '{accessKey}'
)
;

CREATE TEMPORARY TABLE ali_rec_rec_sln_demo_user_table_preprocess
(
  user_id                 string
  ,gender                 string
  ,age                    bigint
  ,city                   string
  ,item_cnt               bigint
  ,follow_cnt             bigint
  ,follower_cnt           bigint
  ,tags                   string
  ,user_register_time     bigint
  ,user_register_time_bin string
  ,PRIMARY KEY (user_id)
)
WITH (
  'connector' = 'odps'
  ,'endPoint' = '{odps_endpoint}'
  ,'project' = 'pairec_demo'
  ,'accessId' = '{accessId}'
  ,'accessKey' = '{accessKey}'
  ,'tableName' = 'ali_rec_rec_sln_demo_user_table_preprocess'
  ,'partition' = 'max_pt_with_done()'
  ,'cache' = 'ALL'
  ,'cacheSize' = '10000000'
  ,'cacheTTLMs' = '3600000'
)
;

CREATE TEMPORARY TABLE ali_rec_rec_sln_demo_item_table_preprocess
(
  item_id            string
  ,duration          double
  ,category          string
  ,author            bigint
  ,click_count       bigint
  ,praise_count      bigint
  ,item_pub_time     bigint
  ,item_pub_time_bin string
  ,PRIMARY KEY (item_id)
)
WITH (
  'connector' = 'odps'
  ,'endPoint' = '{odps_endpoint}'
  ,'project' = 'pairec_demo'
  ,'accessId' = '{accessId}'
  ,'accessKey' = '{accessKey}'
  ,'tableName' = 'ali_rec_rec_sln_demo_item_table_preprocess'
  ,'partition' = 'max_pt_with_done()'
  ,'cache' = 'ALL'
  ,'cacheSize' = '10000000'
  ,'cacheTTLMs' = '3600000'
)
;

CREATE TEMPORARY VIEW rec_sln_demo_behavior_table_dh_wide
AS SELECT
  user_id
  ,item_id
  ,event_time
  ,event
  ,playtime
  ,page
  ,gender
  ,age
  ,city
  ,item_cnt
  ,follow_cnt
  ,follower_cnt
  ,tags
  ,user_register_time
  ,user_register_time_bin
  ,duration
  ,category
  ,author
  ,click_count
  ,praise_count
  ,item_pub_time
  ,item_pub_time_bin
  ,HashBucket(user_id, 2) group_key
FROM rec_sln_demo_behavior_table_dh t1
  LEFT JOIN ali_rec_rec_sln_demo_user_table_preprocess FOR SYSTEM_TIME AS OF PROCTIME() t2
    ON t1.user_id = t2.user_id
  LEFT JOIN ali_rec_rec_sln_demo_item_table_preprocess FOR SYSTEM_TIME AS OF PROCTIME() t3
    ON t1.item_id = t3.item_id
;

CREATE TEMPORARY VIEW rec_sln_demo_behavior_table_dh_wide_delay
AS SELECT
  delay_info[1] user_id
  ,delay_info[2] item_id
  ,delay_info[3] event
  ,delay_info[4] page
  ,delay_info[5] gender
  ,delay_info[6] city
  ,delay_info[7] tags
  ,delay_info[8] author
  ,delay_info[9] category
  ,CAST(delay_info[10] AS BIGINT) event_time
  ,CAST(delay_info[13] AS BIGINT) sys_time
  ,CAST(delay_info[14] AS INT) delay_flag
FROM (
  SELECT
    MessageDelay(
      ARRAY[user_id,item_id,event,page,gender,city,tags,CAST(author AS STRING),category]
      ,event_time
      ,ARRAY[300,3600,43200]
      ,group_key
      ,IF(event IN ('click','praise'), true, false)
      ,ARRAY['']
    ) delay_info
  FROM rec_sln_demo_behavior_table_dh_wide
  GROUP BY
    group_key
) sq0
;

CREATE TEMPORARY VIEW dwd_item_id_rt_feature
AS SELECT
  item_id
  ,sys_time
  ,CAST(features[1] AS BIGINT) item__cnt_click_rt5m
  ,features[2] item__kv_gender_click_rt5m
  ,features[3] item__kv_city_click_rt5m
  ,features[4] item__kv_tags_click_rt5m
  ,CAST(features[5] AS BIGINT) item__cnt_praise_rt5m
  ,features[6] item__kv_gender_praise_rt5m
  ,features[7] item__kv_city_praise_rt5m
  ,features[8] item__kv_tags_praise_rt5m
  ,CAST(features[9] AS BIGINT) item__cnt_click_rt1h
  ,features[10] item__kv_gender_click_rt1h
  ,features[11] item__kv_city_click_rt1h
  ,features[12] item__kv_tags_click_rt1h
  ,CAST(features[13] AS BIGINT) item__cnt_praise_rt1h
  ,features[14] item__kv_gender_praise_rt1h
  ,features[15] item__kv_city_praise_rt1h
  ,features[16] item__kv_tags_praise_rt1h
  ,CAST(features[17] AS BIGINT) item__cnt_click_rt12h
  ,features[18] item__kv_gender_click_rt12h
  ,features[19] item__kv_city_click_rt12h
  ,features[20] item__kv_tags_click_rt12h
  ,CAST(features[21] AS BIGINT) item__cnt_praise_rt12h
  ,features[22] item__kv_gender_praise_rt12h
  ,features[23] item__kv_city_praise_rt12h
  ,features[24] item__kv_tags_praise_rt12h
FROM (
  SELECT
    item_id
    ,UNIX_TIMESTAMP() sys_time
    ,SWCountCatesKVS(
      ARRAY[gender,city,tags]
      ,1
      ,event
      ,ARRAY['click','praise']
      ,delay_flag
      ,ARRAY[300,3600,43200]
      ,page
      ,ARRAY['']
      ,100
      ,item_id
      ,ARRAY['']
    ) features
  FROM rec_sln_demo_behavior_table_dh_wide_delay
  GROUP BY
    item_id
) sq0
;

CREATE TEMPORARY TABLE ali_rec_item_id_rt_statistic_feat
(
  item_id                       string
  ,sys_time                     bigint
  ,item__cnt_click_rt5m         bigint
  ,item__cnt_praise_rt5m        bigint
  ,item__cnt_click_rt1h         bigint
  ,item__cnt_praise_rt1h        bigint
  ,item__cnt_click_rt12h        bigint
  ,item__cnt_praise_rt12h       bigint
  ,item__kv_gender_click_rt5m   string
  ,item__kv_city_click_rt5m     string
  ,item__kv_tags_click_rt5m     string
  ,item__kv_gender_praise_rt5m  string
  ,item__kv_city_praise_rt5m    string
  ,item__kv_tags_praise_rt5m    string
  ,item__kv_gender_click_rt1h   string
  ,item__kv_city_click_rt1h     string
  ,item__kv_tags_click_rt1h     string
  ,item__kv_gender_praise_rt1h  string
  ,item__kv_city_praise_rt1h    string
  ,item__kv_tags_praise_rt1h    string
  ,item__kv_gender_click_rt12h  string
  ,item__kv_city_click_rt12h    string
  ,item__kv_tags_click_rt12h    string
  ,item__kv_gender_praise_rt12h string
  ,item__kv_city_praise_rt12h   string
  ,item__kv_tags_praise_rt12h   string
)
WITH (
  'connector' = 'hologres'
  ,'endPoint' = '{hologres_endpoint}'
  ,'dbname' = 'mydb'
  ,'tablename' = 'ali_rec_item_id_rt_statistic_feat'
  ,'username' = '{accessId}'
  ,'password' = '{accessKey}'
  ,'jdbcWriteFlushInterval' = '500'
  ,'mutatetype' = 'insertorreplace'
)
;

CREATE TEMPORARY VIEW dwd_user_id_rt_feature
AS SELECT
  user_id
  ,sys_time
  ,CAST(features[1] AS BIGINT) user__cnt_click_rt5m
  ,features[2] user__kv_author_click_rt5m
  ,features[3] user__kv_category_click_rt5m
  ,CAST(features[4] AS BIGINT) user__cnt_praise_rt5m
  ,features[5] user__kv_author_praise_rt5m
  ,features[6] user__kv_category_praise_rt5m
  ,CAST(features[7] AS BIGINT) user__cnt_click_rt1h
  ,features[8] user__kv_author_click_rt1h
  ,features[9] user__kv_category_click_rt1h
  ,CAST(features[10] AS BIGINT) user__cnt_praise_rt1h
  ,features[11] user__kv_author_praise_rt1h
  ,features[12] user__kv_category_praise_rt1h
  ,CAST(features[13] AS BIGINT) user__cnt_click_rt12h
  ,features[14] user__kv_author_click_rt12h
  ,features[15] user__kv_category_click_rt12h
  ,CAST(features[16] AS BIGINT) user__cnt_praise_rt12h
  ,features[17] user__kv_author_praise_rt12h
  ,features[18] user__kv_category_praise_rt12h
FROM (
  SELECT
    user_id
    ,UNIX_TIMESTAMP() sys_time
    ,SWCountCatesKVS(
      ARRAY[author,category]
      ,1
      ,event
      ,ARRAY['click','praise']
      ,delay_flag
      ,ARRAY[300,3600,43200]
      ,page
      ,ARRAY['']
      ,100
      ,user_id
      ,ARRAY['']
    ) features
  FROM rec_sln_demo_behavior_table_dh_wide_delay
  GROUP BY
    user_id
) sq0
;

CREATE TEMPORARY TABLE ali_rec_user_id_rt_statistic_feat
(
  user_id                         string
  ,sys_time                       bigint
  ,user__cnt_click_rt5m           bigint
  ,user__cnt_praise_rt5m          bigint
  ,user__cnt_click_rt1h           bigint
  ,user__cnt_praise_rt1h          bigint
  ,user__cnt_click_rt12h          bigint
  ,user__cnt_praise_rt12h         bigint
  ,user__kv_author_click_rt5m     string
  ,user__kv_category_click_rt5m   string
  ,user__kv_author_praise_rt5m    string
  ,user__kv_category_praise_rt5m  string
  ,user__kv_author_click_rt1h     string
  ,user__kv_category_click_rt1h   string
  ,user__kv_author_praise_rt1h    string
  ,user__kv_category_praise_rt1h  string
  ,user__kv_author_click_rt12h    string
  ,user__kv_category_click_rt12h  string
  ,user__kv_author_praise_rt12h   string
  ,user__kv_category_praise_rt12h string
)
WITH (
  'connector' = 'hologres'
  ,'endPoint' = '{hologres_endpoint}'
  ,'dbname' = 'mydb'
  ,'tablename' = 'ali_rec_user_id_rt_statistic_feat'
  ,'username' = '{accessId}'
  ,'password' = '{accessKey}'
  ,'jdbcWriteFlushInterval' = '500'
  ,'mutatetype' = 'insertorreplace'
)
;

BEGIN STATEMENT SET
;

INSERT INTO ali_rec_item_id_rt_statistic_feat
SELECT
  item_id
  ,sys_time
  ,item__cnt_click_rt5m
  ,item__cnt_praise_rt5m
  ,item__cnt_click_rt1h
  ,item__cnt_praise_rt1h
  ,item__cnt_click_rt12h
  ,item__cnt_praise_rt12h
  ,item__kv_gender_click_rt5m
  ,item__kv_city_click_rt5m
  ,item__kv_tags_click_rt5m
  ,item__kv_gender_praise_rt5m
  ,item__kv_city_praise_rt5m
  ,item__kv_tags_praise_rt5m
  ,item__kv_gender_click_rt1h
  ,item__kv_city_click_rt1h
  ,item__kv_tags_click_rt1h
  ,item__kv_gender_praise_rt1h
  ,item__kv_city_praise_rt1h
  ,item__kv_tags_praise_rt1h
  ,item__kv_gender_click_rt12h
  ,item__kv_city_click_rt12h
  ,item__kv_tags_click_rt12h
  ,item__kv_gender_praise_rt12h
  ,item__kv_city_praise_rt12h
  ,item__kv_tags_praise_rt12h
FROM dwd_item_id_rt_feature
;

INSERT INTO ali_rec_user_id_rt_statistic_feat
SELECT
  user_id
  ,sys_time
  ,user__cnt_click_rt5m
  ,user__cnt_praise_rt5m
  ,user__cnt_click_rt1h
  ,user__cnt_praise_rt1h
  ,user__cnt_click_rt12h
  ,user__cnt_praise_rt12h
  ,user__kv_author_click_rt5m
  ,user__kv_category_click_rt5m
  ,user__kv_author_praise_rt5m
  ,user__kv_category_praise_rt5m
  ,user__kv_author_click_rt1h
  ,user__kv_category_click_rt1h
  ,user__kv_author_praise_rt1h
  ,user__kv_category_praise_rt1h
  ,user__kv_author_click_rt12h
  ,user__kv_category_click_rt12h
  ,user__kv_author_praise_rt12h
  ,user__kv_category_praise_rt12h
FROM dwd_user_id_rt_feature
;

end
;
  1. 首先根据第一行下载对应的jar,在Flink中注册自定义函数。

  2. 1SQL是注册DataHub的行为数据流。

  3. 2,3SQL是加载mcuseritem的静态特征维表。

  4. 4SQL是行为数据流join用户和物品维表信息,形成宽表。同时对userid进行了hash,该hash size需要和下游运行的MessageDelay函数搭配起来用。如果该size过大,会影响数据稀疏:比如一天内,在一些特定的hash后的key中,存在一些秒时间上是不存在数据,会导致数据延迟下发;如果size过小,则会出现数据在worker上倾斜,造成一部分worker计算量和内存消耗过大。

  5. 5SQL,是对数据进行滞后发送。当前数据进来后会立即下发一次,标delay_flag0; 当数据流水位到达该数据下一个窗口,则该数据会再次下发一次,delay_flag为窗口。

MessageDelay函数是一个非常重要函数,具体介绍如下:

MessageDelay(
      ARRAY[user_id,item_id,event,page,gender,city,tags,CAST(author AS STRING),category]
      ,event_time
      ,ARRAY[300,3600,43200]
      ,group_key
      ,IF(event IN ('click','praise'), true, false)
      ,ARRAY['']
    ) delay_info

# 参数介绍
第一个参数是array<string>表示要滞后发送哪些字段
第二个event_time是10位的时间戳,表示行为时间,也是水位时间
第三个array<bigint>,滞后窗口, 是该消息滞后多久会再次被下发,单位是秒
第四个group_key是当前数据hash后的key
第五个是boolen的参数,如果是true,则该消息会根据滞后窗口被再次下发;如果是false,则触发别的数据下发
第六个参数是debug使用,用于统计计算效率,会追加在聚合特征的后面输出

# 输出数据介绍
SELECT
  delay_info[1] user_id
  ,delay_info[2] item_id
  ,delay_info[3] event
  ,delay_info[4] page
  ,delay_info[5] gender
  ,delay_info[6] city
  ,delay_info[7] tags
  ,delay_info[8] author
  ,delay_info[9] category    # 1-9都是messagdelay函数的第一个入参,表示要滞后的字段
  ,CAST(delay_info[10] AS BIGINT) event_time  # 当前日志真实的发生时间,10位时间戳
  ,CAST(delay_info[11] AS BIGINT) water_time  # 被触发输出水位时间
  ,CAST(delay_info[12] AS BIGINT) reader_water_time  # 触发日志开始处理时间
  ,CAST(delay_info[13] AS BIGINT) sys_time    # 当前系统时间,13位时间戳
  ,CAST(delay_info[14] AS INT) delay_flag     # 该日志被滞后的窗口
  ,delay_info[....]  # 如果MessageDelay最后一个入参不为空则会追加在后面,比如可以追加读取到数据时间,则可以计算MessageDelay的效率如何
  1. 6、8SQL是统计实时特征的SQL

依赖的主要函数是SWCountCatesKVS,在下面参数中介绍了相关参数,并且展示了生成特征的规律。

"""
SWCountCatesKVS(
ARRAY[gender,city,tags]    # category_fields,要被统计的category_fields
,1                         # cnt_event,每出现一次日志的累加/减量
,event                     # event,当前日志的行为类型
,ARRAY['click','praise']   # valid_events,要统计特征的行为类型
,delay_flag                # messagedelay出入的日志延迟标识,根据次标识进行累加或者累减
,ARRAY[300,3600,43200]     # windows,要统计的日志窗口,单位是秒
,page                      # valid_scenes,场景信息字段
,ARRAY['']                 # 要单独统计的日志场景,如果没有单独统计的场景,给个空字符串,和场景字段配合使用
,100                       # kv字段保留的最多的类目值得数量
,item_id                   # 当前聚合key,item_id或者user_id
,ARRAY['']                 # debug使用的信息,会追加在特征后面,不用可以给个空字符串
)
"""

# 生成的特征规律如下
for sc in ['all']+list(valid_scenes):  # 遍历场景,默认用all来标识全场景
  for win in windows:          # 遍历窗口
    for event in valid_events:  # 遍历行为类型
      print(cnt_event)   # 行为次数统计
      for c in enumurate(category_fields): #遍历类目字段
        print(kv_c_event)  # 类目c在行为event下统计的kv字段

数据检查

以上2种方式,我们可以得到离线训练使用的实时特征和推理使用的实时特征。但是,日志数据在数据链路中可能存在延迟、丢失、重复等问题,这会导致模拟的实时特征和真实统计的实时特征之间存在差异。我们将每一时刻存储到Hologres的特征同步存到MaxCompute,可以和离线模拟的实时特征进行一致性的对比,需要注意模拟实时特征该参数pre_seconds必须设置为0。

select ,avg(if(COMP_RT_FEA(user__kv_brand_id_click_rt_15m_offline,user__kv_brand_id_click_rt_15m_online)>0,0,1))  acc_user__kv_brand_id_click_rt_15m
from(
    select t1.user_id
      ,t1.request_id
      ,FROM_UNIXTIME(event_unix_time) tt
      ,t1.event_unix_time
      ,t2.request_time
      ,t2.sys_time
      ,t1.user__kv_brand_id_click_rt_15m as user__kv_brand_id_click_rt_15m_offline
      ,t2.user__kv_brand_id_click_rt_15m as user__kv_brand_id_click_rt_15m_online
    from(
        select *
        from(
            SELECT  *,ROW_NUMBER() OVER(PARTITION by user_id,request_id order by event_unix_time asc) rnk
            from dwd_user_rt_feature_offline
            where dt='${bdp.system.bizdate}'
            and LENGTH(request_id)>0
        )t where rnk=1
    )t1 join (
        select *
        from(
            SELECT *,ROW_NUMBER() OVER(PARTITION by user_id,request_id order by request_time asc) rnk
            from user_rt_feature_online
            where dt='${bdp.system.bizdate}'
        ) t where rnk=1
    )t2 on t1.user_id=t2.user_id and t1.request_id=t2.request_id and t1.event_unix_time=t2.request_time
)

检查函数如下:

# Copyright (c) Alibaba, Inc. and its affiliates.
from odps.udf import annotate


@annotate('string, string->bigint')
class CompRTFea():
    """Compare 2 rt feature."""

    def evaluate(self, fea1, fea2):
        def _parse_fea(fea):
            if fea is None:
                return {}
            toks = [x for x in fea.split(chr(29)) if ':' in x]
            # toks = [ x for x in fea.split() if ':' in x ]
            kvs = {}
            for kv_s in toks:
                k, v = kv_s.split(':')
                kvs[k] = float(v)
            return kvs
        fea1_kvs = _parse_fea(fea1)
        fea2_kvs = _parse_fea(fea2)
        all_keys = set(fea1_kvs.keys())
        all_keys.update(fea2_kvs.keys())
        err_cnt = 0
        for k in all_keys:
            if k in fea1_kvs and k in fea2_kvs:
                if fea1_kvs[k] != fea2_kvs[k]:
                    err_cnt += 1
            else:
                err_cnt += 1
        return err_cnt