Best practices for LLM text embedding with FeatureStore


In machine learning feature engineering, text fields cannot be used directly for model training. They must first be converted into embedding vectors using a large language model (LLM). For example, text embedding can convert unstructured text, such as product reviews and user comments, into fixed-dimensional vector representations. These vectors can then be used for similarity calculations, semantic retrieval, and candidate generation in recommendation systems. This document describes the end-to-end process of using FeatureStore for offline and real-time text embedding, model training, and deployment.

LLM text embedding models

The features described in this document support calling any LLM embedding model, whether hosted on Alibaba Cloud or self-hosted. In June 2025, Alibaba Cloud released the Qwen3 Embedding model series. These models are designed specifically for text representation, retrieval, and ranking tasks. They are trained on the Qwen3 foundation model, inherit its strengths in multilingual text understanding, and perform strongly on multiple text representation and ranking benchmarks.

The core function of text embedding is to convert unstructured text data into structured vector representations. For example, in an e-commerce recommendation scenario, a user review like "Good Quality Dog Food" can be converted into a 1024-dimensional vector. This vector can then be compared with the vectors of other product reviews to enable semantic matching for recommendations. This document uses the Qwen3 Embedding model series as an example to describe the complete process of text feature extraction, training, and deployment in FeatureStore.
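The comparison step can be sketched with cosine similarity, a common metric for embedding vectors. This is an illustrative sketch, not part of FeatureStore: the metric your retrieval or recall system uses may differ, and the 3-dimensional vectors and item IDs below are made-up stand-ins for real 1024-dimensional embeddings.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return the cosine similarity of two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError(f"dimension mismatch: {len(a)} != {len(b)}")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


def rank_by_similarity(
    query: Sequence[float], candidates: List[Tuple[int, Sequence[float]]]
) -> List[Tuple[int, float]]:
    """Score (item_id, vector) candidates against the query, most similar first."""
    scored = [(item_id, cosine_similarity(query, vec)) for item_id, vec in candidates]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


# Toy vectors: item 901 points in nearly the same direction as the query.
query = [0.9, 0.1, 0.0]
candidates = [(901, [0.8, 0.2, 0.1]), (902, [0.0, 0.1, 0.9])]
print(rank_by_similarity(query, candidates))  # item 901 is ranked first
```

Cosine similarity ignores vector length and compares only direction, which is why it is a frequent default for text embeddings.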

Prerequisites

Prepare a data table

This document uses product review text as an example. In real-world e-commerce scenarios, user reviews contain rich information about products, such as product quality and user experience, so this text can serve as an important feature for training recommendation models.

In this example, assume that review_text, a feature field of the item table, is a text field that you need to process in real time to extract embeddings. The following SQL statements create the corresponding MaxCompute table, insert sample data, and query it. Copy them and execute them in DataWorks:

create table if not exists item_fea_review_text_v1(
    item_id bigint COMMENT 'item_id'
    ,review_text string COMMENT 'review_text'
)
COMMENT 'item_fea_review_text_v1'
PARTITIONED BY (
    ds string COMMENT 'business date'
)
LIFECYCLE 365
;

INSERT INTO TABLE item_fea_review_text_v1 PARTITION (ds='${bdp.system.bizdate}')  -- Inserts into the partition for the scheduling business date.
VALUES (901, 'Good Quality Dog Food, I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than most.');  -- Sample row; replace it with your own data.

select *
from item_fea_review_text_v1 where ds = '${bdp.system.bizdate}';

Register resources in FeatureStore

Register LLM call information

Before you can call an LLM for text embedding, you must register its configuration as LLM call information in FeatureStore. After registration, FeatureStore automatically manages the model invocation and vector generation process. This section uses the registration of the Qwen text-embedding-v4 model as an example.

  1. Log on to the PAI console. In the left-side navigation pane, click Data Preparation.

  2. Select a workspace and click Enter FeatureStore.

  3. On the Feature Production tab, click Create LLM Call Information. If you have not yet activated the PAI Token Service, you will be prompted to do so. Configure the following parameters and leave the others at their default values.

    • Name: Enter a custom name.

    • Model parameters: For Model Type, select TEXT_EMBEDDING. For Model Name, select text-embedding-v4. The default values are Vector Dimension: 1024 and Concurrency: 30. Set Maximum Input Tokens to 8192 and BatchSize to 10 (the recommended maximum). You can adjust these values based on your workload.

      For parameter details, see Synchronous API details.

      The text-embedding-v4 model belongs to the Qwen3-Embedding series and produces 1024-dimensional vectors by default.

  4. Click OK.
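To make BatchSize concrete, the sketch below assumes it caps how many texts go into one embedding request, so 25 reviews become requests of 10, 10, and 5 texts. It is not FeatureStore's implementation: `embed_batch` is a hypothetical callable standing in for a single model request, batches are sent one after another, and the Concurrency setting is not modeled.

```python
from typing import Callable, Iterator, List, Sequence

Vector = List[float]


def batches(texts: Sequence[str], batch_size: int = 10) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most batch_size texts."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(texts), batch_size):
        yield texts[start:start + batch_size]


def embed_all(
    texts: Sequence[str],
    embed_batch: Callable[[Sequence[str]], List[Vector]],
    batch_size: int = 10,
    dim: int = 1024,
) -> List[Vector]:
    """Embed texts one batch at a time and check each vector's dimension."""
    vectors: List[Vector] = []
    for batch in batches(texts, batch_size):
        result = embed_batch(batch)  # one request per batch (hypothetical callable)
        if len(result) != len(batch):
            raise RuntimeError("service returned a different number of vectors than inputs")
        for vec in result:
            if len(vec) != dim:
                raise RuntimeError(f"expected a {dim}-dim vector, got {len(vec)}")
        vectors.extend(result)
    return vectors


# Usage with a fake model that records the size of each request.
request_sizes: List[int] = []


def fake_embed_batch(batch: Sequence[str]) -> List[Vector]:
    request_sizes.append(len(batch))
    return [[0.0] * 1024 for _ in batch]


reviews = [f"review {i}" for i in range(25)]
print(len(embed_all(reviews, fake_embed_batch)), request_sizes)  # 25 [10, 10, 5]
```

Checking the dimension of every returned vector catches a mismatch between the configured Vector Dimension and what downstream models expect before bad data is stored.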

Register a feature view

A feature view is a core concept in FeatureStore that defines the feature schema, data sources, and transformations. Before creating a feature view, you must create a FeatureStore project and create a feature entity (for example, item). The item feature entity represents the product dimension. All subsequent text embedding and vector generation are based on this entity.

  1. On the Feature View tab, click Create Feature View. Configure the following parameters and leave the others at their default values.

    • View Name: Enter a custom name. Example: item_fea_embedding_debug_v1.

    • Type: Select Real Time.

    • Feature entity: Select item.

    • Feature Field: On the Table tab, add the following two fields:

      • item_id: Set the type to INT64 and select the Primary Key checkbox.

      • review_text_embedding: Set the type to ARRAY<FLOAT>. Click Feature Production and configure the following parameters:

        • Transformation Type: Select LLMEmbedding.

        • LLM Call Information: Select the name of the registered LLM call information.

        • Feature Production Input: For Field Name, enter review_text. For Multimodal Type, select TEXT. The field name must match the raw field name from your data source. For offline scenarios, this is the input field in the MaxCompute table. For online scenarios, this is the input field from sources like Flink or an SDK.

  2. Click Submit.

  3. Click the name of the feature view you created to view its schema in JSON format.
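Conceptually, the LLMEmbedding transformation turns each raw source row into a feature view row keyed by item_id. The sketch below illustrates that mapping under stated assumptions; it is not FeatureStore code. `apply_llm_embedding` and `fake_embed` are hypothetical, and returning no vector for empty or null text is an assumption, not documented behavior.

```python
from typing import Callable, Dict, List

Vector = List[float]


def apply_llm_embedding(
    row: Dict[str, object],
    embed_text: Callable[[str], Vector],
    input_field: str = "review_text",
    output_field: str = "review_text_embedding",
    key_field: str = "item_id",
) -> Dict[str, object]:
    """Map one raw source row to the row the feature view stores (illustration only)."""
    if input_field not in row:
        # Feature Production Input must name a field that exists in the raw source data.
        raise KeyError(f"source row has no field named {input_field!r}")
    text = row[input_field]
    # Assumption: null or empty text yields no embedding instead of a model call.
    vector = embed_text(text) if isinstance(text, str) and text else None
    return {key_field: row[key_field], output_field: vector}


# Usage with a hypothetical stand-in for the registered LLM.
def fake_embed(text: str) -> Vector:
    return [float(len(text)), 1.0]


print(apply_llm_embedding({"item_id": 901, "review_text": "Good Quality Dog Food"}, fake_embed))
```

The raw review_text is consumed but not stored, which matches the two fields defined in the feature view: item_id and review_text_embedding.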

Synchronize data

Real-time sync

Real-time sync is suitable for scenarios where features must be continuously updated. For example, when new products are listed or user reviews are updated, the latest text data must be promptly converted into embeddings and written to FeatureDB. The core process for real-time sync is as follows:

  1. Create a DataHub topic.

    1. Log on to the DataHub console and create a project.

    2. Click the name of the project you created to go to the project details page.

    3. Click Create Topic. Configure the following parameters and leave the others at their default values.

      • Name: Enter a custom name.

      • Schema details: Add the following two fields:

        • item_id: Set the type to BIGINT and clear the Allow Null checkbox.

        • review_text: Set the type to STRING and select the Allow Null checkbox.

    4. Click Create.

    5. Click the topic you created to go to the topic details page.

    6. On the Subscription List tab, click Subscribe, fill in the required information, then click Create.

  2. Use Flink SQL to write to the FeatureStore Connector.

    The Flink SQL job has three parts. First, it creates a temporary table linked to DataHub to receive real-time data. Second, it creates a temporary table linked to the FeatureStore feature view to receive the processed features. Finally, an INSERT statement syncs data from the source table to the destination table. After the job is deployed and started, it continuously reads real-time data from DataHub and writes it to the corresponding feature view in FeatureStore. FeatureStore then automatically calls the registered LLM to extract embeddings from review_text and stores the results in the review_text_embedding field. The following is a Flink SQL example:

    CREATE TEMPORARY TABLE item_fea_embedding_debug_v1_dh
    (
        item_id      bigint
        ,review_text string
    )
    WITH (
        'connector' = 'datahub'
        ,'subId' = 'xxxxxxxxxx'
        ,'endPoint' = 'http://dh-cn-xxxxxx.aliyuncs.com' -- Replace with your actual DataHub endpoint address.
        ,'project' = 'fs_test' -- Replace with your actual DataHub project name.
        ,'topic' = 'feature_store_llm_embedding_test_v1' -- Replace with your actual DataHub topic name.
        ,'accessId' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_ID}'
        ,'accessKey' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_SECRET}'
    )
    ;
    
    CREATE TEMPORARY TABLE item_fea_embedding_debug_v1
    (
        item_id          bigint
        ,review_text         string
    )
    WITH (
        'connector' = 'featurestore'
        ,'region_id' = 'cn-beijing' -- Replace with the actual region of your FeatureStore.
        ,'project' = 'fs_demo_featuredb' -- Replace with your actual FeatureStore project name.
        ,'feature_view' = 'item_fea_embedding_debug_v1' -- Replace with your actual FeatureStore feature view name.
        ,'username' = '${secret_values.FEATUREDB_USERNAME}'  
        ,'password' = '${secret_values.FEATUREDB_PASSWORD}'  
        ,'aliyun_access_id' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_ID}'
        ,'aliyun_access_key' = '${secret_values.ALIBABA_CLOUD_ACCESS_KEY_SECRET}'
    )
    ;
    
    INSERT INTO item_fea_embedding_debug_v1
    SELECT
        item_id
        ,review_text
    FROM item_fea_embedding_debug_v1_dh
    ;

    To avoid exposing credentials in plaintext, create the following variables in Variable Management and assign values to them before you deploy and start this SQL job: ALIBABA_CLOUD_ACCESS_KEY_ID, ALIBABA_CLOUD_ACCESS_KEY_SECRET, FEATUREDB_USERNAME, and FEATUREDB_PASSWORD. The job references them as ${secret_values.<name>}.

  3. Read features from the online data source.

    You can use the FeatureStore SDK to read features from FeatureDB to verify that the online feature writing process is running correctly.
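A minimal sanity check for that verification step is sketched below. It assumes you have already fetched one item's online features as a dict-like mapping keyed by feature name; the fetch call itself is omitted because its exact form depends on the SDK language and version. `check_online_embedding` is a hypothetical helper, and the expected dimension of 1024 matches the Vector Dimension configured earlier.

```python
import math
from typing import Mapping, Sequence


def check_online_embedding(
    features: Mapping[str, object],
    field: str = "review_text_embedding",
    dim: int = 1024,
) -> Sequence[float]:
    """Validate one fetched feature row and return the embedding if it looks usable."""
    vec = features.get(field)
    if vec is None:
        raise ValueError(f"{field} is missing; the row may not have been written yet")
    if len(vec) != dim:
        raise ValueError(f"{field} has {len(vec)} values, expected {dim}")
    if not all(isinstance(x, (int, float)) and math.isfinite(x) for x in vec):
        raise ValueError(f"{field} contains non-numeric or non-finite values")
    if not any(x != 0 for x in vec):
        raise ValueError(f"{field} is an all-zero vector")
    return vec


# Usage with a made-up row shaped like an online read result.
row = {"item_id": 901, "review_text_embedding": [0.01] * 1024}
print(len(check_online_embedding(row)))  # 1024
```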

Offline sync

Offline sync is suitable for batch feature updates. For example, during the initial launch, you may need to batch-convert historical product review text into embedding vectors. Alternatively, during daily routine updates, you might sync the embeddings of new review text to FeatureDB. Offline sync can be performed in two ways: full sync and incremental sync.

  1. Full sync: For the initial launch, you can sync the entire offline table to FeatureDB after extracting LLM embeddings through feature production.

    Copy the following code and execute it in a PyODPS 3 node in DataWorks. For how to configure and run the node, see Method 1: Use a new exclusive resource group and a built-in image (Recommended).

    from feature_store_py.fs_client import FeatureStoreClient
    from odps.accounts import StsAccount
    
    # In a DataWorks PyODPS 3 node, `o` is the built-in MaxCompute entry object and
    # `args` holds the scheduling parameters. dt is the partition to publish.
    cur_day = args['dt']
    print('cur_day = ', cur_day)
    
    # Reuse the node's credentials; also pass the security token if they are STS credentials.
    access_key_id = o.account.access_id
    access_key_secret = o.account.secret_access_key
    sts_token = None
    endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com' # Replace with your actual endpoint address.
    if isinstance(o.account, StsAccount):
        sts_token = o.account.sts_token
    fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, security_token=sts_token, endpoint=endpoint)
    cur_project_name = 'fs_demo_featuredb' # Replace with your actual FeatureStore project name.
    project = fs.get_project(cur_project_name)
    
    feature_view_name = 'item_fea_embedding_debug_v1' # Replace with your actual FeatureStore feature view name.
    batch_feature_view = project.get_feature_view(feature_view_name)
    # Extract embeddings for partition ds=cur_day of the offline table and write the results to FeatureDB.
    # offline_datasource_id is the ID of the offline (MaxCompute) data source in FeatureStore; replace 19 with yours.
    task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Merge', offline_to_online=True, publish_config={'offline_datasource_id':19, 'table_name': 'item_fea_review_text_v1'})
    task.wait()  # Block until the publish task finishes.

  2. Incremental sync: After the initial full sync, you can sync only incremental data to avoid reprocessing the review text of existing products. Incremental sync first calculates the set of new item_ids and then syncs the text embeddings of these new products to FeatureDB.

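    1. Calculate the set of new item_ids. The following SQL query identifies products that are present in the current day's partition but absent from the previous day's partition. This assumes that each daily partition of item_fea_review_text_v1 holds a full snapshot of all products; products whose review_text changed are not detected as new.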

      CREATE TABLE IF NOT EXISTS item_fea_review_text_new_tmp_v1 (
        item_id BIGINT COMMENT 'Item ID',
        review_text STRING COMMENT 'Review text'
      )
      PARTITIONED BY (
          ds string COMMENT 'business date'
      )
      LIFECYCLE 365
      ;
      
      -- Insert the new products. INSERT OVERWRITE keeps reruns of the daily job from duplicating rows.
      INSERT OVERWRITE TABLE item_fea_review_text_new_tmp_v1 PARTITION (ds='${bdp.system.bizdate}')
      SELECT 
          t1.item_id,
          t1.review_text
      FROM 
          item_fea_review_text_v1 t1
      WHERE 
          t1.ds = '${bdp.system.bizdate}'
          AND NOT EXISTS (
            SELECT 1 
            FROM item_fea_review_text_v1 t2 
            WHERE t2.ds = TO_CHAR(DATEADD(TO_DATE('${bdp.system.bizdate}','yyyymmdd'), -1, 'dd'),'yyyymmdd')
            AND t1.item_id = t2.item_id
          );
    2. Sync the new set of item_ids to FeatureDB through feature production.

      Copy the following code and execute it in a PyODPS 3 node in DataWorks.

      from feature_store_py.fs_client import FeatureStoreClient
      from odps.accounts import StsAccount
      
      # `args` and `o` are provided by the PyODPS 3 node.
      cur_day = args['dt'] # The value of dt is the actual partition value in the table.
      print('cur_day = ', cur_day)
      
      access_key_id = o.account.access_id
      access_key_secret = o.account.secret_access_key
      sts_token = None
      endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com' # Replace with your actual endpoint address.
      if isinstance(o.account, StsAccount):
          sts_token = o.account.sts_token
      fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, security_token=sts_token, endpoint=endpoint)
      cur_project_name = 'fs_demo_featuredb' # Replace with your actual FeatureStore project name.
      project = fs.get_project(cur_project_name)
      
      feature_view_name = 'item_fea_embedding_debug_v1' # Replace with your actual FeatureStore feature view name.
      batch_feature_view = project.get_feature_view(feature_view_name)
      # Publish only the new products collected in item_fea_review_text_new_tmp_v1.
      # Replace 19 with the ID of your offline (MaxCompute) data source.
      task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Merge', offline_to_online=True, publish_config={'offline_datasource_id':19, 'table_name': 'item_fea_review_text_new_tmp_v1'})
      task.wait()  # Block until the publish task finishes.
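The incremental query's logic can be mirrored in Python for spot checks, for example to test the date arithmetic or the anti-join on a small sample. This is an illustrative sketch with hypothetical helper names; in production, the SQL above does the work in MaxCompute.

```python
from datetime import datetime, timedelta
from typing import Iterable, Set


def previous_partition(ds: str) -> str:
    """Return the yyyymmdd partition one day before ds, like the DATEADD expression."""
    return (datetime.strptime(ds, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")


def new_item_ids(today: Iterable[int], yesterday: Iterable[int]) -> Set[int]:
    """item_ids in today's snapshot but not in yesterday's (the NOT EXISTS anti-join)."""
    return set(today) - set(yesterday)


print(previous_partition("20250301"))  # 20250228
print(sorted(new_item_ids([901, 902, 903], [901, 902])))  # [903]
```

Using real date arithmetic rather than subtracting 1 from the partition string handles month and year boundaries correctly.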

Sync pre-extracted embeddings directly

In some scenarios, you may have already extracted text embeddings in another system and do not want FeatureStore to call the model again. In this case, you can synchronize the existing embeddings directly to the online data source, which saves model invocation costs and speeds up feature deployment. The table must contain the review_text_embedding field, and the sync code passes 'transform': False in publish_config so that the stored vectors are written as-is instead of being regenerated by the LLM.

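Assume the table with pre-extracted embeddings is as follows. The 9-element array in the sample row is placeholder data; in practice, all vectors should share one dimension, for example 1024 for text-embedding-v4 vectors.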

create table if not exists item_fea_review_text_embedding_debug_v1(
    item_id bigint COMMENT 'item_id'
    ,review_text_embedding array<float> COMMENT 'review_text_embedding'
)
COMMENT 'item_fea_review_text_embedding_debug_v1'
PARTITIONED BY (
    ds string COMMENT 'business date'
)
LIFECYCLE 365
;

INSERT INTO TABLE item_fea_review_text_embedding_debug_v1 PARTITION (ds='20250509')  -- Inserts into partition ds=20250509.
VALUES (902, cast(array(1,2,3,4,5,6,7.0,8.1,9.0) as array<float>));  -- Placeholder vector for illustration.

select *
from item_fea_review_text_embedding_debug_v1 where ds = '20250509';
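Because FeatureStore does not regenerate pre-computed vectors in this mode, it can be worth checking that they all share one dimension before you publish them. The sketch below is a hypothetical pre-publish check on rows already loaded into Python as an item_id-to-vector mapping; with the sample row above, item 902 would be flagged because its 9-element placeholder does not have 1024 values.

```python
from collections import Counter
from typing import Dict, List, Sequence


def find_dimension_mismatches(
    rows: Dict[int, Sequence[float]], expected_dim: int = 1024
) -> List[int]:
    """Return the sorted item_ids whose embedding length differs from expected_dim."""
    return sorted(item_id for item_id, vec in rows.items() if len(vec) != expected_dim)


def dimension_histogram(rows: Dict[int, Sequence[float]]) -> Counter:
    """Count how many rows have each embedding length."""
    return Counter(len(vec) for vec in rows.values())


# Made-up rows: 902 mirrors the placeholder above, 903 has a full-length vector.
rows = {902: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.1, 9.0], 903: [0.1] * 1024}
print(find_dimension_mismatches(rows))  # [902]
```

The histogram is useful on large tables: a single dominant length with a few outliers usually points to a handful of bad rows rather than a wrong expected dimension.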

Copy the following code and execute it in a PyODPS 3 node in DataWorks.

from feature_store_py.fs_client import FeatureStoreClient
from odps.accounts import StsAccount

# `args` and `o` are provided by the PyODPS 3 node.
cur_day = args['dt']
print('cur_day = ', cur_day)

access_key_id = o.account.access_id
access_key_secret = o.account.secret_access_key
sts_token = None
endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com' # Replace with your actual endpoint address.
if isinstance(o.account, StsAccount):
    sts_token = o.account.sts_token
fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, security_token=sts_token, endpoint=endpoint)
cur_project_name = 'fs_demo_featuredb' # Replace with your actual FeatureStore project name.
project = fs.get_project(cur_project_name)

feature_view_name = 'item_fea_embedding_debug_v1' # Replace with your actual FeatureStore feature view name.
batch_feature_view = project.get_feature_view(feature_view_name)
# 'transform': False skips LLM embedding and writes the existing review_text_embedding values.
# Replace 19 with the ID of your offline (MaxCompute) data source.
task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Merge', offline_to_online=True, publish_config={'offline_datasource_id':19, 'table_name': 'item_fea_review_text_embedding_debug_v1', 'transform': False})
task.wait()  # Block until the publish task finishes.
task.print_summary()

Export and train

Create model features

A model feature set defines which features are used for model training. For more information, see Configure a FeatureStore project.

Export samples

Export a training dataset for model training. For more information, see Export a training set and train a model.

Configure Feature Generator and train

Use the TorchEasyRec create_fg_json script to generate fg.json from the TorchEasyRec config. The generated fg.json defines the feature processing performed during online inference.

For detailed instructions, see Configure Feature Generator and train. For information on configuring embedding-type features in TorchEasyRec, see TorchEasyRec features.

Deploy

Deploy an EAS service

Deploy the trained model as an Elastic Algorithm Service (EAS) service for online inference. For more information, see Create and deploy an EAS model service.

Configure PAI-Rec engine

Configure the PAI-Rec engine to fetch real-time features from FeatureDB and call the EAS service for recommendation inference. For more information, see Configure PAI-REC.