PAI-Rec引擎自定义召回组件

更新时间:
复制为 MD 格式

本文档基于 FeatureDBCFRecall 完整实现,涵盖从环境准备、数据准备到代码编写、测试验证的全流程。

召回组件的功能:过滤掉权重为负的trigger。

一、环境准备

1.1 项目结构

pairec_base/
├── conf/
│   └── config.json              # 主配置文件
├── src/
│   ├── controller/
│   │   └── feed.go              # Feed 接口控制器
│   ├── recall/
│   │   ├── featuredb_cf_recall.go    # 自定义召回组件
│   │   └── user_item_weight_recall_test.go  # 集成测试
│   ├── sql/
│   │   ├── create_table.sql     # MaxCompute 建表语句
│   │   └── generate_test_data.py # 测试数据生成脚本
│   │   └── variable.sh        # 环境变量配置文件
│   └── main.go                  # 服务入口
└── vendor/
    └── github.com/alibaba/pairec/v2/  # 框架 vendor 代码

1.2 依赖安装

# Go 1.21+ 环境
go version

# 安装依赖(已 vendor 化,通常不需要额外操作)
go mod tidy

1.3 FeatureStore 环境配置

在阿里云控制台完成以下配置:

  1. 创建 FeatureStore 项目(如 fs_test_project

  2. 创建FeatureDB、设置密码(用于访问 FeatureStore API)

  3. 准备阿里云 AK/SK(用于访问 FeatureStore API)

1.4 环境变量配置

创建 src/sql/variable.sh 文件,配置运行环境变量:

#!/bin/bash

# FeatureStore 环境变量配置
# 用法:source ./variable.sh

export PAIREC_FS_REGION_ID="cn-beijing"
export PAIREC_FS_ACCESS_KEY_ID="<your_access_key_id>"
export PAIREC_FS_ACCESS_KEY_SECRET="<your_access_key_secret>"
export PAIREC_FS_PROJECT_NAME="<your_project_name>"
export PAIREC_PAI_PROJECT_NAME="<your_pai_project_name>"

export PAIREC_FS_FEATUREDB_USERNAME="<your_featuredb_username>"
export PAIREC_FS_FEATUREDB_PASSWORD="<your_featuredb_password>"
export PAIREC_FS_FEATUREDB_ENDPOINT="<your_featuredb_addr>"
export PAIREC_FS_FEATUREDB_TOKEN=""
export PAIREC_FS_INSTANCE_ID=""

echo "========================================"
echo "FeatureStore 环境变量已加载"
echo "========================================"

环境变量说明

变量名

说明

示例

PAIREC_FS_REGION_ID

FeatureStore 区域

cn-beijing

PAIREC_FS_ACCESS_KEY_ID

阿里云 AccessKey ID

-

PAIREC_FS_ACCESS_KEY_SECRET

阿里云 AccessKey Secret

-

PAIREC_FS_PROJECT_NAME

FeatureStore 项目名称

-

PAIREC_PAI_PROJECT_NAME

PAI 项目名称

-

PAIREC_FS_FEATUREDB_USERNAME

FeatureDB 用户名

-

PAIREC_FS_FEATUREDB_PASSWORD

FeatureDB 密码

-

PAIREC_FS_FEATUREDB_ENDPOINT

FeatureDB 地址

paifeaturestore.cn-beijing.aliyuncs.com

使用方法

# 加载环境变量
source src/sql/variable.sh

# 启动服务
export PAIREC_FS_TEST_MODE=true
./pairec_server -config ./conf/config.json

二、数据准备

2.1 MaxCompute 表结构设计

用户行为序列表(KKV 表)

-- demo_user_behavior_table: 存储用户实时行为序列
CREATE TABLE IF NOT EXISTS demo_user_behavior_table (
    user_id         STRING COMMENT '用户ID(KKV第一层key)',
    item_id         STRING COMMENT '物品ID(KKV第二层key)',
    event           STRING COMMENT '事件:click/order/dislike/collect',
    play_time       DOUBLE COMMENT '事件耗时(秒)',
    event_unix_time BIGINT COMMENT '事件时间戳(毫秒)'
)
COMMENT '用户行为序列表'
PARTITIONED BY (ds STRING COMMENT '日期分区 yyyyMMdd')
LIFECYCLE 30;

物品协同关系表(KV 表)

-- demo_i2i_collaborative: 存储物品间协同过滤关系
CREATE TABLE IF NOT EXISTS demo_i2i_collaborative (
    item_id             STRING COMMENT '物品ID(KV的key)',
    similar_item_ids    STRING COMMENT '相似物品列表: "item1:score1,item2:score2"'
)
COMMENT '物品协同关系表'
PARTITIONED BY (ds STRING COMMENT '日期分区 yyyyMMdd')
LIFECYCLE 30;

2.2 FeatureStore 视图配置

在 FeatureStore 控制台创建视图:

视图名

类型

数据源

说明

demo_user_behavior_table

Sequence Feature View

MaxCompute 表

用户行为序列,KKV 查询模式

demo_i2i_collaborative

Feature View

MaxCompute 表

物品协同关系,KV 查询模式

2.3 测试数据生成

使用提供的 Python 脚本生成测试数据:

# 生成测试数据 SQL
cd src/sql
python3 generate_test_data.py > test_data.sql

# 查看生成的 SQL
cat test_data.sql

示例输出:

INSERT OVERWRITE demo_user_behavior_table PARTITION (ds='20260304')
VALUES 
    ('user_001', 'item_123', 'click', 15.5, 1741017600),
    ('user_001', 'item_201', 'dislike', 0, 1741017700);

INSERT OVERWRITE demo_i2i_collaborative PARTITION (ds='20260304')
VALUES 
    ('item_123', 'item_456:0.95,item_789:0.80'),
    ('item_201', 'item_211:0.95,item_212:0.80');

在 MaxCompute 客户端或者DataWorks中,创建MaxCompute表,执行上述 SQL 导入数据。


三、自定义召回代码编写

3.1 召回组件核心逻辑

创建文件 src/recall/featuredb_cf_recall.go

package recall

import (
	"encoding/json"
	"fmt"
	gosort "sort"
	"strconv"
	"strings"
	"time"

	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/log"
	"github.com/alibaba/pairec/v2/module"
	"github.com/alibaba/pairec/v2/persist/fs"
	"github.com/alibaba/pairec/v2/recconf"
	precall "github.com/alibaba/pairec/v2/service/recall"
	"github.com/alibaba/pairec/v2/utils"
)

var _ precall.Recall = (*FeatureDBCFRecall)(nil)

const FeatureDBCFRecallName = "FeatureDBCFRecall"

// FeatureDBCFRecallConf 自定义配置结构,从 UserDefineConfs 中解析。
type FeatureDBCFRecallConf struct {
	RecallCount              int
	Name                     string
	RealTimeUser2ItemDaoConf recconf.RealTimeUser2ItemDaoConfig
}

type featureDBCFMyConf struct {
	FeatureDBCFRecallConf FeatureDBCFRecallConf
}

// FeatureDBCFRecall 基于 FeatureDB 数据的实时协同过滤召回。
//
// 与标准 UserItemWeightRecall 的核心区别:
//   - 在用户 trigger item 权重计算完成后,过滤掉权重 < 0 的 trigger item,
//     再基于剩余有效 trigger 进行 i2i 协同过滤召回。
//
// 召回流程:
//  1. 通过 FeatureStore 读取用户行为序列,计算每个 trigger item 的权重
//  2. 过滤权重 < 0 的 trigger item
//  3. 以剩余 trigger item 为基础,查询 i2i 协同关系表召回候选集
//  4. 按分数降序排列,去重后截断至 RecallCount
type FeatureDBCFRecall struct {
	recallName         string
	recallCount        int
	triggerDAO         module.RealTimeUser2ItemDao // 负责从 FeatureDB 获取 trigger infos
	fsClient           *fs.FSClient                // i2i 协同关系表查询客户端
	itemTable          string                      // i2i 协同关系 FeatureView 名称
	similarItemIdField string                      // 相似物品 ID 字段名
	config             FeatureDBCFRecallConf
}

// NewFeatureDBCFRecall 创建 FeatureDBCFRecall 实例,从 UserDefineConfs 读取配置。
func NewFeatureDBCFRecall() *FeatureDBCFRecall {
	config, err := recconf.ParseUserDefineConfs[featureDBCFMyConf]()
	if err != nil {
		panic(fmt.Sprintf("FeatureDBCFRecallConf parse error:%v", err))
	}
	return newFeatureDBCFRecallWithConfig(config.FeatureDBCFRecallConf)
}

func newFeatureDBCFRecallWithConfig(conf FeatureDBCFRecallConf) *FeatureDBCFRecall {
	r := &FeatureDBCFRecall{
		config:             conf,
		recallName:         conf.Name,
		recallCount:        conf.RecallCount,
		itemTable:          conf.RealTimeUser2ItemDaoConf.Item2ItemFeatureViewName,
		similarItemIdField: "similar_item_ids",
	}

	if conf.RealTimeUser2ItemDaoConf.SimilarItemIdField != "" {
		r.similarItemIdField = conf.RealTimeUser2ItemDaoConf.SimilarItemIdField
	}

	// 初始化 trigger DAO(使用标准 FeatureStore 适配器读取用户行为序列)
	recallConf := recconf.RecallConfig{
		Name:                     conf.Name,
		RecallCount:              conf.RecallCount,
		RealTimeUser2ItemDaoConf: conf.RealTimeUser2ItemDaoConf,
	}
	r.triggerDAO = module.NewRealTimeUser2ItemDao(recallConf)

	// 初始化 i2i 协同关系查询的 FeatureStore 客户端
	fsclient, err := fs.GetFeatureStoreClient(conf.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.FeatureStoreName)
	if err != nil {
		panic(fmt.Sprintf("FeatureDBCFRecall: GetFeatureStoreClient error=%v", err))
	}
	r.fsClient = fsclient

	log.Info(fmt.Sprintf("FeatureDBCFRecall: initialized name=%s recallCount=%d itemTable=%s",
		r.recallName, r.recallCount, r.itemTable))

	return r
}

// GetCandidateItems 执行带权重过滤的实时协同过滤召回。
//
// 步骤:
//  1. 从 FeatureDB 获取用户触发 item 列表及权重
//  2. 过滤权重 < 0 的 trigger(核心区别点)
//  3. 若未配置 i2i 表,直接将 trigger item 作为候选集返回
//  4. 否则查询 i2i 协同关系表,展开为候选 item 集合
//  5. 排序、去重、截断
func (r *FeatureDBCFRecall) GetCandidateItems(user *module.User, ctx *context.RecommendContext) []*module.Item {
	start := time.Now()

	if r.triggerDAO == nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureDBCFRecall\terror=triggerDAO not initialized", ctx.RecommendId))
		return []*module.Item{}
	}

	// Step 1: 获取 trigger infos(包含权重计算)
	triggerInfos := r.triggerDAO.GetTriggerInfos(user, ctx)

	if len(triggerInfos) == 0 {
		log.Info(fmt.Sprintf("requestId=%s\tmodule=FeatureDBCFRecall\tname=%s\tno triggers found\tcost=%d",
			ctx.RecommendId, r.recallName, utils.CostTime(start)))
		return []*module.Item{}
	}

	// Step 2: 过滤权重 < 0 的 trigger item(核心业务逻辑)
	validTriggers := r.filterNegativeWeightTriggers(triggerInfos, ctx)
	if len(validTriggers) == 0 {
		log.Info(fmt.Sprintf("requestId=%s\tmodule=FeatureDBCFRecall\tname=%s\tall triggers filtered by weight<0\tcost=%d",
			ctx.RecommendId, r.recallName, utils.CostTime(start)))
		return []*module.Item{}
	}

	// 构建 trigger map(triggerId -> weight)和有序 triggerIds
	itemTriggers := make(map[string]float64, len(validTriggers))
	triggerIds := make([]string, 0, len(validTriggers))
	for _, t := range validTriggers {
		itemTriggers[t.ItemId] = t.Weight
		triggerIds = append(triggerIds, t.ItemId)
	}

	log.Info(fmt.Sprintf("requestId=%s\tmodule=FeatureDBCFRecall\tname=%s\toriginalTriggers=%d\tvalidTriggers=%d",
		ctx.RecommendId, r.recallName, len(triggerInfos), len(validTriggers)))

	// Step 3: 若没有配置 i2i 表,直接将 trigger item 作为候选集
	var ret []*module.Item
	if r.itemTable == "" {
		for _, t := range validTriggers {
			item := module.NewItem(t.ItemId)
			item.RetrieveId = r.recallName
			item.Score = t.Weight
			ret = append(ret, item)
		}
		ret = r.sortAndTruncate(ret)
		log.Info(fmt.Sprintf("requestId=%s\tmodule=FeatureDBCFRecall\tname=%s\tcount=%d\tcost=%d",
			ctx.RecommendId, r.recallName, len(ret), utils.CostTime(start)))
		return ret
	}

	// Step 4: 查询 i2i 协同关系表,为每个有效 trigger 展开候选 item
	ret = r.recallByI2I(itemTriggers, triggerIds, ctx)

	log.Info(fmt.Sprintf("requestId=%s\tmodule=FeatureDBCFRecall\tname=%s\tcount=%d\tcost=%d",
		ctx.RecommendId, r.recallName, len(ret), utils.CostTime(start)))

	return ret
}

// filterNegativeWeightTriggers 过滤权重 < 0 的 trigger item。
// 权重 < 0 通常意味着用户对该 item 的行为是负向信号(如差评、举报等),
// 这类 item 不应作为协同过滤的触发源。
func (r *FeatureDBCFRecall) filterNegativeWeightTriggers(
	triggers []*module.TriggerInfo,
	ctx *context.RecommendContext,
) []*module.TriggerInfo {
	valid := make([]*module.TriggerInfo, 0, len(triggers))
	filtered := 0
	for _, t := range triggers {
		log.Info(fmt.Sprintf("user trigger:%+v", t))
		if t.Weight < 0 {
			filtered++
			fmt.Println(fmt.Sprintf("user filter trigger:%+v", t))
			if ctx.Debug {
				log.Info(fmt.Sprintf(
					"requestId=%s\tmodule=FeatureDBCFRecall\tfiltered trigger\titemId=%s\tweight=%f",
					ctx.RecommendId, t.ItemId, t.Weight))
			}
			continue
		}
		valid = append(valid, t)
	}
	if filtered > 0 {
		log.Info(fmt.Sprintf("requestId=%s\tmodule=FeatureDBCFRecall\tname=%s\tfiltered negative triggers=%d",
			ctx.RecommendId, r.recallName, filtered))
	}

	return valid
}

// recallByI2I 通过 FeatureStore i2i 协同关系表为有效 trigger 展开候选 item。
func (r *FeatureDBCFRecall) recallByI2I(
	itemTriggers map[string]float64,
	triggerIds []string,
	ctx *context.RecommendContext,
) []*module.Item {
	featureView := r.fsClient.GetProject().GetFeatureView(r.itemTable)
	if featureView == nil {
		log.Error(fmt.Sprintf(
			"requestId=%s\tmodule=FeatureDBCFRecall\trecallName=%s\terror=featureView not found: %s",
			ctx.RecommendId, r.recallName, r.itemTable))
		return nil
	}

	featureEntity := r.fsClient.GetProject().GetFeatureEntity(featureView.GetFeatureEntityName())
	if featureEntity == nil {
		log.Error(fmt.Sprintf(
			"requestId=%s\tmodule=FeatureDBCFRecall\trecallName=%s\terror=featureEntity not found: %s",
			ctx.RecommendId, r.recallName, featureView.GetFeatureEntityName()))
		return nil
	}

	// 批量查询 i2i 相似物品
	itemIds := make([]interface{}, 0, len(triggerIds))
	for _, id := range triggerIds {
		itemIds = append(itemIds, id)
	}

	features, err := featureView.GetOnlineFeatures(itemIds, []string{r.similarItemIdField}, nil)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureDBCFRecall\terror=GetOnlineFeatures(%v)", ctx.RecommendId, err))
		return nil
	}

	var ret []*module.Item
	triggerIdItemMap := make(map[string][]*module.Item, len(itemTriggers))

	for _, featureMap := range features {
		triggerId := utils.ToString(featureMap[featureEntity.FeatureEntityJoinid], "")
		similarIds := utils.ToString(featureMap[r.similarItemIdField], "")
		preferScore := itemTriggers[triggerId]

		list := strings.Split(similarIds, ",")
		for _, entry := range list {
			parts := strings.Split(entry, ":")
			if len(parts) == 0 || parts[0] == "" || parts[0] == "null" {
				continue
			}

			item := module.NewItem(parts[0])
			item.RetrieveId = r.recallName

			switch len(parts) {
			case 2:
				// 格式:itemId:score
				if score, err := strconv.ParseFloat(strings.TrimSpace(parts[1]), 64); err == nil {
					item.Score = score * preferScore
				} else {
					item.Score = preferScore
				}
			case 3:
				// 兼容格式:itemId:recallName:score
				if score, err := strconv.ParseFloat(strings.TrimSpace(parts[2]), 64); err == nil {
					item.Score = score * preferScore
				} else {
					item.Score = preferScore
				}
			default:
				item.Score = preferScore
			}

			ret = append(ret, item)
			triggerIdItemMap[triggerId] = append(triggerIdItemMap[triggerId], item)
		}
	}

	if ctx.Debug {
		for _, triggerId := range triggerIds {
			if items, ok := triggerIdItemMap[triggerId]; ok && len(items) > 0 {
				log.Info(fmt.Sprintf(
					"requestId=%s\tmodule=FeatureDBCFRecall\ttriggerId=%s\ttriggerScore=%f\titems=%s",
					ctx.RecommendId, triggerId, itemTriggers[triggerId], r.debugItemsString(items)))
			}
		}
	}

	return r.sortAndTruncate(ret)
}

// sortAndTruncate 对候选 item 按分数降序排列,去重后截断至 recallCount。
func (r *FeatureDBCFRecall) sortAndTruncate(items []*module.Item) []*module.Item {
	// 按分数降序排列
	gosort.Slice(items, func(i, j int) bool {
		return items[i].Score > items[j].Score
	})

	// 去重
	seen := make(map[module.ItemId]bool, len(items))
	deduped := make([]*module.Item, 0, len(items))
	for _, item := range items {
		if !seen[item.Id] {
			seen[item.Id] = true
			deduped = append(deduped, item)
		}
	}

	// 截断
	if len(deduped) > r.recallCount {
		deduped = deduped[:r.recallCount]
	}
	return deduped
}

func (r *FeatureDBCFRecall) debugItemsString(items []*module.Item) string {
	parts := make([]string, 0, len(items))
	for _, item := range items {
		parts = append(parts, fmt.Sprintf("%s:%f", item.Id, item.Score))
	}
	return strings.Join(parts, ",")
}

// CloneWithConfig 根据 AB 实验参数克隆一个新的 FeatureDBCFRecall 实例。
func (r *FeatureDBCFRecall) CloneWithConfig(params map[string]interface{}) *FeatureDBCFRecall {
	d, err := json.Marshal(params)
	if err != nil {
		log.Error(fmt.Sprintf("module=FeatureDBCFRecall\terr=%v", err))
		return nil
	}
	config := FeatureDBCFRecallConf{}
	err = json.Unmarshal(d, &config)
	if err != nil {
		log.Error(fmt.Sprintf("module=FeatureDBCFRecall\terr=%v", err))
		return nil
	}
	return newFeatureDBCFRecallWithConfig(config)
}

3.2 服务入口注册

修改 src/main.go

package main

import (
	"pairec_demo/src/controller"
	"pairec_demo/src/recall"

	"github.com/alibaba/pairec/v2"
	precall "github.com/alibaba/pairec/v2/service/recall"
)

func main() {
	// 在 Run() 之前预先注册自定义召回组件
	pairec.AddStartHook(func() error {
		precall.RegisterRecall(recall.FeatureDBCFRecallName, recall.NewFeatureDBCFRecall())
		return nil
	})

	pairec.Route("/api/rec/feed", &controller.FeedController{})
	pairec.Run()
}

四、配置文件

4.1 config.json 完整配置

创建 conf/config.json 文件:

{
	"RunMode": "prepub",
	"ListenConf": {
		"HttpAddr": "",
		"HttpPort": 8000
	},
	"RecallConfs": [],
	"SortNames": {
		"default": ["ItemRankScore"]
	},
	"FilterNames": {
		"default": []
	},
	"SceneConfs": {
		"home_feed": {
		     "default": {
			"RecallNames": ["FeatureDBCFRecall"]
		     }
		}
	},
	"LogConf": {
		"RetentionDays": 3,
		"DiskSize": 20,
		"LogLevel": "DEBUG",
		"Output": "console"
	},
	"FeatureStoreConfs": {
		"fs_pairec": {
			"RegionId": "<your_region_id>",
			"AccessId": "<your_access_key_id>",
			"AccessKey": "<your_access_key_secret>",
			"ProjectName": "<your_project_name>",
			"FeatureDBUsername": "<your_featuredb_username>",
			"FeatureDBPassword": "<your_featuredb_password>",
			"TestMode": true
		}
	},
	"UserDefineConfs": {
		"FeatureDBCFRecallConf": {
			"Name": "FeatureDBCFRecall",
			"RecallCount": 200,
			"RealTimeUser2ItemDaoConf": {
				"UserTriggerDaoConf": {
					"AdapterType": "featurestore",
					"FeatureStoreName": "fs_pairec",
					"FeatureStoreViewName": "demo_user_behavior_table",
					"TriggerCount": 100,
					"EventWeight": "click:1;order:-1;dislike:-2",
					"WeightExpression": "exp((-0.2)*((currentTime-eventTime)/3600/24))",
					"WeightMode": "sum",
					"NoUsePlayTimeField": true,
					"ItemIdFieldName": "item_id",
					"EventFieldName": "event",
					"TimestampFieldName": "event_unix_time"
				},
				"Item2ItemFeatureViewName": "demo_i2i_collaborative",
				"SimilarItemIdField": "similar_item_ids"
			}
		}
	}
}

配置说明

配置项

说明

占位符替换

RegionId

FeatureStore 区域

<your_region_id>

AccessId

阿里云 AccessKey ID

<your_access_key_id>

AccessKey

阿里云 AccessKey Secret

<your_access_key_secret>

ProjectName

FeatureStore 项目名称

<your_project_name>

FeatureDBUsername

FeatureDB 用户名

<your_featuredb_username>

FeatureDBPassword

FeatureDB 密码

<your_featuredb_password>

注意:所有 <...> 占位符需要替换为实际值,请勿将真实 AK/密码提交到代码仓库。

4.2 关键配置项说明

配置路径

说明

示例值

RecallConfs[].Name

召回组件名称

featuredb_cf_recall

RecallConfs[].RecallType

召回类型标识

FeatureDBCFRecall

UserTriggerDaoConf.FeatureStoreName

FeatureStore 配置名

fs_pairec

UserTriggerDaoConf.FeatureStoreViewName

用户行为视图名

demo_user_behavior_table

Item2ItemFeatureViewName

i2i 协同视图名

demo_i2i_collaborative

EventWeight

事件权重映射

click:1;order:-1;dislike:-2

WeightExpression

时间衰减公式

exp((-0.2)*((currentTime-eventTime)/3600/24))

FeatureStoreConfs[].TestMode

是否启用测试模式

true(本地开发必需)

上线的时候改成false


五、测试验证

5.1 单元测试

# 运行过滤逻辑单元测试
go test ./src/recall/... -run TestFilterNegativeWeightTriggers -v

5.2 集成测试

设置环境变量

export PAIREC_FS_REGION_ID=cn-beijing
export PAIREC_FS_ACCESS_KEY_ID=<your_ak>
export PAIREC_FS_ACCESS_KEY_SECRET=<your_sk>
export PAIREC_FS_PROJECT_NAME=fs_test_project
export PAIREC_FS_FEATUREDB_USERNAME=<your_user_name>
export PAIREC_FS_FEATUREDB_PASSWORD=<your_password>

运行集成测试

# 运行完整集成测试
go test ./src/recall/... -run TestFeatureDBCFRecall_Integration -v -timeout 60s

5.3 服务启动测试

编译服务

# 编译
go build -o pairec_server src/main.go

启动服务

# 设置环境变量并启动
export PAIREC_FS_TEST_MODE=true
./pairec_server -config ./conf/config.json

API 测试

# 发送测试请求
curl -s -X POST "http://localhost:8000/api/rec/feed" \
  -H "Content-Type: application/json" \
  -d '{
    "uid": "user_001",
    "scene_id": "home_feed",
    "size": 10,
    "debug": true
  }' | python3 -m json.tool

预期响应:

{
    "code": 200,
    "msg": "success",
    "request_id": "xxx",
    "size": 10,
    "items": [
        {"id": "item_456", "score": 0.95},
        {"id": "item_789", "score": 0.80}
    ]
}

六、常见问题排查

6.1 召回日志缺失

现象:服务启动后没有 FeatureDBCFRecall 日志

原因service.Load() 在 AddStartHook 之前执行,自定义召回尚未注册

解决:确保在 main.go 中使用 AddStartHook 注册召回组件

6.2 FeatureStore 403 认证错误

现象StatusCode: 403, Response message: auth error

解决

  1. 检查 config.json 中的 AccessId / AccessKey 是否正确

  2. 检查 FeatureDBPassword 是否为最新密码

  3. 确认 TestMode: true 已启用

6.3 无 triggers found

现象no triggers found

排查步骤

  1. 确认 MaxCompute 表中有数据

  2. 确认 FeatureStore 视图已正确同步

  3. 检查 user_id 是否匹配(如 user_001

  4. 检查时间戳单位(应为毫秒)

6.4 VPC 地址超时

现象:使用公网地址连接 FeatureStore VPC 地址超时

解决

# 启用 TestMode 使用公网地址
export PAIREC_FS_TEST_MODE=true

七、核心设计要点

7.1 负权重过滤逻辑

// 过滤权重 < 0 的 trigger(如 dislike、order 事件)
func (r *FeatureDBCFRecall) filterNegativeWeightTriggers(...) []*module.TriggerInfo {
    valid := make([]*module.TriggerInfo, 0, len(triggers))
    for _, t := range triggers {
        if t.Weight < 0 {
            continue  // 跳过负权重 trigger
        }
        valid = append(valid, t)
    }
    return valid
}

7.2 权重计算公式

weight = eventWeight * timeDecay

# eventWeight: 事件类型权重 (click:1, order:-1, dislike:-2)
# timeDecay:   时间衰减因子 exp((-0.2)*((currentTime-eventTime)/3600/24))

7.3 召回流程

1. 读取用户行为序列 (demo_user_behavior_table)
2. 计算每个 trigger item 的权重
3. 过滤权重 < 0 的 trigger
4. 查询 i2i 协同表 (demo_i2i_collaborative)
5. 展开相似物品并计算最终分数
6. 排序、去重、截断返回

八、附录

8.1 事件权重配置参考

事件

权重

说明

click

1

点击,正向信号

collect

2

收藏,强正向信号

order

-1

下单/购买,需排除(已购买不再推荐)

dislike

-2

不喜欢,强负向信号

share

3

分享,最强正向信号

8.2 时间衰减公式参考

公式

半衰期

适用场景

exp((-0.2)*((currentTime-eventTime)/3600/24))

~3.5 天

新闻、短视频

exp((-0.1)*((currentTime-eventTime)/3600/24))

~7 天

电商、商品

exp((-0.05)*((currentTime-eventTime)/3600/24))

~14 天

长周期内容