基于PAI-Rec引擎进行二次开发
本文介绍如何基于PAI-Rec引擎去进行二次个性化开发,自定义过滤,召回,排序的介绍。
导入引擎
go get github.com/alibaba/pairec/v2
自定义过滤
增加自定义Filter需要以下步骤
1.实现自定义Filter类,实现IFilter接口, IFilter接口已经在PAI-Rec中已经定义完成,只需实现类
// IFilter 接口定义
type IFilter interface {
// 主要实现 Filter 方法,传入的数据在 filterData.Data 里找到
Filter(filterData *FilterData) error
}package filter
import (
"fmt"
"github.com/alibaba/pairec/v2/filter"
"github.com/alibaba/pairec/v2/module"
)
type MyFilter struct {
}
func (f *MyFilter) Filter(data *filter.FilterData) error {
fmt.Println("my filter")
items := data.Data.([]*module.Item)
newItems := make([]*module.Item, 0)
// 处理items, 符合条件的加入到 newItems中
...
data.Data = newItems
return nil
}2.在server启动前,注册自定义的Filter
package main
import (
"pairec_base/src/controller"
myFilter "pairec_base/src/filter"
"github.com/alibaba/pairec/v2"
"github.com/alibaba/pairec/v2/filter"
)
func main() {
// 先注册自己的 filter,加到 hook 中
pairec.AddStartHook(func() error {
filter.RegisterFilter("myFilter", &myFilter.MyFilter{})
return nil
})
pairec.Route("/api/rec/feed", &controller.FeedController{})
pairec.Run()
}3.修改配置,增加filter的访问,这里是配置时 场景 + filter 列表的形式,场景可以设置具体的值,也可以设置默认值default
// myfilter 是自定义的 filter
"FilterNames": {"default": ["myfilter", "item_exposure_filter"]}自定义召回
和过滤类似,需要实现 Recall 接口, Recall 接口已经定义完成,只需实现类
// Recall 接口定义
type Recall interface {
GetCandidateItems(user *module.User, context *context.RecommendContext) []*module.Item
}package recall
import (
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/module"
)
type MyRecall struct {
}
func (r *MyRecall) GetCandidateItems(user *module.User, context *context.RecommendContext) []*module.Item {
ret := make([]*module.Item, 0)
// recall 具体逻辑
...
return ret
}
然后使用 RegisterRecall 进行注册。
package main
import (
"pairec_base/src/controller"
recall2 "pairec_base/src/recall"
"github.com/alibaba/pairec/v2"
"github.com/alibaba/pairec/v2/service/recall"
)
func main() {
pairec.AddStartHook(func() error {
recall.RegisterRecall("myRecall", &recall2.MyRecall{})
return nil
})
pairec.Route("/api/rec/feed", &controller.FeedController{})
pairec.Run()
}
在场景类别的配置中,可以设置相应的 recall 名称。
"SceneConfs": {
"home_feed": {
"default": {
// 设置为 myRecall
"RecallNames": ["myRecall"]
}
}
}自定义 Sort 支持
和 filter 类似,sort 也使用同样的方式实现自定义
1.实现自定义 ISort 类, ISort 已经定义好
type ISort interface {
// 实现 sort 接口,数据在 sortData.Data 里
Sort(sortData *SortData) error
}package sort
import (
"fmt"
"github.com/alibaba/pairec/v2/module"
"github.com/alibaba/pairec/v2/sort"
)
type MySort struct {
}
func (s *MySort) Sort(data *sort.SortData) error {
fmt.Println("my sort")
items := data.Data.([]*module.Item)
// sort 具体逻辑处理
...
data.Data = items
return nil
}2.在 server 启动前,注册自定义的 Sort
package main
import (
"pairec_base/src/controller"
sort2 "pairec_base/src/sort"
"github.com/alibaba/pairec/v2"
"github.com/alibaba/pairec/v2/sort"
)
func main() {
pairec.AddStartHook(func() error {
sort.RegisterSort("mySort", &sort2.MySort{})
return nil
})
pairec.Route("/api/rec/feed", &controller.FeedController{})
pairec.Run()
}
3.修改配置,增加 Sort 的访问
"SortNames": {"default": ["mysort", "item_score"]}提权操作自定义
很多情况下模型打分之后,用户需要自定义策略实现对模型得分的提降权操作,用户需要提供自定义的策略逻辑。需要实现boostFunc定义。
type boostFunc func(score float64, user *module.User, item *module.Item, context *context.RecommendContext) float64package rank
import (
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/module"
)
// 用户自定义
func BoostScore(score float64, user *module.User, item *module.Item, context *context.RecommendContext) float64 {
//编辑提权
vTagId, err := item.IntProperty("vtag_id")
if err == nil && vTagId == 20214 {
return score * 1.3
}
return score
}注册:
package main
import(
prank "github.com/alibaba/pairec/v2/service/rank"
"pairec_base/src/rank"
"github.com/alibaba/pairec/v2"
)
func main(){
pairec.AddStartHook(func() error {
prank.SetBoostFunc(rank.BoostScore)
})
pairec.Route("/api/rec/feed", &controller.FeedController{})
pairec.Run()
}
特征自定义加载
特征加载过程,增加了自定义特征加载的支持。除了在配置中FeatureConfs中加载特征,也支持自定义的 function 加载特征。
加载特征 function 需要实现
type LoadFeatureFunc func(user *module.User, items []*module.Item, context *context.RecommendContext)通过
func RegisterLoadFeatureFunc(sceneName string, f LoadFeatureFunc) 进行注册。这个是分场景注册。
例如:
package feature
import (
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/module"
)
func LoadRealTimeFeatures(user *module.User, items []*module.Item, context *context.RecommendContext) {
// 向 user 增加
user.AddProperty("userAge", 30)
// 向 item 增加
for _, item := range items {
item.AddProperty("count", 5)
}
}注册:
package main
import(
pfeature "github.com/alibaba/pairec/v2/service/feature"
"github.com/alibaba/pairec/v2"
)
func main(){
pairec.AddStartHook(func() error {
// feed 是场景名称
pfeature.RegisterLoadFeatureFunc("feed", feature.LoadRealTimeFeatures)
return nil
})
pairec.Route("/api/rec/feed", &controller.FeedController{})
pairec.Run()
}特征工程自定义处理
在特征加载完成之后,很多时候会做特征处理的工作,比如生成新的特征,生成组合特征,特征处理时,可能会有 user 和 item 特征的综合处理等等。在特征加载之后,引擎也提供一些预定义的特征处理算子用于特征工程,在不满足的情况下,可以自定义处理实现。
特征工程的自定义处理也是通过自定义的 function 来实现的。
特征工程 function 定义
type FeatureFunc func(user *module.User, items []*module.Item, context *context.RecommendContext) []*module.Item通过
func RegisterFeatureFunc(sceneName string, f FeatureFunc) 进行注册。这个也是分场景的注册。
举例:
定义一个特征处理函数 funciton
package feature
import (
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/module"
)
// 这里还可以决定进一步减少返回的 item 数量
func MyFeatureFunc(user *module.User, items []*module.Item, context *context.RecommendContext) []*module.Item {
if len(items) < 400 {
return items
}
// 特征的处理
return items[:400]
}在 main.go 里进行注册
package main
import(
pfeature "github.com/alibaba/pairec/v2/service/feature"
"github.com/alibaba/pairec/v2"
)
func main(){
pairec.AddStartHook(func() error {
// feed 是场景名称
pfeature.RegisterFeatureFunc("feed", feature.MyFeatureFunc)
return nil
})
pairec.Route("/api/rec/feed", &controller.FeedController{})
pairec.Run()
}自定义 Rank(算法调用)
rank 需要实现 IRank 接口才能注册到流程中。包含两部分,一个是过滤出想自定义调用算法的 item,然后对过滤出来的 item 调用自定义 rank function。
IRank 接口定义如下:
type IRank interface {
// Filter the custom rank of item
Filter(User *module.User, item *module.Item, context *context.RecommendContext) bool
Rank(User *module.User, items []*module.Item, requestData []map[string]interface{}, context *context.RecommendContext)
}然后通过
func RegisterRank(sceneName string, ranks ...IRank)根据每个场景注册不同的自定义的 rank。
这里 ranks 可以设置多个,可以针对不同的 item,调用不同的 rank。
这里举个简单的例子说明:
type MyRank struct {
index int
}
func NewMyRank() *MyRank {
return &MyRank{
index: 0,
}
}
func (r *MyRank) Filter(User *module.User, item *module.Item, context *context.RecommendContext) bool {
r.index++
item.AddProperty("other", "other")
if r.index%2 == 0 {
item.AddProperty("index", r.index)
return true
}
return false
}
func (r *MyRank) Rank(User *module.User, items []*module.Item, requestData []map[string]interface{}, context *context.RecommendContext) {
fmt.Println("rank len", len(items))
for _, item := range items {
if f, err := item.FloatProperty("index"); err == nil {
item.Score = float64(f * 10)
}
}
r.index = 0
}如果没有匹配到自定义 Rank 的 item 列表,还是会调用 RankConf 里的模型配置。
完整示例
recall.go 自定义recall 示例
package recall
import (
"fmt"
"math/rand"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/module"
)
type MyRecall struct {
}
func (r *MyRecall) GetCandidateItems(user *module.User, context *context.RecommendContext) []*module.Item {
fmt.Println("MyRecall is running!")
ret := make([]*module.Item, 0)
for i := 1; i < 100; i++ {
item := module.NewItem(fmt.Sprintf("item_%d", i))
item.Score = rand.Float64()
item.AddProperty("title", fmt.Sprintf("新闻%d", i))
item.AddProperty("count", i)
item.RetrieveId = "myRecall"
ret = append(ret, item)
}
return ret
}
filter.go 自定义 filter 示例
package filter
import (
"fmt"
"github.com/alibaba/pairec/v2/filter"
"github.com/alibaba/pairec/v2/module"
)
type MyFilter struct {
}
func (f *MyFilter) Filter(data *filter.FilterData) error {
fmt.Println("MyFilter is running!")
items := data.Data.([]*module.Item)
newItems := make([]*module.Item, 0)
for _, item := range items {
if item.Score > 0.2 {
newItems = append(newItems, item)
}
}
fmt.Printf("MyFilter: kept %d items\n", len(newItems))
data.Data = newItems
return nil
}rank.go 自定义 rank 示例 ,可以有很多不同的 rank ,rank 和 rank1 分别是不同的示例
package rank
import (
"fmt"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/module"
)
type MyRank struct {
}
func (rank *MyRank) Filter(User *module.User, item *module.Item, context *context.RecommendContext) bool {
//fmt.Println("MyRank Filter ")
if item != nil && item.Score <= 0.5 {
return true
}
return false
}
func (rank *MyRank) Rank(User *module.User, items []*module.Item, requestData []map[string]interface{}, context *context.RecommendContext) {
fmt.Println("MyRank is running")
for _, item := range items {
item.Score = BoostScore(item.Score, User, item, context)
}
}
rank1.go
package rank
import (
"fmt"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/module"
)
type MyRank1 struct{}
func (r *MyRank1) Filter(User *module.User, item *module.Item, context *context.RecommendContext) bool {
//fmt.Println("MyRank1 Filter is running")
if item != nil && 0.5 < item.Score && item.Score < 1 {
return true
}
return false
}
func (r *MyRank1) Rank(User *module.User, items []*module.Item, requestData []map[string]interface{}, context *context.RecommendContext) {
fmt.Println("MyRank1 is running")
for _, item := range items {
if item.Score >= 0.9 {
item.Score *= 1.1
} else if item.Score >= 0.8 {
item.Score *= 1.2
} else if item.Score >= 0.7 {
item.Score *= 1.3
} else {
item.Score *= 1.4
}
}
}
boost.go 基于 rank 的 boost 示例(如果是自定义rank,需要自己手动调用,非自定义rank会自动调用)
package rank
import (
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/module"
)
func BoostScore(score float64, user *module.User, item *module.Item, ctx *context.RecommendContext) float64 {
//fmt.Println("BoostScore is running!")
return score * 1.5
}
feature.go 自定义 feature 示例
package feature
import (
"fmt"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/module"
)
// LoadRealTimeFeatures 自定义特征加载函数
func LoadRealTimeFeatures(user *module.User, items []*module.Item, context *context.RecommendContext) {
// 向 user 增加
user.AddProperty("userAge", 30)
// 向 item 增加
for _, item := range items {
item.AddProperty("count", 5)
}
}
// MyFeatureFunc 自定义特征处理
func MyFeatureFunc(user *module.User, items []*module.Item, context *context.RecommendContext) []*module.Item {
fmt.Println("MyFeatureFunc is running")
if len(items) < context.Size {
fmt.Printf("items less size :%d \n", len(items))
return items
}
item2 := make([]*module.Item, len(items), 0)
for _, item := range items {
if v, ok := item.Properties["userAge"]; ok {
if age, o := v.(int); o && age > 18 {
item2 = append(item2, item)
}
}
}
if len(item2) < context.Size {
fmt.Printf("item2 : %d \n", len(item2))
return item2
} else {
fmt.Printf("items[:context.Size] : %d \n", len(items))
return items[:context.Size]
}
}
main.go 主函数,注册各种自定义
package main
import (
"pairec_base/src/controller"
feature2 "pairec_base/src/feature"
filter2 "pairec_base/src/filter"
rank2 "pairec_base/src/rank"
recall2 "pairec_base/src/recall"
sort2 "pairec_base/src/sort"
"github.com/alibaba/pairec/v2"
"github.com/alibaba/pairec/v2/filter"
"github.com/alibaba/pairec/v2/service/feature"
"github.com/alibaba/pairec/v2/service/rank"
"github.com/alibaba/pairec/v2/service/recall"
"github.com/alibaba/pairec/v2/sort"
)
func main() {
pairec.AddStartHook(func() error {
recall.RegisterRecall("myRecall", &recall2.MyRecall{})
filter.RegisterFilter("myFilter", &filter2.MyFilter{})
sort.RegisterSort("mySort", &sort2.MySort{})
rank.RegisterRank("home_feed", &rank2.MyRank{}, &rank2.MyRank1{})
rank.SetBoostFunc(rank2.BoostScore)
feature.RegisterLoadFeatureFunc("home_feed", feature2.LoadRealTimeFeatures)
feature.RegisterFeatureFunc("home_feed", feature2.MyFeatureFunc)
return nil
})
pairec.Route("/api/rec/feed", &controller.FeedController{})
pairec.Run()
}
config.json 配置示例
{
"RunMode": "product",
"ListenConf": {
"HttpAddr": "",
"HttpPort": 8000
},
"ABTestConf": {
"Host": "",
"Token": ""
},
"FilterConfs": [
],
"RecallConfs": [
{
"Name": "mock_recall",
"RecallType": "MockRecall",
"RecallCount": 200
}
],
"SortNames": {
"default": [
"mySort"
]
},
"FilterNames": {
"default": [
"myFilter"
]
},
"AlgoConfs": [
],
"KafkaConfs": {
},
"RedisConfs": {
},
"SceneConfs": {
"home_feed": {
"default": {
"RecallNames": ["myRecall"],
"FilterNames": ["myFilter"],
"SortName":["mySort"]
}
}
},
"LogConf": {
"RetensionDays": 3,
"DiskSize": 20,
"LogLevel": "INFO"
},
"RankConf": {
},
"FeatureConfs": {
}
}