离线同步任务数据向量化处理

阿里云DataWorks数据集成推出Embedding向量化功能,支持将分散在OSS、MaxCompute、HDFS等异构来源中的数据抽取并转化为向量,写入Milvus、Elasticsearch、Opensearch等向量库,能够大幅简化ETL流程,高效实现知识向量化,助力RAGAI场景落地。

为什么使用 Embedding

随着大语言模型(LLM)技术的持续演进,对企业而言,将私有知识深度整合至模型体系,是实现其在实际业务场景中有效落地并创造价值的关键。检索增强生成(RAG)由此成为重要的技术路径之一,通过将数据编码为向量表示,并结合向量数据库进行高效检索,为大模型提供精准、权威且动态更新的领域知识支持。

您的业务数据可能散落在OSS、MaxCompute、HDFS、MySQL、Oracle 或消息队列等众多异构数据源中,需要将这些异构数据源的数据做好Embedding向量化,然后将向量化的结果再写入各种异构的向量化数据库中(比如 Milvus、OpenSearch、 ElasticSearch等)。此过程不仅需要编写繁琐的ETL脚本,还需要适配各种源端数据源类型,数据将历经抽取、转换、向量化(Embedding)及写入等多个阶段,流程链路长且耦合度高,导致模型迭代周期显著延长。

功能介绍

重要

该功能正在灰度邀测中,如有需要,请提交工单联系技术支持人员。

DataWorks数据集成支持Embedding向量化功能,在同一条数据通道内一站式完成抽取、向量化与写入向量数据库,实现端到端的自动化处理。该能力显著降低开发复杂度,缩短知识更新延迟,助力RAG、智能客服、搜索推荐等场景实现高效的知识接入。

数据集成离线同步数据Embedding向量化同步能力,支持两种配置模式:

  • 向导模式配置:通过可视化界面,帮助您快速配置离线Embedding同步能力。

  • 脚本模式配置:脚本模式支持了更复杂高级配置功能,可以通过脚本模式配置多种同步链路,满足个性化配置。

使用限制

  • 仅启用了新版数据开发的工作空间可用。

  • 仅支持使用Serverless资源组。

前提条件

测试数据准备

本教程演示的表数据来自公开数据集(电商网站商品评论情感预测数据集),将商品的用户评论做向量化处理,然后同步到Milvus中进行后续的相似度检索。

  • 数据来源MaxCompute侧:创建测试表并插入测试数据。

    测试数据

    --创建测试表
    CREATE TABLE IF NOT EXISTS test_tb (
        sentence STRING,
        label STRING,
        dataset STRING
    )
    PARTITIONED BY (
        split STRING
    )
    LIFECYCLE 30;
    
    --插入测试数据
    INSERT INTO test_tb PARTITION (split = 'dev')
    SELECT * FROM VALUES
      ('擦玻璃很好、就是太小了', '1', 'jd'),
      ('店家太不负责任了,衣服质量太差劲了,和图片上的不一样', '0', 'jd'),
      ('送国际友人挺好的,不错不错!', '1', 'jd'),
      ('很好,装好一定很漂亮', '1', 'jd'),
      ('东西给你退回去了,你要黑我钱!!!', '0', 'jd'),
      ('送货很快,书是正品,买书一直京东是首选!', '1', 'jd'),
      ('口感相当的好 都想买第二次了', '1', 'jd'),
      ('硅胶味道太重,样子与图片差距太大', '0', 'jd'),
      ('很伤心,买了放到三星n4尽然不能用,客服各种推', '0', 'jd'),
      ('质量不错,大小合适,应当是正品!但是我买的是黑灰,发来的却是纯黑,懒得换了,给个差评,希望以后改进!', '0', 'jd')
    AS t (sentence, label, dataset);
    
    -- 查询
    SELECT * FROM test_tb WHERE split = 'dev';
  • 数据去向Milvus侧:创建目标表用于接收向量化后的数据。表结构如下:

    目标表特性为自动ID表。

    字段名

    类型

    描述

    id

    Int64

    主键,自动递增。

    sentence

    VarChar(32)

    存储原始文本。

    sentence_e

    FloatVector(128)

    向量字段,用于相似性搜索,使用COSINE度量。

向导模式配置

本教程以源端MaxCompute(ODPS)将数据读取并做Embedding向量化后同步到Milvus为例,介绍如何在数据集成中使用向导模式配置离线同步任务。

一、新建离线同步节点

  1. 进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

  2. 在项目目录单击image > 新建节点 > 数据集成 > 离线同步,配置数据来源与去向(本教程来源为MaxCompute、去向为Milvus)、节点名称,单击确认

二、配置离线同步任务

  1. 配置基本信息。

    • 数据源:选择数据来源和数据去向对应的数据源。

    • 运行资源:执行离线同步任务所使用的资源组,选择当前工作空间已绑定且与数据源连通的资源组。

    如果无可用的数据源和运行资源,请确保已完成前提条件
  2. 配置数据来源

    以下为本教程MaxCompute数据源需配置的关键参数,如果使用其他数据来源,配置可能存在差异,请根据实际情况配置。

    image

    参数

    说明

    Tunnel资源组

    Tunnel Quota,默认选择公共传输资源,即MaxCompute的免费quota。MaxCompute的数据传输资源选择,具体请购买与使用独享数据传输服务资源组

    重要

    如果独享Tunnel Quota因欠费或到期不可用,任务在运行中将会自动切换为公共传输资源

    选择待同步的数据来源表。

    如果无可选的来源表,请确保已完成测试数据准备

    过滤方式

    支持分区过滤数据过滤

    • 如果来源表是分区表,您可以按分区选择同步数据的范围。

    • 如果来源表是非分区表,您可以设置WHERE过滤语句,选择同步数据的范围。

    您可以单击数据预览,查看配置是否正确。

    image

  3. 配置数据处理

    1. 打开数据处理开关,然后在数据处理列表中单击添加节点 > 数据向量化,添加数据向量化处理节点。

      image

    2. 配置数据向量化节点。image关键参数解释如下:

      说明
      • 数据向量化节点性能取决于配置的模型性能,阿里云百炼平台提供的QWen模型有QPS限制,阿里云PAI模型市场需要您自己在PAI EAS上部署模型,其性能表现取决于部署模型使用的资源规格。

      • 在参数确定的情况下,Embedding模型生成的向量结果是确定性的。因此DataWorks数据集成在同步的过程中针对相同的向量化原始数据进行了LFU Cache(Least Frequently Used)优化,避免相同的数据重复调用Embedding模型,以提升处理性能,降低Embedding成本。

      参数

      描述

      模型提供商

      大模型提供商,目前支持如下模型提供商:

      • 阿里云百炼平台,支持QWen模型。

      • 阿里云PAI模型市场,支持BGE-M3模型。

      模型名称

      Embedding模型名称,按需选择。

      模型 API Key

      访问模型的API KEY,请前往模型提供商获取。

      • 阿里云百炼平台:获取百炼API Key

      • 阿里云PAI模型市场:前往部署的EAS任务,进入在线调试,获取Headers Authorization的参数值,将其作为API KEY填写到此处。

      模型 Endpoint

      模型提供商选择阿里云PAI模型市场时,需要配置访问模型的访问点(Endpoint API地址)。

      批处理大小

      向量化批处理大小,取决于Embedding模型是否支持批处理,批处理有助于提升Embedding性能,降低Embedding成本。默认为10。

      选择需要向量化的字段

      定义需要将哪些列进行向量化,指定向量化后输出的字段名,数据集成支持对来源单个字段或者多个字段拼接组合做向量化。

      向量化结果字段

      源表来源字段向量化后定义的向量化字段名称。

      向量维度

      输出的向量维度,配置的Embedding模型必须支持定义的向量维度。默认为1024。

      NULL值转为空字符串

      由于大模型做向量化时不允许传入的数据为NULL,因此如果源表数据存在NULL,支持将其转为空字符串处理,避免向量化异常。默认不勾选。

      是否拼接字段名称

      做向量化时,是否需要拼接字段名称到文本中一起做向量化。选中时,还需要配置字段名拼接符。默认不勾选。

      是否跳过空值字段

      多个字段拼接进行向量化时,是否要跳过空值字段。默认选中,跳过空值字段。

    3. 数据输出预览。

      您可以单击数据向量化节点配置区域右上角的数据输出预览,然后预览向量化后的结果,确认配置是否正确。

      您也可以单击离线同步编辑页面顶部的模拟运行,预览向量化后的结果。

      image

  4. 配置数据去向

    以下为本教程Milvus数据源需配置的关键参数,如果使用其他数据来源,配置可能存在差异,请根据实际情况配置。

    image

    参数

    说明

    集合

    用于接收向量数据的Collection。

    分区键

    可选,如果Collection配置了分区,您可以为接收的向量数据指定分区。

    写入模式

    • upsert

      • 当表未设置自动ID时:根据主键更新 Collection 中的某个 Entity。

      • 当表设置自动ID时:将 Entity 中的主键替换为自动生成的主键,并插入数据。

    • insert:多用于自动ID表插入数据,milvus自动生成主键。

      非自动ID表使用insert会导致数据重复。
  5. 配置去向字段映射

    当配置完数据来源、数据处理和数据去向后,离线同步将自动生成字段映射关系,由于目标端为无固定结构数据源,默认按同行映射执行,因此,需要单击来源字段目标字段后的编辑,调整字段映射顺序,或者删减不需要的字段,以确保映射关系正确。

    例如,本教程手动删除不需要映射的字段,调整后的映射关系如下。

    image

  6. 更多高级配置

    单击节点配置页面右侧的高级配置,你可以按需设置任务并发数、同步速率、脏数据策略等参数。

三、调试运行

  1. 单击离线同步节点编辑页面右侧的调试配置,设置调试运行使用的资源组脚本参数,然后单击顶部工具栏的运行,测试同步链路是否成功运行。

  2. 您可以前往Milvus侧,查看数据去向Collection中的数据是否符合预期。

    image

四、调度配置与发布

单击离线同步任务右侧的调度配置,设置周期运行所需的调度配置参数后,单击顶部工具栏的发布,进入发布面板,根据页面提示完成发布

脚本模式配置

本教程以源端MaxCompute(ODPS)将数据读取并做Embedding向量化后同步到Milvus为例,介绍如何在数据集成中使用脚本模式配置离线同步任务。

一、新建离线同步节点

  1. 进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

  2. 在项目目录单击image > 新建节点 > 数据集成 > 离线同步,配置数据来源与去向(本教程来源为MaxCompute、去向为Milvus)、节点名称,单击确认

二、配置同步脚本

  1. 单击离线同步节点顶部工具栏的image,切换为脚本编辑模式。

  2. 配置MaxComputeMilvus的离线同步任务。

    请根据附录:脚本模式格式说明,配置离线同步任务的JSON脚本。本示例配置的脚本如下:

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "odps",
                "parameter": {
                    "partition": [
                        "split=dev"
                    ],
                    "datasource": "MaxCompute_Source",
                    "successOnNoPartition": true,
                    "tunnelQuota": "default",
                    "column": [
                        "sentence"
                    ],
                    "enableWhere": false,
                    "table": "test_tb"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "category": "flatmap",
                "stepType": "embedding-transformer",
                "parameter": {
                    "modelProvider": "bailian",
                    "modelName": "text-embedding-v4",
                    "embeddingColumns": {
                        "sourceColumnNames": [
                            "sentence"
                        ],
                        "embeddingColumnName": "sentence_e"
                    },
                    "apiKey": "sk-****",
                    "dimension": 128,
                    "nullAsEmptyString": true
                },
                "displayName": "sentence2emb",
                "description": ""
            },
            {
                "stepType": "milvus",
                "parameter": {
                    "schemaCreateMode": "ignore",
                    "enableDynamicSchema": true,
                    "datasource": "Milvus_Source",
                    "column": [
                        {
                            "name": "sentence",
                            "type": "VarChar",
                            "elementType": "None",
                            "maxLength": "32"
                        },
                        {
                            "name": "sentence_e",
                            "type": "FloatVector",
                            "dimension": "128",
                            "elementType": "None",
                            "maxLength": "65535"
                        }
                    ],
                    "writeMode": "insert",
                    "collection": "Milvus_Collection",
                    "batchSize": 1024,
                    "columnMapping": [
                        {
                            "sourceColName": "sentence",
                            "dstColName": "sentence"
                        },
                        {
                            "sourceColName": "sentence_e",
                            "dstColName": "sentence_e"
                        }
                    ]
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "errorLimit": {
                "record": "0"
            },
            "speed": {
                "concurrent": 2,
                "throttle": false
            }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
    • ReaderWriter部分的参数解释,请参见MaxCompute数据源Milvus数据源

    • 如果您使用其他类型数据来源和去向,请参见数据源列表

    • 数据向量化处理节点脚本部分各参数说明如下:

      参数

      描述

      是否必填

      modelProvider

      大模型提供商,目前支持如下模型提供商:

      • bailian:阿里云百炼平台,支持QWen模型。

      • paiModelGallery:阿里云PAI模型市场,支持BGE-M3模型。

      modelName

      Embedding模型名称。

      • modelProviderbailian时,可选择text-embedding-v4text-embedding-v3

      • modelProviderpaiModelGallery时,可选择bge-m3

      apiKey

      访问模型的API KEY,请前往模型提供商获取。

      endpoint

      modelProviderpaiModelGallery时,需要配置访问模型的访问点(Endpoint API地址)。

      batchSize

      向量化批处理大小,取决于Embedding模型是否支持批处理,批处理有助于提升Embedding性能,降低Embedding成本。默认为10。

      embeddingColumns

      定义需要将哪些列进行向量化,指定向量化后输出的字段名,数据集成支持对来源单个字段或者多个字段拼接组合做向量化。

      例如:

      {
        "embeddingColumns": {
          "sourceColumnNames": [
            "col1",
            "col2"
          ],
          "embeddingColumnName": "my_vector"
        }
      }

      appendDelimiter

      将多个字段值拼接为一个文本进行向量化时的拼接符。默认值\n

      skipEmptyValue

      多个字段拼接进行向量化时,是否要跳过空值字段。默认False。

      dimension

      输出的向量维度,配置的Embedding模型必须支持定义的向量维度。默认为1024。

      nullAsEmptyString

      由于大模型做Embedding时不允许传入的数据为NULL,因此如果源表数据存在NULL,支持将其转为空字符串处理,避免向量化异常。默认False

      appendFieldNameEnable

      做向量化时,是否要将原始数据和字段名拼接在一起做向量化。开启时,还需要配置appendFieldNameDelimiter。默认False

      appendFieldNameDelimiter

      拼接字段名称时的拼接符,仅在开启appendFieldNameEnable时生效。

  3. 模拟运行。

    您可以单击离线同步节点编辑页面顶部的模拟运行,然后依次单击开始采集预览,查看向量化后的结果,确认配置是否正确。image

  4. 更多高级配置

    单击节点配置页面右侧的高级配置,你可以按需设置任务并发数、同步速率、脏数据策略等参数。

三、调试运行

  1. 单击离线同步节点编辑页面右侧的调试配置,设置调试运行使用的资源组脚本参数,然后单击顶部工具栏的运行,测试同步链路是否成功运行。

  2. 您可以前往Milvus侧,查看数据去向Collection中的数据是否符合预期。

    image

四、调度配置与发布

单击离线同步任务右侧的调度配置,设置周期运行所需的调度配置参数后,单击顶部工具栏的发布,进入发布面板,根据页面提示完成发布

附录:脚本模式格式说明

脚本模式的基本结构如下:

{
    "type": "job", 
    "version": "2.0", 
    "steps": [
        {
            "stepType": "xxx",
            "parameter": {
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "xxx",
            "parameter": {
            },
            "name": "transformer1",
            "category": "map/flatmap"
        },
        {
            "stepType": "xxx",
            "parameter": {
            },
            "name": "transformer2",
            "category": "map/flatmap"
        },
        {
            "stepType": "xxx",
            "parameter": {
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
    }
}

steps里定义了每个处理的算子,至少需要一个Reader,一个Writer,中间可以有若干个Transformer转换算子。假设您配置的并发度为2,那么一个Job将会有两条并行的数据处理流,每个Reader、Transformer、Writer都属于任务配置中的一个step

image

steps定义了每个算子的类型和参数,数据同步及处理过程严格遵循每个stepJSON配置里的顺序执行。

数据集成支持的各种数据源读、写通道详细参数配置,详见:支持的数据源及同步方案