阿里云DataWorks数据集成推出Embedding向量化功能,支持将分散在OSS、MaxCompute、HDFS等异构来源中的数据抽取并转化为向量,写入Milvus、Elasticsearch、Opensearch等向量库,能够大幅简化ETL流程,高效实现知识向量化,助力RAG等AI场景落地。
为什么使用 Embedding
随着大语言模型(LLM)技术的持续演进,对企业而言,将私有知识深度整合至模型体系,是实现其在实际业务场景中有效落地并创造价值的关键。检索增强生成(RAG)由此成为重要的技术路径之一,通过将数据编码为向量表示,并结合向量数据库进行高效检索,为大模型提供精准、权威且动态更新的领域知识支持。
您的业务数据可能散落在OSS、MaxCompute、HDFS、MySQL、Oracle 或消息队列等众多异构数据源中,需要将这些异构数据源的数据做好Embedding向量化,然后将向量化的结果再写入各种异构的向量化数据库中(比如 Milvus、OpenSearch、 ElasticSearch等)。此过程不仅需要编写繁琐的ETL脚本,还需要适配各种源端数据源类型,数据将历经抽取、转换、向量化(Embedding)及写入等多个阶段,流程链路长且耦合度高,导致模型迭代周期显著延长。
功能介绍
该功能正在灰度邀测中,如有需要,请提交工单联系技术支持人员。
DataWorks数据集成支持Embedding向量化功能,在同一条数据通道内一站式完成抽取、向量化与写入向量数据库,实现端到端的自动化处理。该能力显著降低开发复杂度,缩短知识更新延迟,助力RAG、智能客服、搜索推荐等场景实现高效的知识接入。
数据集成离线同步数据Embedding向量化同步能力,支持两种配置模式:
使用限制
仅启用了新版数据开发的工作空间可用。
仅支持使用Serverless资源组。
前提条件
已创建启用了新版数据开发的工作空间。
已创建Serverless资源组并绑定至工作空间。
已创建离线同步任务所需的数据来源与数据去向数据源。
本教程以MaxCompute同步至Milvus为例,因此需要先创建MaxCompute数据源和Milvus数据源。
测试数据准备
本教程演示的表数据来自公开数据集(电商网站商品评论情感预测数据集),将商品的用户评论做向量化处理,然后同步到Milvus中进行后续的相似度检索。
数据来源MaxCompute侧:创建测试表并插入测试数据。
数据去向Milvus侧:创建目标表用于接收向量化后的数据。表结构如下:
目标表特性为自动ID表。
字段名
类型
描述
id
Int64
主键,自动递增。
sentence
VarChar(32)
存储原始文本。
sentence_e
FloatVector(128)
向量字段,用于相似性搜索,使用COSINE度量。
向导模式配置
本教程以源端MaxCompute(ODPS)将数据读取并做Embedding向量化后同步到Milvus为例,介绍如何在数据集成中使用向导模式配置离线同步任务。
一、新建离线同步节点
进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的 ,进入Data Studio。
在项目目录单击
,配置数据来源与去向(本教程来源为MaxCompute、去向为Milvus)、节点名称,单击确认。
二、配置离线同步任务
配置基本信息。
数据源:选择数据来源和数据去向对应的数据源。
运行资源:执行离线同步任务所使用的资源组,选择当前工作空间已绑定且与数据源连通的资源组。
如果无可用的数据源和运行资源,请确保已完成前提条件。
配置数据来源。
以下为本教程MaxCompute数据源需配置的关键参数,如果使用其他数据来源,配置可能存在差异,请根据实际情况配置。
参数
说明
Tunnel资源组
Tunnel Quota,默认选择
公共传输资源
,即MaxCompute的免费quota。MaxCompute的数据传输资源选择,具体请购买与使用独享数据传输服务资源组。重要如果独享Tunnel Quota因欠费或到期不可用,任务在运行中将会自动切换为
公共传输资源
。表
选择待同步的数据来源表。
如果无可选的来源表,请确保已完成测试数据准备。
过滤方式
支持分区过滤和数据过滤:
如果来源表是分区表,您可以按分区选择同步数据的范围。
如果来源表是非分区表,您可以设置
WHERE
过滤语句,选择同步数据的范围。
您可以单击数据预览,查看配置是否正确。
配置数据处理。
打开数据处理开关,然后在数据处理列表中单击
,添加数据向量化处理节点。配置数据向量化节点。
关键参数解释如下:
说明数据向量化节点性能取决于配置的模型性能,阿里云百炼平台提供的QWen模型有QPS限制,阿里云PAI模型市场需要您自己在PAI EAS上部署模型,其性能表现取决于部署模型使用的资源规格。
在参数确定的情况下,Embedding模型生成的向量结果是确定性的。因此DataWorks数据集成在同步的过程中针对相同的向量化原始数据进行了LFU Cache(Least Frequently Used)优化,避免相同的数据重复调用Embedding模型,以提升处理性能,降低Embedding成本。
参数
描述
模型提供商
大模型提供商,目前支持如下模型提供商:
阿里云百炼平台,支持QWen模型。
阿里云PAI模型市场,支持BGE-M3模型。
模型名称
Embedding模型名称,按需选择。
模型 API Key
访问模型的API KEY,请前往模型提供商获取。
阿里云百炼平台:获取百炼API Key。
阿里云PAI模型市场:前往部署的EAS任务,进入在线调试,获取Headers Authorization的参数值,将其作为API KEY填写到此处。
模型 Endpoint
模型提供商选择阿里云PAI模型市场时,需要配置访问模型的访问点(Endpoint API地址)。
批处理大小
向量化批处理大小,取决于Embedding模型是否支持批处理,批处理有助于提升Embedding性能,降低Embedding成本。默认为10。
选择需要向量化的字段
定义需要将哪些列进行向量化,指定向量化后输出的字段名,数据集成支持对来源单个字段或者多个字段拼接组合做向量化。
向量化结果字段
源表来源字段向量化后定义的向量化字段名称。
向量维度
输出的向量维度,配置的Embedding模型必须支持定义的向量维度。默认为1024。
NULL值转为空字符串
由于大模型做向量化时不允许传入的数据为NULL,因此如果源表数据存在NULL,支持将其转为空字符串处理,避免向量化异常。默认不勾选。
是否拼接字段名称
做向量化时,是否需要拼接字段名称到文本中一起做向量化。选中时,还需要配置字段名拼接符。默认不勾选。
是否跳过空值字段
多个字段拼接进行向量化时,是否要跳过空值字段。默认选中,跳过空值字段。
数据输出预览。
您可以单击数据向量化节点配置区域右上角的数据输出预览,然后预览向量化后的结果,确认配置是否正确。
您也可以单击离线同步编辑页面顶部的模拟运行,预览向量化后的结果。
配置数据去向。
以下为本教程Milvus数据源需配置的关键参数,如果使用其他数据来源,配置可能存在差异,请根据实际情况配置。
参数
说明
集合
用于接收向量数据的Collection。
分区键
可选,如果Collection配置了分区,您可以为接收的向量数据指定分区。
写入模式
upsert:
当表未设置自动ID时:根据主键更新 Collection 中的某个 Entity。
当表设置自动ID时:将 Entity 中的主键替换为自动生成的主键,并插入数据。
insert:多用于自动ID表插入数据,milvus自动生成主键。
非自动ID表使用
insert
会导致数据重复。
配置去向字段映射。
当配置完数据来源、数据处理和数据去向后,离线同步将自动生成字段映射关系,由于目标端为无固定结构数据源,默认按同行映射执行,因此,需要单击来源字段或目标字段后的编辑,调整字段映射顺序,或者删减不需要的字段,以确保映射关系正确。
例如,本教程手动删除不需要映射的字段,调整后的映射关系如下。
更多高级配置。
单击节点配置页面右侧的高级配置,你可以按需设置任务并发数、同步速率、脏数据策略等参数。
三、调试运行
单击离线同步节点编辑页面右侧的调试配置,设置调试运行使用的资源组和脚本参数,然后单击顶部工具栏的运行,测试同步链路是否成功运行。
您可以前往Milvus侧,查看数据去向Collection中的数据是否符合预期。
四、调度配置与发布
单击离线同步任务右侧的调度配置,设置周期运行所需的调度配置参数后,单击顶部工具栏的发布,进入发布面板,根据页面提示完成发布。
脚本模式配置
本教程以源端MaxCompute(ODPS)将数据读取并做Embedding向量化后同步到Milvus为例,介绍如何在数据集成中使用脚本模式配置离线同步任务。
一、新建离线同步节点
进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的 ,进入Data Studio。
在项目目录单击
,配置数据来源与去向(本教程来源为MaxCompute、去向为Milvus)、节点名称,单击确认。
二、配置同步脚本
单击离线同步节点顶部工具栏的
,切换为脚本编辑模式。
配置MaxCompute到Milvus的离线同步任务。
请根据附录:脚本模式格式说明,配置离线同步任务的JSON脚本。本示例配置的脚本如下:
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "odps", "parameter": { "partition": [ "split=dev" ], "datasource": "MaxCompute_Source", "successOnNoPartition": true, "tunnelQuota": "default", "column": [ "sentence" ], "enableWhere": false, "table": "test_tb" }, "name": "Reader", "category": "reader" }, { "category": "flatmap", "stepType": "embedding-transformer", "parameter": { "modelProvider": "bailian", "modelName": "text-embedding-v4", "embeddingColumns": { "sourceColumnNames": [ "sentence" ], "embeddingColumnName": "sentence_e" }, "apiKey": "sk-****", "dimension": 128, "nullAsEmptyString": true }, "displayName": "sentence2emb", "description": "" }, { "stepType": "milvus", "parameter": { "schemaCreateMode": "ignore", "enableDynamicSchema": true, "datasource": "Milvus_Source", "column": [ { "name": "sentence", "type": "VarChar", "elementType": "None", "maxLength": "32" }, { "name": "sentence_e", "type": "FloatVector", "dimension": "128", "elementType": "None", "maxLength": "65535" } ], "writeMode": "insert", "collection": "Milvus_Collection", "batchSize": 1024, "columnMapping": [ { "sourceColName": "sentence", "dstColName": "sentence" }, { "sourceColName": "sentence_e", "dstColName": "sentence_e" } ] }, "name": "Writer", "category": "writer" } ], "setting": { "errorLimit": { "record": "0" }, "speed": { "concurrent": 2, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Reader与Writer部分的参数解释,请参见MaxCompute数据源和Milvus数据源。
如果您使用其他类型数据来源和去向,请参见数据源列表。
数据向量化处理节点脚本部分各参数说明如下:
参数
描述
是否必填
modelProvider
大模型提供商,目前支持如下模型提供商:
bailian:阿里云百炼平台,支持QWen模型。
paiModelGallery:阿里云PAI模型市场,支持BGE-M3模型。
是
modelName
Embedding模型名称。
modelProvider为bailian时,可选择
text-embedding-v4
、text-embedding-v3
。modelProvider为paiModelGallery时,可选择
bge-m3
。
是
apiKey
访问模型的API KEY,请前往模型提供商获取。
是
endpoint
modelProvider为paiModelGallery时,需要配置访问模型的访问点(Endpoint API地址)。
否
batchSize
向量化批处理大小,取决于Embedding模型是否支持批处理,批处理有助于提升Embedding性能,降低Embedding成本。默认为10。
否
embeddingColumns
定义需要将哪些列进行向量化,指定向量化后输出的字段名,数据集成支持对来源单个字段或者多个字段拼接组合做向量化。
例如:
{ "embeddingColumns": { "sourceColumnNames": [ "col1", "col2" ], "embeddingColumnName": "my_vector" } }
是
appendDelimiter
将多个字段值拼接为一个文本进行向量化时的拼接符。默认值
\n
。否
skipEmptyValue
多个字段拼接进行向量化时,是否要跳过空值字段。默认False。
否
dimension
输出的向量维度,配置的Embedding模型必须支持定义的向量维度。默认为1024。
否
nullAsEmptyString
由于大模型做Embedding时不允许传入的数据为NULL,因此如果源表数据存在NULL,支持将其转为空字符串处理,避免向量化异常。默认
False
。否
appendFieldNameEnable
做向量化时,是否要将原始数据和字段名拼接在一起做向量化。开启时,还需要配置appendFieldNameDelimiter。默认
False
。否
appendFieldNameDelimiter
拼接字段名称时的拼接符,仅在开启appendFieldNameEnable时生效。
否
模拟运行。
您可以单击离线同步节点编辑页面顶部的模拟运行,然后依次单击开始采集、预览,查看向量化后的结果,确认配置是否正确。
更多高级配置。
单击节点配置页面右侧的高级配置,你可以按需设置任务并发数、同步速率、脏数据策略等参数。
三、调试运行
单击离线同步节点编辑页面右侧的调试配置,设置调试运行使用的资源组和脚本参数,然后单击顶部工具栏的运行,测试同步链路是否成功运行。
您可以前往Milvus侧,查看数据去向Collection中的数据是否符合预期。
四、调度配置与发布
单击离线同步任务右侧的调度配置,设置周期运行所需的调度配置参数后,单击顶部工具栏的发布,进入发布面板,根据页面提示完成发布。
附录:脚本模式格式说明
脚本模式的基本结构如下:
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "xxx",
"parameter": {
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "xxx",
"parameter": {
},
"name": "transformer1",
"category": "map/flatmap"
},
{
"stepType": "xxx",
"parameter": {
},
"name": "transformer2",
"category": "map/flatmap"
},
{
"stepType": "xxx",
"parameter": {
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
}
}
steps
里定义了每个处理的算子,至少需要一个Reader
,一个Writer
,中间可以有若干个Transformer
转换算子。假设您配置的并发度为2,那么一个Job将会有两条并行的数据处理流,每个Reader、Transformer、Writer都属于任务配置中的一个step
。
steps
定义了每个算子的类型和参数,数据同步及处理过程严格遵循每个step
在JSON配置里的顺序执行。
数据集成支持的各种数据源读、写通道详细参数配置,详见:支持的数据源及同步方案。