数据注册
本文档介绍为流量调控做数据准备、数据注册的过程
数据准备
1. 实时行为数据
1.1 PAI-Rec 服务的正式环境
在DataHub准备一个实时行为topic,需要包括用户ID,物品ID,实验ID和事件。用来统计调控任务的实时流量。
1.2 PAI-Rec 服务的预式环境
预发环境用PAI-Rec推荐引擎的后端日志作为实时行为数据,在预发引擎配置上配置,将在PAI-Rec引擎配置单中增加如下的内容,把引擎后端日志记录到DataHub的pairec_debug_log_prepub上,用于预发环境行为统计。
"DatahubConfs": {
"datahub_info": {
"Endpoint": "http://dh-cn-shanghai-int-vpc.aliyuncs.com",
"AccessId": "${AccessKey}",
"AccessKey": "${AccessSecret}",
"ProjectName": "demo_rec",
"TopicName": "pairec_debug_log_prepub",
"Schemas": [
{
"Field": "request_id",
"Type": "string"
},
{
"Field": "module",
"Type": "string"
},
{
"Field": "scene_id",
"Type": "string"
},
{
"Field": "request_time",
"Type": "integer"
},
{
"Field": "exp_id",
"Type": "string"
},
{
"Field": "items",
"Type": "string"
},
{
"Field": "retrieveid",
"Type": "string"
},
{
"Field": "uid",
"Type": "string"
}
]
}
}2. 用户表
在MaxCompute 上准备一张用户表,并在FeatureStore上注册。用来限定调控任务的干预流量范围
3. 物品表
在MaxCompute 上准备两张一样的物品表item_table,item_table_prepub,并在特征平台(FeatureStore)上注册featureView,一个用于正式环境,一个用于预发环境
3.1 按比例调控
物品表包含物品ID,和选品特征字段
3.2 按量\次数调控
物品表包括字段物品ID,被调控物品特征字段is_control,是否完成调控目标字段is_complated
数据注册
1.行为表


2.物品表


3.用户表

其他
在正常召回路径,按量或者按照次数调控的物品可能不能被完全召回出来,需要单独做一张召回表和一路召回。为了提高召回效率,已经达成调控目标的物品就不再召回,需要每半小时更新一次召回表是否完成调控目标字段is_complated。
1.同步实时行为数据
将datahub的实时行为数据rec_sln_demo_behavior_table_v1,pairec_debug_log_prepub落到MaxCompute rec_sln_demo_behavior_table_json_online_v1,pairec_debug_log_prepub上
登录DataHub,进入行为表的topic列表
点击同步,新建MaxCompute connector
填入信息后,点击创建
2.更新召回表
计算曝光次数,更新是否完成调控目标字段is_complated,在DataWorks内新建odps sql 节点,每半个小时运行一次
INSERT OVERWRITE TABLE item_table PARTITION (ds = '${bdp.system.bizdate}')
WITH tmp_a AS
(
SELECT item_id
FROM item_table
WHERE ds = '${bdp.system.bizdate}'
)
,tmp_b AS
(
SELECT *
FROM (
SELECT item_id
,COUNT(*) cnt
FROM (
SELECT CAST(GET_JSON_OBJECT(content,'$.item_id') AS BIGINT) item_id
,GET_JSON_OBJECT(content,'$.exp_id') exp_id
,GET_JSON_OBJECT(content,'$.request_id') request_id
FROM rec_sln_demo_behavior_table_json_online_v1
WHERE ds = TO_CHAR(DATEADD(TO_DATE('${bdp.system.bizdate}','yyyymmdd'),+1,'dd'),'yyyymmdd')
AND GET_JSON_OBJECT(content,'$.request_id') != ''
)
GROUP BY item_id
)
WHERE cnt > 30
)SELECT a.item_id
,CASE WHEN b.item_id IS NOT NULL THEN 0
ELSE 1
END AS is_complated
FROM tmp_a a
LEFT JOIN tmp_b b
ON a.item_id = b.item_id
;
INSERT OVERWRITE TABLE item_table_prepub PARTITION (ds = '${bdp.system.bizdate}')
WITH tmp_a AS
(
SELECT item_id
FROM item_table_prepub
WHERE ds = '${bdp.system.bizdate}'
)
,tmp_b AS
(
SELECT *
FROM (
SELECT item_id
,COUNT(*) cnt
FROM (
SELECT `request_id`
,SPLIT_PART(col1_new,':',1) AS item_id
,exp_id
FROM pairec_debug_log_prepub
LATERAL VIEW EXPLODE(SPLIT(`items`,'},')) adTable AS col1_new
WHERE `module` = 'recommend'
AND ds = TO_CHAR(DATEADD(TO_DATE('${bdp.system.bizdate}','yyyymmdd'),+1,'dd'),'yyyymmdd')
)
GROUP BY item_id
)
WHERE cnt > 30
)SELECT a.item_id
,CASE WHEN b.item_id IS NOT NULL THEN 0
ELSE 1
END AS is_complated
FROM tmp_a a
LEFT JOIN tmp_b b
ON a.item_id = b.item_id
;同步item_table,item_table_prepub到hologres用于召回。点击FeatureStore的数据同步,粘贴同步代码,部署到datawork ,每半小时例行运行一次。

