案例:物品发布6小时内的流量调控
流量调控案例
当新内容发布之后,业务运营希望给新物品一些曝光流量,通过用户反馈的点击、评论等行为转化数据来判断内容的质量。这时候就需要用到流量调控。本文是设置一个流量调控任务的案例。
第一步:在PAI-FeatureStore中创建特征视图
实时特征视图(item_new_table)
特征平台的在线数据源选择FeatureDB。
视图包含以下几个字段:item_id, publish_time, is_in_6hours。通过PAI-FeatureStore的Flink Connector可以实时写入新增的物品和属性。其中publish_time是bigint类型的时间戳,单位是秒。
新增物品时设置is_in_6hours = 1。当发布6个小时之后设置为 is_in_6hours = 0。
可通过PAI-FeatureStore的FeatureStore Java SDK(writeFeatures方法)去更新is_in_6hours的值。
第二步:设置一个冷启动召回
在PAI-Rec的预发服务对应的引擎配置中:设置一个冷启动召回,当有新物品发布之后,更新冷启动召回的列表,保证能召回新物品。
{
"Name": "item_cold_start_recall",
"RecallType": "ColdStartRecall",
"RecallCount": 200,
"ColdStartDaoConf": {
"AdapterType": "featurestore",
"FeatureStoreName": "fs_pairec",
"FeatureStoreViewName": "item_new_table"
},
"FilterParams": [
{
"Name": "is_in_6hours",
"Type": "int",
"Operator": "equal",
"Value": 1
}
]
}
第三步:设置过滤配置
在PAI-Rec的预发服务对应的引擎配置中:使用SnakeFilter从多个召回通道中获取候选集。其工作方式类似于优先级队列:按顺序轮流从每个召回集合中各取一个候选(各通道的Weight都是1)。由于取数的路线像蛇形行走,因此命名为SnakeFilter。
{
"FilterConfs": [
{
"Name": "SnakeFilter",
"FilterType": "SnakeFilter",
"RetainNum": 20,
"SnakeType": "REFILL_ON_DUPLICATE",
"AdjustCountConfs": [
{
"RecallName": "GroupHotRecall",
"Weight": 1
},
{
"RecallName": "U2IRecall",
"Weight": 1
},
{
"RecallName": "GlobalHotRecall",
"Weight": 1
},
{
"RecallName": "item_cold_start_recall",
"Weight": 1
}
]
}
]
}
第四步:重排打散的逻辑
命中流量调控的物品,会设置一个默认的属性维度__traffic_control_id__,会根据流量调控的任务ID来打散。
在PAI-Rec的预发服务对应的引擎配置中设置一个打散的规则:"WindowSize": 10, "FrequencySize": 2,意思是连续的10个结果作为一个窗口,其中同样的__traffic_control_id__取值不能超过2个。
{
"Name": "TrafficDiversityRuleSort",
"SortType": "DiversityRuleSort",
"DiversitySize": 20,
"DiversityRules": [
{
"Dimensions": [
"__traffic_control_id__"
],
"WindowSize": 10,
"FrequencySize": 2
}
]
}
第五步:流量调控任务
注册物品表的时候选择上相关字段:item_id, publish_time, is_in_6hours。
在“设置调控目标”的时候,应用表达式圈选物品属性:publish_time < ${current_time} && publish_time > ${current_time} - 21600。其中${current_time} 是流量调控的内置变量,表示当前的时间戳(精确到秒),21600秒即6个小时。
设置小时级任务:每小时曝光量设置为30。
在预发服务中测试通过之后,再把引擎配置单发布到生产环境,然后在生产环境启动流量调控的任务。
第六步:流量调控统计任务
--********************************************************************--
-- Author: wb-lcl910122
-- Created Time: 2026-02-28 20:39:24
-- Description: Maintains is_in_6hours and 6-hour exposure count / CTR per item in the Hologres table pairec_item_feature_v1_online
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
-- 1) Synthetic behavior stream: datagen produces a random integer event_code
--    per row, which the datagen_behavior view maps to an event name.
CREATE TEMPORARY TABLE datagen_src (
    request_id STRING,
    user_id    BIGINT,
    item_id    BIGINT,
    event_code INT,
    -- Epoch seconds carried as a string; parsed into ts below.
    event_time STRING,
    proc_time AS PROCTIME(),
    ts AS TO_TIMESTAMP_LTZ(CAST(`event_time` AS BIGINT), 0),
    -- Watermark equals ts, i.e. no allowance for out-of-order events.
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '50',
    'fields.request_id.length' = '16',
    -- user_id: random in [1, 100000]
    'fields.user_id.kind' = 'random',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '100000',
    -- item_id: random in [100000, 100100]
    'fields.item_id.kind' = 'random',
    'fields.item_id.min' = '100000',
    'fields.item_id.max' = '100100',
    -- event_code: random in [0, 99]
    'fields.event_code.kind' = 'random',
    'fields.event_code.min' = '0',
    'fields.event_code.max' = '99',
    -- event_time: sequence of epoch seconds from 1772280000 to 1772290000
    'fields.event_time.kind' = 'sequence',
    'fields.event_time.start' = '1772280000',
    'fields.event_time.end' = '1772290000'
)
;
-- Hologres table pairec_item_feature_v1_online, declared here for use as the
-- lookup (dimension) side of the processing-time join in is_in_6hours_sink.
CREATE TEMPORARY TABLE item_feature_v1 (
    item_id          STRING,
    publish_time     BIGINT,
    is_in_6hours     STRING,
    item_expr_cnt_6h BIGINT,
    item_ctr_6h      DOUBLE,
    flag             STRING,
    PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
    'connector' = 'hologres',
    'endpoint' = 'hgpostcn-cn-kvw46y8dw0gf-cn-shanghai-vpc-st.hologres.aliyuncs.com:80',
    'dbname' = 'test',
    'tablename' = 'pairec_item_feature_v1_online',
    'username' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_ID}',
    'password' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_SECRET}',
    -- Lookup cache: LRU policy, TTL 60000 ms (60 s, going by the option name),
    -- empty lookup results are not cached.
    'cache' = 'LRU',
    'cacheTTLMs' = '60000',
    'cacheEmpty' = 'false'
)
;
-- 2) Map the random event_code to an event name: codes below 90 become
--    'expr' and everything else 'click' (about 90% expr / 10% click for codes
--    drawn from 0..99).
CREATE TEMPORARY VIEW datagen_behavior AS
SELECT
    src.request_id,
    src.user_id,
    src.item_id,
    CASE
        WHEN src.event_code < 90 THEN 'expr'
        ELSE 'click'
    END AS event,
    src.proc_time,
    src.ts
FROM datagen_src AS src
;
-- Sliding-window traffic statistics per item.
-- Uses a HOP window over event time ts: 6-hour window size, 1-minute slide,
-- so each event falls into 360 overlapping windows.
-- Output columns:
--   window_start / window_end : bounds of the hop window
--   item_id                   : item id cast to STRING (the sink key type)
--   item_expr_cnt_6h          : number of 'expr' events in the window
--   item_ctr_6h               : clicks / exposures in the window; 0.0 when
--                               the window contains no exposures
CREATE TEMPORARY VIEW item_expr_cnt_ctr_6h_sink AS
SELECT
    window_start,
    window_end,
    CAST(item_id AS STRING) AS item_id,
    COUNT(
        CASE
            WHEN event = 'expr' THEN 1
            ELSE NULL
        END
    ) AS item_expr_cnt_6h,
    COALESCE(
        CAST(SUM(CASE WHEN event = 'click' THEN 1 ELSE 0 END) AS DOUBLE)
            -- NULLIF turns a zero exposure count into NULL so the division
            -- yields NULL instead of failing; COALESCE then maps it to 0.0.
            / NULLIF(
                CAST(SUM(CASE WHEN event = 'expr' THEN 1 ELSE 0 END) AS DOUBLE),
                0
            ),
        0.0
    ) AS item_ctr_6h
FROM TABLE(HOP(TABLE datagen_behavior, DESCRIPTOR(ts), INTERVAL '1' MINUTES, INTERVAL '6' HOURS))
GROUP BY
    window_start,
    window_end,
    item_id
;
-- Heartbeat source that drives the periodic is_in_6hours refresh.
-- Rows carry only a flag value plus their processing time; the consumer
-- (is_in_6hours_sink) filters on the second of proc_time.
CREATE TEMPORARY TABLE task_trigger (
    flag STRING,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'faker',
    -- NOTE(review): the expression is the literal '1', presumably yielding a
    -- constant flag of '1' on every row; confirm against the faker connector.
    'fields.flag.expression' = '1',
    'rows-per-second' = '100'
)
;
-- Sink declaration over the same Hologres table (pairec_item_feature_v1_online)
-- that item_feature_v1 reads; written by both INSERTs in the statement set.
CREATE TEMPORARY TABLE item_sink_table (
    item_id          STRING,
    publish_time     BIGINT,
    is_in_6hours     STRING,
    item_expr_cnt_6h BIGINT,
    item_ctr_6h      DOUBLE,
    flag             STRING,
    PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
    'connector' = 'hologres',
    'endpoint' = 'hgpostcn-cn-kvw46y8dw0gf-cn-shanghai-vpc-st.hologres.aliyuncs.com:80',
    'dbname' = 'test',
    'tablename' = 'pairec_item_feature_v1_online',
    'username' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_ID}',
    'password' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_SECRET}',
    'mutatetype' = 'insertorupdate',
    'ignoredelete' = 'true',
    -- Enable partial-column updates: only the columns declared in the INSERT
    -- statement are updated.
    'partial-insert.enabled' = 'true'
)
;
-- Periodic recomputation of is_in_6hours for items in the feature table.
--
-- Trigger rows whose processing-time second is 0, 20 or 40 are lookup-joined
-- (as of their proc_time) against item_feature_v1 on flag. For every matched
-- item, is_in_6hours is '1' when publish_time (epoch seconds) lies within the
-- closed interval [proc_time - 6 hours, proc_time], otherwise '0'.
--
-- An INNER JOIN is used so that a trigger with no matching dimension row
-- produces no output; a LEFT JOIN would emit a row with a NULL item_id, which
-- is the primary key of the sink table this view feeds.
CREATE TEMPORARY VIEW is_in_6hours_sink AS
SELECT
    item_id,
    CASE
        WHEN TO_TIMESTAMP_LTZ(publish_time, 0) >= proc_time - INTERVAL '6' HOUR
            AND TO_TIMESTAMP_LTZ(publish_time, 0) <= proc_time THEN '1'
        ELSE '0'
    END AS is_in_6hours,
    publish_time,
    flag
FROM (
    SELECT
        g.item_id,
        g.publish_time,
        a.proc_time,
        g.flag
    FROM (
        SELECT
            flag,
            proc_time
        FROM task_trigger
        -- Keep only triggers landing on seconds 0, 20 and 40 of each minute.
        WHERE SECOND(proc_time) IN (0, 20, 40)
    ) a
    INNER JOIN item_feature_v1 FOR SYSTEM_TIME AS OF a.proc_time AS g
        ON a.flag = g.flag
)
;
-- Group both writes into a single statement set. Each INSERT names only the
-- columns it owns, so the two streams update disjoint columns of the same
-- item_sink_table row (keyed by item_id).
BEGIN STATEMENT SET;

-- Publish-time freshness flag.
INSERT INTO item_sink_table (item_id, publish_time, is_in_6hours)
SELECT
    item_id,
    publish_time,
    is_in_6hours
FROM is_in_6hours_sink;

-- Sliding-window exposure count and CTR.
INSERT INTO item_sink_table (item_id, item_expr_cnt_6h, item_ctr_6h)
SELECT
    item_id,
    item_expr_cnt_6h,
    item_ctr_6h
FROM item_expr_cnt_ctr_6h_sink;

END;