基于PAI-Rec引擎进行二次开发

更新时间:
复制为 MD 格式

本文介绍如何基于PAI-Rec引擎去进行二次个性化开发,自定义过滤,召回,排序的介绍。

导入引擎

go get github.com/alibaba/pairec/v2

自定义过滤

增加自定义Filter需要以下步骤

1.实现自定义Filter类,实现IFilter接口, IFilter接口已经在PAI-Rec中已经定义完成,只需实现类

 // IFilter 接口定义
 type IFilter interface {
        // 主要实现 Filter 方法,传入的数据在 filterData.Data 里找到
        Filter(filterData *FilterData) error
 }
package filter
import (
	"fmt"

	"github.com/alibaba/pairec/v2/filter"
	"github.com/alibaba/pairec/v2/module"
)
 
type MyFilter struct {
}

func (f *MyFilter) Filter(data *filter.FilterData) error {
    fmt.Println("my filter")
    items := data.Data.([]*module.Item)  
    newItems := make([]*module.Item, 0)
    // 处理items, 符合条件的加入到 newItems中
    ...
    data.Data = newItems
    return nil
}

2.在server启动前,注册自定义的Filter

package main

import (
	"pairec_base/src/controller"
	myFilter "pairec_base/src/filter"

	"github.com/alibaba/pairec/v2"
	"github.com/alibaba/pairec/v2/filter"
)

func main() {
        // 先注册自己的 filter,加到 hook 中
	pairec.AddStartHook(func() error {
		filter.RegisterFilter("myFilter", &myFilter.MyFilter{})
		return nil
	})

	pairec.Route("/api/rec/feed", &controller.FeedController{})
	pairec.Run()
}

3.修改配置,增加filter的访问,这里是配置时 场景 + filter 列表的形式,场景可以设置具体的值,也可以设置默认值default

// myfilter 是自定义的 filter
"FilterNames":  {"default":  ["myfilter", "item_exposure_filter"]}

自定义召回

和过滤类似,需要实现 Recall 接口, Recall 接口已经定义完成,只需实现类

// Recall 接口定义
type Recall interface {
    GetCandidateItems(user *module.User, context *context.RecommendContext) []*module.Item
}
package recall

import (
	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/module"
)

type MyRecall struct {
}

func (r *MyRecall) GetCandidateItems(user *module.User, context *context.RecommendContext) []*module.Item {
	ret := make([]*module.Item, 0)
	// recall 具体逻辑
        ...
	return ret
}
                                     

然后使用 RegisterRecall 进行注册。

package main

import (
	"pairec_base/src/controller"
	recall2 "pairec_base/src/recall"

	"github.com/alibaba/pairec/v2"
	"github.com/alibaba/pairec/v2/service/recall"
)

func main() {
	pairec.AddStartHook(func() error {
		recall.RegisterRecall("myRecall", &recall2.MyRecall{})
		return nil
	})

	pairec.Route("/api/rec/feed", &controller.FeedController{})
	pairec.Run()
}

在场景类别的配置中,可以设置相应的 recall 名称。

"SceneConfs": {
		"home_feed": {
			"default": {
			        // 设置为 myRecall
				"RecallNames": ["myRecall"]
			}
		}
	}

自定义 Sort 支持

和 filter 类似,sort 也使用同样的方式实现自定义

1.实现自定义 ISort 类, ISort 已经定义好

type ISort interface {   
    // 实现 sort 接口,数据在 sortData.Data 里
    Sort(sortData *SortData) error
 }
package sort

import (
	"fmt"

	"github.com/alibaba/pairec/v2/module"
	"github.com/alibaba/pairec/v2/sort"
)

type MySort struct {
}

func (s *MySort) Sort(data *sort.SortData) error {
	fmt.Println("my sort")
	items := data.Data.([]*module.Item)
	// sort 具体逻辑处理
	...
	data.Data = items
	return nil
}

2.在 server 启动前,注册自定义的 Sort

package main

import (
	"pairec_base/src/controller"
	sort2 "pairec_base/src/sort"

	"github.com/alibaba/pairec/v2"
	"github.com/alibaba/pairec/v2/sort"
)

func main() {
	pairec.AddStartHook(func() error {
		sort.RegisterSort("mySort", &sort2.MySort{})
		return nil
	})

	pairec.Route("/api/rec/feed", &controller.FeedController{})
	pairec.Run()
}

3.修改配置,增加 Sort 的访问

"SortNames":   {"default":   ["mysort", "item_score"]}

提权操作自定义

很多情况下模型打分之后,用户需要自定义策略实现对模型得分的提降权操作,用户需要提供自定义的策略逻辑。需要实现boostFunc定义。

type boostFunc func(score float64, user *module.User, item *module.Item, context *context.RecommendContext) float64
package rank
import (
	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/module"
)

// 用户自定义
func BoostScore(score float64, user *module.User, item *module.Item, context *context.RecommendContext) float64 {
	//编辑提权
	vTagId, err := item.IntProperty("vtag_id")
	if err == nil && vTagId == 20214 {
		return score * 1.3
	}
	return score
}

注册:

package main
import(
      prank "github.com/alibaba/pairec/v2/service/rank"
      "pairec_base/src/rank"
      "github.com/alibaba/pairec/v2"
)

func main(){
        pairec.AddStartHook(func() error {
            prank.SetBoostFunc(rank.BoostScore)
        })

	pairec.Route("/api/rec/feed", &controller.FeedController{})
	pairec.Run()
}

特征自定义加载

特征加载过程,增加了自定义特征加载的支持。除了在配置中FeatureConfs中加载特征,也支持自定义的 function 加载特征。

加载特征 function 需要实现

type LoadFeatureFunc func(user *module.User, items []*module.Item, context *context.RecommendContext)

通过

func RegisterLoadFeatureFunc(sceneName string, f LoadFeatureFunc) 

进行注册。这个是分场景注册。

例如:

package feature
import (
	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/module"
)

func LoadRealTimeFeatures(user *module.User, items []*module.Item, context *context.RecommendContext) {
	// 向 user 增加
	user.AddProperty("userAge", 30)

	// 向 item 增加
	for _, item := range items {
		item.AddProperty("count", 5)
	}
}

注册:

package main
import(
      pfeature "github.com/alibaba/pairec/v2/service/feature"
      "github.com/alibaba/pairec/v2"
)

func main(){
       pairec.AddStartHook(func() error {
            // feed 是场景名称
            pfeature.RegisterLoadFeatureFunc("feed", feature.LoadRealTimeFeatures)
            return nil
        })

	pairec.Route("/api/rec/feed", &controller.FeedController{})
	pairec.Run()
}

特征工程自定义处理

在特征加载完成之后,很多时候会做特征处理的工作,比如生成新的特征,生成组合特征,特征处理时,可能会有 user 和 item 特征的综合处理等等。在特征加载之后,引擎也提供一些预定义的特征处理算子用于特征工程,在不满足的情况下,可以自定义处理实现。

特征工程的自定义处理也是通过自定义的 function 来实现的。

特征工程 function 定义

type FeatureFunc func(user *module.User, items []*module.Item, context *context.RecommendContext) []*module.Item

通过

func RegisterFeatureFunc(sceneName string, f FeatureFunc) 

进行注册。这个也是分场景的注册。

举例:

定义一个特征处理函数 funciton

package feature
import (
	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/module"
)

// 这里还可以决定进一步减少返回的 item 数量
func MyFeatureFunc(user *module.User, items []*module.Item, context *context.RecommendContext) []*module.Item {
	if len(items) < 400 {
		return items
	}
  
  // 特征的处理
  
  return items[:400]
}

在 main.go 里进行注册

package main
import(
      pfeature "github.com/alibaba/pairec/v2/service/feature"
      "github.com/alibaba/pairec/v2"
)

func main(){
       pairec.AddStartHook(func() error {
            // feed 是场景名称
            pfeature.RegisterFeatureFunc("feed", feature.MyFeatureFunc)
            return nil
        })

	pairec.Route("/api/rec/feed", &controller.FeedController{})
	pairec.Run()
}

自定义 Rank(算法调用)

rank 需要实现 IRank 接口才能注册到流程中。包含两部分,一个是过滤出想自定义调用算法的 item,然后对过滤出来的 item 调用自定义 rank function。

IRank 接口定义如下:

type IRank interface {
	// Filter the custom rank of item
	Filter(User *module.User, item *module.Item, context *context.RecommendContext) bool

	Rank(User *module.User, items []*module.Item, requestData []map[string]interface{}, context *context.RecommendContext)
}

然后通过

func RegisterRank(sceneName string, ranks ...IRank)

根据每个场景注册不同的自定义的 rank。

说明

这里 ranks 可以设置多个,可以针对不同的 item,调用不同的 rank。

这里举个简单的例子说明:

type MyRank struct {
    index int
}

func NewMyRank() *MyRank {
    return &MyRank{
        index: 0,
    }
}
func (r *MyRank) Filter(User *module.User, item *module.Item, context *context.RecommendContext) bool {
    r.index++

    item.AddProperty("other", "other")
    if r.index%2 == 0 {
        item.AddProperty("index", r.index)
        return true
    }
    return false
}

func (r *MyRank) Rank(User *module.User, items []*module.Item, requestData []map[string]interface{}, context *context.RecommendContext) {
    fmt.Println("rank len", len(items))
    for _, item := range items {
        if f, err := item.FloatProperty("index"); err == nil {
            item.Score = float64(f * 10)
        }
    }

    r.index = 0
}

如果没有匹配到自定义 Rank 的 item 列表,还是会调用 RankConf 里的模型配置。

完整示例

recall.go 自定义recall 示例

package recall

import (
	"fmt"
	"math/rand"

	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/module"
)

type MyRecall struct {
}

func (r *MyRecall) GetCandidateItems(user *module.User, context *context.RecommendContext) []*module.Item {
	fmt.Println("MyRecall is running!")

	ret := make([]*module.Item, 0)

	for i := 1; i < 100; i++ {
		item := module.NewItem(fmt.Sprintf("item_%d", i))
		item.Score = rand.Float64()
		item.AddProperty("title", fmt.Sprintf("新闻%d", i))
		item.AddProperty("count", i)
		item.RetrieveId = "myRecall"
		ret = append(ret, item)
	}

	return ret
}

filter.go 自定义 filter 示例

package filter

import (
	"fmt"

	"github.com/alibaba/pairec/v2/filter"
	"github.com/alibaba/pairec/v2/module"
)

type MyFilter struct {
}

func (f *MyFilter) Filter(data *filter.FilterData) error {
	fmt.Println("MyFilter is running!")

	items := data.Data.([]*module.Item)
	newItems := make([]*module.Item, 0)

	for _, item := range items {
		if item.Score > 0.2 {
			newItems = append(newItems, item)
		}
	}

	fmt.Printf("MyFilter: kept %d items\n", len(newItems))
	data.Data = newItems
	return nil
}

rank.go 自定义 rank 示例 ,可以有很多不同的 rank ,rank 和 rank1 分别是不同的示例

package rank

import (
	"fmt"

	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/module"
)

type MyRank struct {
}

func (rank *MyRank) Filter(User *module.User, item *module.Item, context *context.RecommendContext) bool {
	//fmt.Println("MyRank Filter ")
	if item != nil && item.Score <= 0.5 {
		return true
	}
	return false
}
func (rank *MyRank) Rank(User *module.User, items []*module.Item, requestData []map[string]interface{}, context *context.RecommendContext) {
	fmt.Println("MyRank is running")

	for _, item := range items {
		item.Score = BoostScore(item.Score, User, item, context)
	}
}

rank1.go

package rank

import (
	"fmt"

	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/module"
)

type MyRank1 struct{}

func (r *MyRank1) Filter(User *module.User, item *module.Item, context *context.RecommendContext) bool {
	//fmt.Println("MyRank1 Filter is running")
	if item != nil && 0.5 < item.Score && item.Score < 1 {
		return true
	}
	return false
}
func (r *MyRank1) Rank(User *module.User, items []*module.Item, requestData []map[string]interface{}, context *context.RecommendContext) {
	fmt.Println("MyRank1 is running")
	for _, item := range items {
		if item.Score >= 0.9 {
			item.Score *= 1.1
		} else if item.Score >= 0.8 {
			item.Score *= 1.2
		} else if item.Score >= 0.7 {
			item.Score *= 1.3
		} else {
			item.Score *= 1.4
		}
	}
}

boost.go 基于 rank 的 boost 示例(如果是自定义rank,需要自己手动调用,非自定义rank会自动调用)

package rank

import (
	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/module"
)

func BoostScore(score float64, user *module.User, item *module.Item, ctx *context.RecommendContext) float64 {
	//fmt.Println("BoostScore is running!")
	return score * 1.5
}

feature.go 自定义 feature 示例

package feature

import (
	"fmt"

	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/module"
)

// LoadRealTimeFeatures 自定义特征加载函数
func LoadRealTimeFeatures(user *module.User, items []*module.Item, context *context.RecommendContext) {
	// 向 user 增加
	user.AddProperty("userAge", 30)

	// 向 item 增加
	for _, item := range items {
		item.AddProperty("count", 5)
	}
}

// MyFeatureFunc 自定义特征处理
func MyFeatureFunc(user *module.User, items []*module.Item, context *context.RecommendContext) []*module.Item {
	fmt.Println("MyFeatureFunc is running")
	if len(items) < context.Size {
		fmt.Printf("items less size :%d \n", len(items))
		return items
	}
	item2 := make([]*module.Item, len(items), 0)
	for _, item := range items {
		if v, ok := item.Properties["userAge"]; ok {
			if age, o := v.(int); o && age > 18 {
				item2 = append(item2, item)
			}
		}

	}
	if len(item2) < context.Size {
		fmt.Printf("item2 : %d \n", len(item2))
		return item2
	} else {
		fmt.Printf("items[:context.Size] : %d \n", len(items))
		return items[:context.Size]
	}
}

main.go 主函数,注册各种自定义

package main

import (
	"pairec_base/src/controller"
	feature2 "pairec_base/src/feature"
	filter2 "pairec_base/src/filter"
	rank2 "pairec_base/src/rank"
	recall2 "pairec_base/src/recall"
	sort2 "pairec_base/src/sort"

	"github.com/alibaba/pairec/v2"
	"github.com/alibaba/pairec/v2/filter"
	"github.com/alibaba/pairec/v2/service/feature"
	"github.com/alibaba/pairec/v2/service/rank"
	"github.com/alibaba/pairec/v2/service/recall"
	"github.com/alibaba/pairec/v2/sort"
)

func main() {
	pairec.AddStartHook(func() error {
		recall.RegisterRecall("myRecall", &recall2.MyRecall{})
		filter.RegisterFilter("myFilter", &filter2.MyFilter{})
		sort.RegisterSort("mySort", &sort2.MySort{})
		rank.RegisterRank("home_feed", &rank2.MyRank{}, &rank2.MyRank1{})
		rank.SetBoostFunc(rank2.BoostScore)
		feature.RegisterLoadFeatureFunc("home_feed", feature2.LoadRealTimeFeatures)
		feature.RegisterFeatureFunc("home_feed", feature2.MyFeatureFunc)
		return nil
	})

	pairec.Route("/api/rec/feed", &controller.FeedController{})
	pairec.Run()
}

config.json 配置示例

{
	"RunMode": "product",
	"ListenConf": {
	  "HttpAddr": "",
	  "HttpPort": 8000
	},
	"ABTestConf": {
	  "Host": "",
	  "Token": ""
	},
	"FilterConfs": [
	],
	"RecallConfs": [
		{
			"Name": "mock_recall",
			"RecallType": "MockRecall",
			"RecallCount": 200
		}
	],
	"SortNames": {
	  "default": [
		  "mySort"
	  ]
	},
	"FilterNames": {
	  "default": [
		"myFilter"
	  ]
	},
	"AlgoConfs": [
	],
	"KafkaConfs": {
	},
	"RedisConfs": {
	},
	"SceneConfs": {
		"home_feed": {
			"default": {
				"RecallNames": ["myRecall"],
				"FilterNames": ["myFilter"],
				"SortName":["mySort"]
			}
		}
	},
	"LogConf": {
	  "RetensionDays": 3,
	  "DiskSize": 20,
	  "LogLevel": "INFO"
	},
	"RankConf": {
	},
	"FeatureConfs": {
	}
}