自定义特征算子能够以插件的形式被框架动态加载并执行。FG框架尽量保持轻量,仅内置少量常用的特征算子,以节省编译时间、服务资源,加快服务启动速度。
配置
{
"feature_name": "my_custom_fg_op",
"feature_type": "custom_feature",
"operator_name": "EditDistance",
"operator_lib_file": "libedit_distance.so",
"expression": [
"user:query",
"item:title"
],
"value_type": "string",
"separator": ",",
"default_value": "-1",
"value_dimension": 1,
"normalizer": "method=expression,expr=x>16?16:x",
"num_buckets": 10000,
"stub_type": false,
"is_sequence": false,
...
}
除了上述配置项,用户可以根据需要添加其他配置项,当前配置的JSON字符串会传递给自定义算子。
配置项 | 说明 |
feature_type | 固定为 |
operator_name | 特征算子注册的名字,建议与实现的类名保持一致;同一 |
operator_lib_file | 指定特征算子动态库文件的名称,必须以
|
expression | 输入表达式,支持多输入。 |
value_type | 特征变换的输出类型,只能是基础类型 |
default_value | 特征默认值,统一以字符串格式配置。代码中自行转换为需要的类型。 |
separator | 分隔符,用来split配置的 |
value_dimension | 输出特征的维度,可以用来截断离线任务的输出结果,影响输出表的schema;如果是多值特征且输出维度不确定,可以不添加该配置。 可选项,默认值为0,可以在离线任务中用来截断输出;值为1时输出表的schema类型为 |
stub_type | 表示当前特征算子是否只能够作为特征变换的中间结果,设置为 |
is_sequence | 标记是否是序列特征。 |
分箱操作 | 支持5种类型的分箱操作,用户无需自己实现分箱操作。详情请参见特征分箱(离散化)。
|
normalizer | 针对数值型特征,可以添加该配置对变换结果进一步处理,如计算一个表达式的值。 支持的操作符与函数,请参见内置特征算子。支持minmax、zscore、log10、expression共4种框架,配置和计算方法如下:
|
C++接口
#pragma once
#ifndef FEATURE_GENERATOR_PLUGIN_BASE_H
#define FEATURE_GENERATOR_PLUGIN_BASE_H
#include <absl/container/flat_hash_map.h>
#include <absl/strings/string_view.h>
#include <absl/types/optional.h>
#include <map>
#include <stdexcept>
#include <unordered_map>
#include <utility>
#include <vector>
#include "fsmap.h"
#include "integral_types.h"
namespace fg {
using absl::optional;
using std::string;
using std::vector;
template <typename T>
using List = std::vector<T>;
template <typename K, typename V>
using Map = absl::flat_hash_map<K, V>;
template <typename K, typename V>
using MapArray = std::vector<std::pair<K, V>>;
using Matrix = std::vector<std::vector<float>>;
using MatrixL = std::vector<std::vector<int64>>;
using MatrixS = std::vector<std::vector<string>>;
template <typename K, typename V>
using FSMap = featurestore::type::fs_map<K, V>;
using Field = absl::variant<
optional<string>, optional<int32>, optional<int64>, optional<float>,
optional<double>, optional<absl::string_view>,
List<string>, List<int32>, List<int64>, List<float>, List<double>,
List<absl::string_view>,
Map<string, string>, Map<string, int32>, Map<string, int64>,
Map<string, float>, Map<string, double>, Map<string, absl::string_view>,
Map<absl::string_view, absl::string_view>, Map<absl::string_view, int32>,
Map<absl::string_view, int64>, Map<absl::string_view, float>,
Map<absl::string_view, double>, Map<absl::string_view, string>,
Map<int32, string>, Map<int32, int32>, Map<int32, int64>, Map<int32, float>,
Map<int32, double>, Map<int32, absl::string_view>,
Map<int64, string>, Map<int64, float>, Map<int64, double>,
Map<int64, int32>, Map<int64, int64>, Map<int64, absl::string_view>,
FSMap<absl::string_view, absl::string_view>,
FSMap<absl::string_view, int32>, FSMap<absl::string_view, int64>,
FSMap<absl::string_view, float>, FSMap<absl::string_view, double>,
FSMap<int32, int32>, FSMap<int32, int64>, FSMap<int32, float>,
FSMap<int32, double>, FSMap<int32, absl::string_view>,
FSMap<int64, float>, FSMap<int64, double>, FSMap<int64, int32>,
FSMap<int64, int64>, FSMap<int64, absl::string_view>,
MapArray<string, string>, MapArray<string, int32>, MapArray<string, int64>,
MapArray<string, float>, MapArray<string, double>,
MapArray<int32, string>, MapArray<int32, float>, MapArray<int32, double>,
MapArray<int32, int32>, MapArray<int32, int64>,
MapArray<int64, string>, MapArray<int64, float>, MapArray<int64, double>,
MapArray<int64, int32>, MapArray<int64, int64>, Matrix, MatrixL, MatrixS>;
// represents a COLUMN of the feature table
using VariantVector = absl::variant<
vector<optional<string>>, vector<optional<int32>>, vector<optional<int64>>,
vector<optional<float>>, vector<optional<double>>,
vector<optional<absl::string_view>>,
vector<List<string>>, vector<List<int32>>, vector<List<int64>>,
vector<List<float>>, vector<List<double>>, vector<List<absl::string_view>>,
vector<Map<string, string>>, vector<Map<string, int32>>,
vector<Map<string, int64>>, vector<Map<string, float>>,
vector<Map<string, double>>, vector<Map<string, absl::string_view>>,
vector<Map<absl::string_view, absl::string_view>>,
vector<Map<absl::string_view, int32>>,
vector<Map<absl::string_view, int64>>,
vector<Map<absl::string_view, float>>,
vector<Map<absl::string_view, double>>,
vector<Map<int32, string>>, vector<Map<int32, int32>>,
vector<Map<int32, int64>>, vector<Map<int32, float>>,
vector<Map<int32, double>>, vector<Map<int32, absl::string_view>>,
vector<Map<int64, string>>, vector<Map<int64, float>>,
vector<Map<int64, double>>, vector<Map<int64, int32>>,
vector<Map<int64, int64>>, vector<Map<int64, absl::string_view>>,
vector<FSMap<absl::string_view, absl::string_view>>,
vector<FSMap<absl::string_view, int32>>,
vector<FSMap<absl::string_view, int64>>,
vector<FSMap<absl::string_view, float>>,
vector<FSMap<absl::string_view, double>>,
vector<FSMap<int32, int32>>, vector<FSMap<int32, int64>>,
vector<FSMap<int32, float>>, vector<FSMap<int32, double>>,
vector<FSMap<int32, absl::string_view>>,
vector<FSMap<int64, float>>, vector<FSMap<int64, double>>,
vector<FSMap<int64, int32>>, vector<FSMap<int64, int64>>,
vector<FSMap<int64, absl::string_view>>,
vector<MapArray<string, string>>, vector<MapArray<string, int32>>,
vector<MapArray<string, int64>>, vector<MapArray<string, float>>,
vector<MapArray<string, double>>,
vector<MapArray<int32, string>>, vector<MapArray<int32, float>>,
vector<MapArray<int32, double>>, vector<MapArray<int32, int32>>,
vector<MapArray<int32, int64>>,
vector<MapArray<int64, string>>, vector<MapArray<int64, float>>,
vector<MapArray<int64, double>>, vector<MapArray<int64, int32>>,
vector<MapArray<int64, int64>>, vector<Matrix>, vector<MatrixL>,
vector<MatrixS>>;
/**
* @brief 自定义特征算子的公共基类
*
* 框架会检测子类有没有override批量接口`BatchProcess`方法,如果有实现则会调用该方法完成特征变换;
* 否则,框架根据`value_type`的配置从一下`ProcessWith*`方法中选择一个执行,用户必须实现其中一个对应类型的接口
*/
class IFeatureOP {
public:
class NotOverriddenException : public std::exception {
public:
NotOverriddenException(const std::string& msg) : msg_(msg) {}
const char* what() const noexcept override {
if (msg_.empty()) {
return "unimplemented method called";
}
std::string msg = "unimplemented method called: " + msg_;
return msg.c_str();
}
private:
const std::string msg_;
};
virtual ~IFeatureOP() = default;
/**
* @brief 初始化方法
* @param feature_config is a json string,
* @return 如果为0,则表示模型加载成功,否则表示模型加载失败。
*/
virtual int Initialize(const string& feature_config) = 0;
/**
* @brief 特征变换,输出为string类型
* @param inputs 表示一条记录,可以有多个字段(field)
* @param outputs 特征变换的输出
* @return 状态码,如果为0表示执行成功
*/
virtual int ProcessWithStrOutputs(const vector<Field>& inputs,
vector<string>& outputs) {
throw NotOverriddenException("ProcessWithStrOutputs");
}
/**
* @brief 特征变换,输出为int32类型
* @param inputs 表示一条记录,可以有多个字段(field)
* @param outputs 特征变换的输出
* @return 状态码,如果为0表示执行成功
*/
virtual int ProcessWithInt32Outputs(const vector<Field>& inputs,
vector<int32>& outputs) {
throw NotOverriddenException("ProcessWithInt32Outputs");
}
/**
* @brief 特征变换,输出为int64类型
* @param inputs 表示一条记录,可以有多个字段(field)
* @param outputs 特征变换的输出
* @return 状态码,如果为0表示执行成功
*/
virtual int ProcessWithInt64Outputs(const vector<Field>& inputs,
vector<int64>& outputs) {
throw NotOverriddenException("ProcessWithInt64Outputs");
}
/**
* @brief 特征变换,输出为float类型
* @param inputs 表示一条记录,可以有多个字段(field)
* @param outputs 特征变换的输出
* @return 状态码,如果为0表示执行成功
*/
virtual int ProcessWithFloatOutputs(const vector<Field>& inputs,
vector<float>& outputs) {
throw NotOverriddenException("ProcessWithFloatOutputs");
}
/**
* @brief 特征变换,输出为double类型
* @param inputs 表示一条记录,可以有多个字段(field)
* @param outputs 特征变换的输出
* @return 状态码,如果为0表示执行成功
*/
virtual int ProcessWithDoubleOutputs(const vector<Field>& inputs,
vector<double>& outputs) {
throw NotOverriddenException("ProcessWithDoubleOutputs");
}
/**
* @brief 可选,处理多个records的批量接口
*
* @param inputs 输入column的vector,VariantVector表示一个特征column
* @param outputs
* 输出,变换后的特征;支持输出复杂类型,可作为其他的特征变换的输入
* @return 状态码,如果为0表示执行成功
*/
virtual int BatchProcess(const vector<VariantVector>& inputs,
VariantVector& outputs) {
throw NotOverriddenException("BatchProcess");
}
};
using CreatePluginFunc = IFeatureOP* (*)();
class PluginRegistry {
public:
static void registerPlugin(const std::string& name, CreatePluginFunc func);
static CreatePluginFunc getCreateFunc(const std::string& name);
static std::map<std::string, CreatePluginFunc>& getRegistry();
};
} // namespace fg
#define REGISTER_PLUGIN(PluginName, PluginClass) \
extern "C" fg::IFeatureOP* create##PluginClass() { \
return new fg::PluginClass(); \
} \
struct PluginClass##Registrar { \
PluginClass##Registrar() { \
fg::PluginRegistry::registerPlugin(PluginName, create##PluginClass); \
} \
}; \
static PluginClass##Registrar PluginClass##_registrar;
#endif // FEATURE_GENERATOR_PLUGIN_BASE_H
开发指南
下载依赖API代码文件fg-api.tar.gz,包含必要的头文件等。
您需要继承基类
IFeatureOP
,实现Initialize
方法,同时至少实现一个ProcessWith*
方法。您的实现类必须包含一个无参构造函数。
框架会把配置的JSON字符串传递给
Initialize
方法,您自行解析需要的配置项。框架会根据
value_type
配置项调用对应的ProcessWith*
方法,如果您未实现对应类型的方法,会抛运行时异常。ProcessWith*
方法仅需要处理一条记录,可以有多个输入field,也可有多维输出(比如多值特征)。VariantRecord
定义了所有可以被框架处理的特征field类型。您的代码需要尽可能支持每种类型,即对每种可能的输入类型实现相应的特征变换操作,除非确定某些类型确实不需要用到,这种情况可直接抛异常。
FSMAP是使用
featurestore
时需要支持的类型,可大幅提高Processor的性能。
您仅需要实现分箱操作前的特征变换操作,如果有配置分箱操作,框架会自动执行分箱操作。
您需要使用
REGISTER_PLUGIN
宏注册新开发的特征OP,否则框架无法使用。REGISTER_PLUGIN("OperatorName", OperatorClass);两个宏参数根据需要替换,建议保持一致
配置项中的
operator_name
就是这里的"OperatorName",需要保持一致。
框架会扫描一个指定目录下的所有动态库,并在必要时尝试加载其中需要用到的特征算子。
通过环境变量
FEATURE_OPERATOR_DIR
指定动态库文件所在的目录。每个动态库里可以包含多个特征算子的实现。
依赖的三方库
abseil-cpp.tar.gz(推荐使用与FG框架相同的版本)
序列特征
如果配置项is_sequence
设定为true,有以下注意事项:
稀疏特征序列
当算子生成的是一个稀疏特征序列,如历史访问过的
item_id
的序列,并且序列的每个元素都是单值,此时可以输出任意类型。当算子生成的是一个稀疏特征序列,并且序列的每个元素可能是多值,此时只能输出string类型(value_type必须设为string),多值使用分隔符chr(29)隔开。
稠密特征序列
当算子生成的是一个稀疏特征序列,如历史访问过的物品的embedding向量,此时需要配置
value_dimension
,值为序列的每个元素的维度。序列的元素是标量(scalar)时,
value_dimension
设定为1。序列的元素是向量(vector)时,
value_dimension
设定为向量的长度。算子输出的特征值数量必须是
value_dimension
的整数倍。
开发示例
算子名称 | 算子功能 | 源码下载链接 | 二进制包下载链接 |
EditDistance | 编辑距离 | ||
RegexReplace | 正则替换 | ||
BPETokenize | BPE分词 | 已包含在内置tokenize_feature中 |
下面以计算两个输入文本的编辑距离举例,头文件edit_distance.h。
#pragma once
#include "api/base_op.h"
namespace fg {
namespace functor {
class EditDistanceFunctor;
}
using std::string;
using std::vector;
/**
* @brief 编辑距离:输入两个字符串,输出是它们的文本编辑距离
*/
class EditDistance : public IFeatureOP {
public:
int Initialize(const string& feature_config) override;
/// @return 状态码,如果为0表示执行成功
int ProcessWithStrOutputs(const vector<Field>& inputs,
vector<string>& outputs) override;
/// @return 状态码,如果为0表示执行成功
int ProcessWithInt32Outputs(const vector<Field>& inputs,
vector<int32>& outputs) override;
/// @return 状态码,如果为0表示执行成功
int ProcessWithInt64Outputs(const vector<Field>& inputs,
vector<int64>& outputs) override;
/// @return 状态码,如果为0表示执行成功
int ProcessWithFloatOutputs(const vector<Field>& inputs,
vector<float>& outputs) override;
/// @return 状态码,如果为0表示执行成功
int ProcessWithDoubleOutputs(const vector<Field>& inputs,
vector<double>& outputs) override;
private:
string feature_name_;
std::unique_ptr<functor::EditDistanceFunctor> functor_p_;
};
} // end of namespace fg
REGISTER_PLUGIN("EditDistance", EditDistance);
实现文件edit_distance.cc。
#include "edit_distance.h"
#include <absl/strings/ascii.h>
#include <absl/strings/str_join.h>
#include <codecvt>
#include <nlohmann/json.hpp>
#include <numeric> // 包含 std::iota
#include <stdexcept>
#include "log.h"
namespace fg {
using absl::optional;
namespace functor {
template <class T>
int edit_distance(T s1, T s2) {
int l1 = s1.size();
int l2 = s2.size();
if (l1 * l2 == 0) {
return l1 + l2;
}
vector<int> prev(l2 + 1);
vector<int> curr(l2 + 1);
std::iota(prev.begin(), prev.end(), 0);
for (int i = 0; i <= l1; ++i) {
curr[0] = i;
for (int j = 1; j <= l2; ++j) {
int d = prev[j - 1];
if (s1[i - 1] == s2[j - 1]) {
curr[j] = d;
} else {
int d2 = std::min(prev[j], curr[j - 1]);
curr[j] = 1 + std::min(d, d2);
}
}
prev.swap(curr);
}
return prev[l2];
}
enum class Encoding : unsigned int { Latin = 0, UTF8 = 1 };
class EditDistanceFunctor {
public:
EditDistanceFunctor(const string& encoding) {
string enc = absl::AsciiStrToLower(encoding);
if (enc == "utf-8" || enc == "utf8") {
encoding_ = Encoding::UTF8;
} else {
encoding_ = Encoding::Latin;
}
}
int operator()(absl::string_view s1, absl::string_view s2) {
if (encoding_ == Encoding::Latin) {
return edit_distance(s1, s2);
}
if (encoding_ == Encoding::UTF8) {
std::wstring str1 = from_bytes(s1);
std::wstring str2 = from_bytes(s2);
std::wstring& ws1 = str1;
std::wstring& ws2 = str2;
return edit_distance(ws1, ws2);
}
LOG(ERROR) << "EditDistanceFunctor found unsupport text encoding";
assert(false);
return 0;
}
const Encoding TextEncoding() const { return encoding_; }
private:
Encoding encoding_;
std::wstring from_bytes(absl::string_view str) {
std::wstring result;
int i = 0;
int len = (int)str.length();
while (i < len) {
int char_size = 0;
int unicode = 0;
if ((str[i] & 0x80) == 0) {
unicode = str[i];
char_size = 1;
} else if ((str[i] & 0xE0) == 0xC0) {
unicode = str[i] & 0x1F;
char_size = 2;
} else if ((str[i] & 0xF0) == 0xE0) {
unicode = str[i] & 0x0F;
char_size = 3;
} else if ((str[i] & 0xF8) == 0xF0) {
unicode = str[i] & 0x07;
char_size = 4;
} else {
// Invalid UTF-8 sequence
++i;
continue;
}
for (int j = 1; j < char_size; ++j) {
unicode = (unicode << 6) | (str[i + j] & 0x3F);
}
if (unicode <= 0xFFFF) {
result += static_cast<wchar_t>(unicode);
} else {
// Handle surrogate pairs for characters outside the BMP
unicode -= 0x10000;
result += static_cast<wchar_t>((unicode >> 10) + 0xD800);
result += static_cast<wchar_t>((unicode & 0x3FF) + 0xDC00);
}
i += char_size;
}
return result;
}
};
} // namespace functor
// 定义 overloaded 类
template <class... Ts>
struct overloaded : Ts... {
using Ts::operator()...;
};
// 类模板参数推导指引(C++17)
template <class... Ts>
overloaded(Ts...) -> overloaded<Ts...>;
int EditDistance::Initialize(const string& feature_config) {
nlohmann::json cfg;
try {
cfg = nlohmann::json::parse(feature_config);
} catch (nlohmann::json::parse_error& ex) {
LOG(ERROR) << "parse error at byte " << ex.byte;
LOG(ERROR) << "config: " << feature_config;
throw std::runtime_error("parse EditDistance config failed");
}
feature_name_ = cfg.at("feature_name");
string encoding = cfg.value("encoding", "latin");
functor_p_ = std::make_unique<functor::EditDistanceFunctor>(encoding);
functor::Encoding enc = functor_p_->TextEncoding();
encoding = (enc == functor::Encoding::UTF8) ? "UTF-8" : "Latin";
LOG(INFO) << feature_name_ << " with text encoding: " << encoding;
return 0;
}
int EditDistance::ProcessWithInt32Outputs(const vector<Field>& inputs,
vector<int32>& outputs) {
outputs.clear();
if (inputs.size() < 2) {
outputs.push_back(0);
return -1; // invalid inputs
}
int d = absl::visit(
overloaded{
[this](const optional<string>& s1, const optional<string>& s2) {
return functor_p_->operator()(s1.value_or(""), s2.value_or(""));
},
[this](const optional<absl::string_view>& s1,
const optional<absl::string_view>& s2) {
return functor_p_->operator()(s1.value_or(""), s2.value_or(""));
},
[this](const optional<absl::string_view>& s1,
const optional<string>& s2) {
return functor_p_->operator()(s1.value_or(""), s2.value_or(""));
},
[this](const optional<string>& s1,
const optional<absl::string_view>& s2) {
return functor_p_->operator()(s1.value_or(""), s2.value_or(""));
},
[this](const List<string>& s1, const List<string>& s2) {
string str1 = absl::StrJoin(s1, "");
string str2 = absl::StrJoin(s2, "");
return functor_p_->operator()(str1, str2);
},
[this](const List<absl::string_view>& s1,
const List<absl::string_view>& s2) {
string str1 = absl::StrJoin(s1, "");
string str2 = absl::StrJoin(s2, "");
return functor_p_->operator()(str1, str2);
},
[this](const auto& x, const auto& y) {
ERROR_EXIT(feature_name_,
"unsupported input type: ", typeid(x).name(), " vs ",
typeid(y).name());
return 0;
}},
inputs.at(0), inputs.at(1));
outputs.push_back(d);
return 0;
}
int EditDistance::ProcessWithInt64Outputs(const vector<Field>& inputs,
vector<int64>& outputs) {
vector<int32> distances;
int status = ProcessWithInt32Outputs(inputs, distances);
if (0 != status) {
return status;
}
outputs.clear();
outputs.insert(outputs.end(), distances.begin(), distances.end());
return 0;
}
int EditDistance::ProcessWithFloatOutputs(const vector<Field>& inputs,
vector<float>& outputs) {
vector<int32> distances;
int status = ProcessWithInt32Outputs(inputs, distances);
if (0 != status) {
return status;
}
outputs.clear();
outputs.insert(outputs.end(), distances.begin(), distances.end());
return 0;
}
int EditDistance::ProcessWithDoubleOutputs(const vector<Field>& inputs,
vector<double>& outputs) {
vector<int32> distances;
int status = ProcessWithInt32Outputs(inputs, distances);
if (0 != status) {
return status;
}
outputs.clear();
outputs.insert(outputs.end(), distances.begin(), distances.end());
return 0;
}
int EditDistance::ProcessWithStrOutputs(const vector<Field>& inputs,
vector<string>& outputs) {
vector<int32> distances;
int status = ProcessWithInt32Outputs(inputs, distances);
if (0 != status) {
return status;
}
outputs.clear();
outputs.reserve(distances.size());
std::transform(distances.begin(), distances.end(),
std::back_inserter(outputs),
[](int32& x) { return std::to_string(x); });
return 0;
}
} // end of namespace fg
执行build.sh
脚本编译生成FG算子。
编译自定义算子
需要与FG框架保持相同的编译环境,比如语言标准(C++17)、编译选项等。
具体可以查看开发示例中的CMakeLists.txt文件。