案例:物品发布6小时内的流量调控

更新时间:
复制为 MD 格式

流量调控案例

当新内容发布之后,业务运营希望给新物品一些曝光流量,通过用户反馈的点击、评论等行为转化数据来判断内容的质量。这时候就需要用到流量调控。本文是设置一个流量调控任务的案例。

第一步:在PAI-FeatureStore中创建特征视图

  • 实时特征视图(item_new_table)

    • 特征平台的在线数据源选择FeatureDB。

    • 视图包含以下几个字段:item_id, publish_time, is_in_6hours。通过PAI-FeatureStore Flink Connector可以实时写入新增的物品和属性。其中publish_time是bigint类型的时间戳,单位是秒。

    • 新增物品时设置is_in_6hours = 1。当发布6个小时之后设置为 is_in_6hours = 0。

第二步:设置一个冷启动召回

PAI-Rec的预发服务对应的引擎配置中:设置一个冷启动召回,当有新物品发布之后,更新冷启动召回的列表,保证能召回新物品。

{
    "Name": "item_cold_start_recall",
    "RecallType": "ColdStartRecall",
    "RecallCount": 200,
    "ColdStartDaoConf": {
        "AdapterType": "featurestore",
        "FeatureStoreName": "fs_pairec",
        "FeatureStoreViewName": "item_new_table"
    },
    "FilterParams": [
        {
            "Name": "is_in_6hours",
            "Type": "int",
            "Operator": "equal",
            "Value": 1
        }
    ]
}

第三步:设置过滤配置

PAI-Rec的预发服务对应的引擎配置中:使用SnakeFilter从多种召回通道中获取候选集,类似于优先级队列,依次从每个召回集合中获取一个候选(Weight都是1)。由于获取的路线像蛇行走,因此命名为SnakeFilter。

{
    "FilterConfs": [
        {
            "Name": "SnakeFilter",
            "FilterType": "SnakeFilter",
            "RetainNum": 20,
            "SnakeType": "REFILL_ON_DUPLICATE",
            "AdjustCountConfs": [
                 {
                     "RecallName": "GroupHotRecall",
                     "Weight": 1
                 },
                 {
                     "RecallName": "U2IRecall",
                     "Weight": 1
                 },
                 {
                     "RecallName": "GlobalHotRecall",
                     "Weight": 1
                 },
                 {
                     "RecallName": "item_cold_start_recall",
                     "Weight": 1
                 }
             ]
        }
    ]
}

第四步:重排打散的逻辑

命中流量调控的物品,会设置一个默认的属性维度__traffic_control_id__,会根据流量调控的任务ID来打散。

PAI-Rec的预发服务对应的引擎配置中设置一个打散的规则:"WindowSize": 10, "FrequencySize": 2,意思是连续的10个结果作为一个窗口,其中同样的__traffic_control_id__取值不能超过2个。

{
    "Name": "TrafficDiversityRuleSort",
    "SortType": "DiversityRuleSort",
    "DiversitySize": 20,
    "DiversityRules": [
        {
            "Dimensions": [
                "__traffic_control_id__"
            ],
            "WindowSize": 10,
            "FrequencySize": 2
        }
    ]
}

第五步:流量调控任务

创建流量调控任务

  • 注册物品表的时候选择上相关字段:item_id, publish_time, is_in_6hours。

  • 在“设置调控目标”的时候,应用表达式圈选物品属性:publish_time < ${current_time} && publish_time > ${current_time} - 21600。其中${current_time} 是流量调控的内置变量,表示当前的时间戳(精确到秒),21600秒即6个小时。

  • 设置小时级任务:每小时曝光量设置为30。

在预发服务中测试通过之后,再把引擎配置单发布到生产环境,然后在生产环境启动流量调控的任务。

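第六步:流量调控统计任务

下面是一个Flink SQL作业示例:用datagen模拟曝光(expr)和点击(click)行为,按物品滑动统计最近6小时的曝光量和点击率,并定时刷新is_in_6hours字段,结果写入Hologres。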

--********************************************************************--
-- Author:         wb-lcl910122
-- Created Time:   2026-02-28 20:39:24
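-- Description:    Demo job: simulates expr/click events, computes per-item 6-hour
--                 sliding exposure count and CTR, refreshes is_in_6hours, and
--                 writes both results to a Hologres table.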
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--

-- 1) Simulated behavior stream. The datagen connector emits random rows,
--    including a random integer event_code that the datagen_behavior view
--    maps to an event name.

CREATE TEMPORARY TABLE datagen_src
(
    request_id  STRING
    ,user_id    BIGINT
    ,item_id    BIGINT
    ,event_code INT
    -- Epoch seconds carried as text; filled by the 'sequence' generator below.
    ,event_time STRING
    -- Processing-time attribute.
    ,proc_time  AS PROCTIME()
    -- Event-time attribute: event_time parsed as epoch seconds (precision 0).
    ,ts         AS TO_TIMESTAMP_LTZ(CAST(event_time AS BIGINT), 0)
    -- Watermark is ts itself, i.e. no out-of-orderness allowance.
    ,WATERMARK FOR ts AS ts
)
WITH (
    'connector' = 'datagen'
    ,'rows-per-second' = '50'
    -- event_time: sequential epoch seconds.
    ,'fields.event_time.kind' = 'sequence'
    ,'fields.event_time.start' = '1772280000'
    ,'fields.event_time.end' = '1772290000'
    -- event_code: random integer in [0, 99].
    ,'fields.event_code.kind' = 'random'
    ,'fields.event_code.min' = '0'
    ,'fields.event_code.max' = '99'
    -- item_id: random id in [100000, 100100].
    ,'fields.item_id.kind' = 'random'
    ,'fields.item_id.min' = '100000'
    ,'fields.item_id.max' = '100100'
    -- user_id: random id in [1, 100000].
    ,'fields.user_id.kind' = 'random'
    ,'fields.user_id.min' = '1'
    ,'fields.user_id.max' = '100000'
    -- request_id: random string of length 16.
    ,'fields.request_id.length' = '16'
)
;

-- Hologres item feature table, used as the lookup (dimension) side of the
-- FOR SYSTEM_TIME AS OF join in the is_in_6hours_sink view.
CREATE TEMPORARY TABLE item_feature_v1
(
    item_id           STRING
    -- Compared against epoch seconds by the is_in_6hours_sink view.
    ,publish_time     BIGINT
    ,is_in_6hours     STRING
    ,item_expr_cnt_6h BIGINT
    ,item_ctr_6h      DOUBLE
    -- Join key used by the lookup join (note: not the primary key).
    ,flag             STRING
    ,PRIMARY KEY (item_id) NOT ENFORCED
)
WITH (
    'connector' = 'hologres'
    ,'endpoint' = 'hgpostcn-cn-kvw46y8dw0gf-cn-shanghai-vpc-st.hologres.aliyuncs.com:80'
    ,'dbname' = 'test'
    ,'tablename' = 'pairec_item_feature_v1_online'
    -- Credentials are injected from the platform's secret variables.
    ,'username' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_ID}'
    ,'password' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_SECRET}'
    -- Lookup cache: LRU policy with a 60000 ms TTL. cacheEmpty=false
    -- presumably means empty lookup results are not cached; confirm against
    -- the Hologres connector documentation.
    ,'cache' = 'LRU'
    ,'cacheTTLMs' = '60000'
    ,'cacheEmpty' = 'false'
)
;

-- 2) Map the numeric event_code to an event name.
--    event_code is generated in [0, 99]: codes below 90 become 'expr'
--    (about 90% of rows), everything else becomes 'click' (about 10%).

CREATE TEMPORARY VIEW datagen_behavior
AS SELECT
    src.request_id
    ,src.user_id
    ,src.item_id
    ,CASE WHEN src.event_code < 90 THEN 'expr' ELSE 'click' END AS event
    ,src.proc_time
    ,src.ts
FROM datagen_src AS src
;

-- Sliding-window traffic statistics per item.
-- HOP window on event time ts: 6-hour window size, sliding every 1 minute.
-- Each (window, item) pair yields one row with the exposure count and CTR.
-- Fix: the view used to be terminated by two semicolons; the second one formed
-- an empty statement and has been removed.

CREATE TEMPORARY VIEW item_expr_cnt_ctr_6h_sink
AS SELECT
    window_start
    ,window_end
    -- item_id is BIGINT in the generated stream but STRING in the sink table.
    ,CAST(item_id AS STRING) AS item_id
    -- Exposure count: COUNT skips the NULLs produced for non-'expr' rows.
    ,COUNT(
        CASE
            WHEN event = 'expr' THEN 1
            ELSE NULL
        END
    ) AS item_expr_cnt_6h
    -- CTR = clicks / exposures. NULLIF turns a zero denominator into NULL so
    -- the division yields NULL instead of failing; COALESCE maps that to 0.0.
    ,COALESCE(
        CAST(SUM(CASE
            WHEN event = 'click' THEN 1
            ELSE 0
        END) AS DOUBLE) / NULLIF(
            CAST(SUM(CASE
                WHEN event = 'expr' THEN 1
                ELSE 0
            END) AS DOUBLE)
            ,0
        )
        ,0.0
    ) AS item_ctr_6h
FROM TABLE(HOP(TABLE datagen_behavior, DESCRIPTOR(ts), INTERVAL '1' MINUTES, INTERVAL '6' HOURS))
GROUP BY
    window_start
    ,window_end
    ,item_id
;

-- Trigger stream that drives the periodic is_in_6hours refresh.
-- The is_in_6hours_sink view keeps only rows whose processing-time second is
-- 0, 20 or 40 and joins them to item_feature_v1 on flag.
-- NOTE(review): at 100 rows per second, many trigger rows pass that filter in
-- each matching second and each one repeats the same lookup; a lower rate
-- may be sufficient. Confirm before changing.
CREATE TEMPORARY TABLE task_trigger
(
    -- Filled from the constant expression '1' below, so presumably always '1'.
    flag       STRING
    -- Processing-time attribute used for the second-of-minute filter and as
    -- the lookup-join time.
    ,proc_time AS PROCTIME()
)
WITH (
    'connector' = 'faker'
    ,'fields.flag.expression' = '1'
    ,'rows-per-second' = '100'
)
;

-- Sink definition over the same Hologres table that item_feature_v1 reads
-- (pairec_item_feature_v1_online). The two INSERT statements in the statement
-- set each write a different subset of its columns.
CREATE TEMPORARY TABLE item_sink_table
(
    item_id           STRING
    ,publish_time     BIGINT
    ,is_in_6hours     STRING
    ,item_expr_cnt_6h BIGINT
    ,item_ctr_6h      DOUBLE
    ,flag             STRING
    ,PRIMARY KEY (item_id) NOT ENFORCED
)
WITH (
    'connector' = 'hologres'
    ,'endpoint' = 'hgpostcn-cn-kvw46y8dw0gf-cn-shanghai-vpc-st.hologres.aliyuncs.com:80'
    ,'dbname' = 'test'
    ,'tablename' = 'pairec_item_feature_v1_online'
    -- Credentials are injected from the platform's secret variables.
    ,'username' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_ID}'
    ,'password' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_SECRET}'
    -- Write mode options; see the Hologres connector documentation for the
    -- exact semantics of mutatetype and ignoredelete.
    ,'mutatetype' = 'insertorupdate'
    ,'ignoredelete' = 'true'
    -- Enables partial-column updates: only the columns declared in the INSERT
    -- statement are written.
    ,'partial-insert.enabled' = 'true'
)
;

-- Periodic refresh of is_in_6hours for the items stored in Hologres.
--   1. Keep trigger rows whose processing-time second is 0, 20 or 40.
--   2. Look up every item whose flag equals the trigger's flag
--      (processing-time lookup join against item_feature_v1).
--   3. Emit '1' when publish_time (epoch seconds) lies within the 6 hours
--      before the current processing time, both bounds inclusive, else '0'.
--      A NULL publish_time falls through to '0'.
-- Fix: the lookup join was a LEFT JOIN, so a trigger row without any matching
-- item produced a row with NULL item_id / publish_time, which then flowed into
-- a sink keyed on item_id. INNER JOIN drops those unmatched rows.
CREATE TEMPORARY VIEW is_in_6hours_sink
AS SELECT
    item_id
    ,CASE
        WHEN TO_TIMESTAMP_LTZ(publish_time, 0) >= proc_time - INTERVAL '6' HOUR
        AND TO_TIMESTAMP_LTZ(publish_time, 0) <= proc_time THEN '1'
        ELSE '0'
    END AS is_in_6hours
    ,publish_time
    ,flag
FROM (
    SELECT
        g.item_id
        ,g.publish_time
        ,a.proc_time
        ,g.flag
    FROM (
        -- The filter stays inside the subquery so that only the selected
        -- trigger rows reach the lookup join.
        SELECT
            flag
            ,proc_time
        FROM task_trigger
        WHERE SECOND(proc_time) IN (0, 20, 40)
    ) a
        INNER JOIN item_feature_v1 FOR SYSTEM_TIME AS OF a.proc_time AS g
            ON a.flag = g.flag
)
;

-- Both writes are grouped in one statement set. Each INSERT targets a
-- different column subset of item_sink_table (which sets
-- 'partial-insert.enabled' = 'true').
BEGIN STATEMENT SET;

-- Write 1: refreshed publish window flag per item.
INSERT INTO item_sink_table (item_id, publish_time, is_in_6hours)
SELECT
    item_id
    ,publish_time
    ,is_in_6hours
FROM is_in_6hours_sink
;

-- Write 2: 6-hour sliding exposure count and CTR per item.
INSERT INTO item_sink_table (item_id, item_expr_cnt_6h, item_ctr_6h)
SELECT
    item_id
    ,item_expr_cnt_6h
    ,item_ctr_6h
FROM item_expr_cnt_ctr_6h_sink
;

END;