数据注册

更新时间:
复制为 MD 格式

本文档介绍为流量调控做数据准备、数据注册的过程

数据准备

1. 实时行为数据

1.1 PAI-Rec 服务的正式环境

DataHub准备一个实时行为topic,需要包括用户ID,物品ID,实验ID和事件。用来统计调控任务的实时流量。

1.2 PAI-Rec 服务的式环境

预发环境用PAI-Rec推荐引擎的后端日志作为实时行为数据,在预发引擎配置上配置,将在PAI-Rec引擎配置单中增加如下的内容,把引擎后端日志记录到DataHubpairec_debug_log_prepub上,用于预发环境行为统计。

    "DatahubConfs": {
        "datahub_info": {
            "Endpoint": "http://dh-cn-shanghai-int-vpc.aliyuncs.com",
            "AccessId": "${AccessKey}",
            "AccessKey": "${AccessSecret}",
            "ProjectName": "demo_rec",
            "TopicName": "pairec_debug_log_prepub",
            "Schemas": [
                {
                    "Field": "request_id",
                    "Type": "string"
                },
                {
                    "Field": "module",
                    "Type": "string"
                },
                {
                    "Field": "scene_id",
                    "Type": "string"
                },
                {
                    "Field": "request_time",
                    "Type": "integer"
                },
                {
                    "Field": "exp_id",
                    "Type": "string"
                },
                {
                    "Field": "items",
                    "Type": "string"
                },
                {
                    "Field": "retrieveid",
                    "Type": "string"
                },
                {
                    "Field": "uid",
                    "Type": "string"
                }
            ]
        }
    }

2. 用户表

MaxCompute 上准备一张用户表,并在FeatureStore上注册。用来限定调控任务的干预流量范围

3. 物品表

MaxCompute 上准备两张一样的物品表item_table,item_table_prepub,并在特征平台(FeatureStore)上注册featureView,一个用于正式环境,一个用于预发环境

3.1 按比例调控

物品表包含物品ID,和选品特征字段

3.2 按量\次数调控

物品表包括字段物品ID,被调控物品特征字段is_control,是否完成调控目标字段is_complated

数据注册

1.行为表

截屏2024-12-20 上午11截屏2024-12-20 上午11

2.物品表

截屏2024-12-20 上午11

截屏2024-12-20 上午11

3.用户表

截屏2024-12-20 上午11

其他

在正常召回路径,按量或者按照次数调控的物品可能不能被完全召回出来,需要单独做一张召回表和一路召回。为了提高召回效率,已经达成调控目标的物品就不再召回,需要每半小时更新一次召回表是否完成调控目标字段is_complated。

1.同步时行为数据

datahub的实时行为数据rec_sln_demo_behavior_table_v1,pairec_debug_log_prepub落到MaxCompute rec_sln_demo_behavior_table_json_online_v1,pairec_debug_log_prepub

  1. 登录DataHub,进入行为表的topic列表

  2. 点击同步,新建MaxCompute connector

  3. 填入信息后,点击创建

2.更新召回表

  1. 计算曝光次数,更新是否完成调控目标字段is_complated,在DataWorks内新建odps sql 节点,每半个小时运行一次

INSERT OVERWRITE TABLE item_table PARTITION (ds = '${bdp.system.bizdate}')
WITH tmp_a AS 
(
    SELECT item_id
    FROM   item_table
    WHERE  ds = '${bdp.system.bizdate}'
)
,tmp_b AS 
(
    SELECT  *
    FROM  (
            SELECT item_id
                   ,COUNT(*) cnt
            FROM (
                    SELECT  CAST(GET_JSON_OBJECT(content,'$.item_id') AS BIGINT) item_id
                            ,GET_JSON_OBJECT(content,'$.exp_id') exp_id
                            ,GET_JSON_OBJECT(content,'$.request_id') request_id
                    FROM    rec_sln_demo_behavior_table_json_online_v1
                    WHERE   ds = TO_CHAR(DATEADD(TO_DATE('${bdp.system.bizdate}','yyyymmdd'),+1,'dd'),'yyyymmdd')
                    AND     GET_JSON_OBJECT(content,'$.request_id') != ''
                ) 
            GROUP BY item_id
          ) 
    WHERE cnt > 30
)SELECT a.item_id
        ,CASE WHEN b.item_id IS NOT NULL THEN 0
              ELSE 1
        END AS is_complated
FROM tmp_a a
LEFT JOIN tmp_b b
ON a.item_id = b.item_id
;

INSERT OVERWRITE TABLE item_table_prepub PARTITION (ds = '${bdp.system.bizdate}')
WITH tmp_a AS 
(
    SELECT  item_id
    FROM    item_table_prepub
    WHERE   ds = '${bdp.system.bizdate}'
)
,tmp_b AS 
(
    SELECT  *
    FROM    (
                SELECT  item_id
                        ,COUNT(*) cnt
                FROM    (
                            SELECT  `request_id`
                                    ,SPLIT_PART(col1_new,':',1) AS item_id
                                    ,exp_id
                            FROM    pairec_debug_log_prepub
                            LATERAL VIEW EXPLODE(SPLIT(`items`,'},')) adTable AS col1_new
                            WHERE   `module` = 'recommend'
                            AND     ds = TO_CHAR(DATEADD(TO_DATE('${bdp.system.bizdate}','yyyymmdd'),+1,'dd'),'yyyymmdd')
                        ) 
                GROUP BY item_id
            ) 
    WHERE   cnt > 30
)SELECT  a.item_id
        ,CASE   WHEN b.item_id IS NOT NULL THEN 0
                ELSE 1
        END AS is_complated
FROM    tmp_a a
LEFT JOIN tmp_b b
ON      a.item_id = b.item_id
;
  1. 同步item_table,item_table_prepubhologres用于召回。点击FeatureStore的数据同步,粘贴同步代码,部署到datawork ,每半小时例行运行一次。

截屏2024-10-22 下午3

截屏2024-10-22 下午4