使用外部数据源构建Data-Agent

PolarDB for AI支持将外部数据源接入Data-Agent。无论您的业务数据存储在外部数据库(如PostgreSQL、Hive、Elasticsearch)中,还是希望在不实际创建表的前提下,Data-Agent均能够帮助您快速实现基于大语言模型的自然语言查询。PolarDB for AI通过预先定义数据结构(Schema),为大模型构建一个知识库,使其能将自然语言问题(例如“查询订单最多的客户”)转换为对应数据源的查询语句(SQL/DSL),从而实现对外部数据的分析与访问。

工作原理

PolarDB for AI将外部数据源的元数据(如表结构、列注释、示例值等)存储在特定表中。然后,通过内置的_polar4ai_text2vec模型将文本元数据转换为向量,并存入一个可供检索的索引表schema_index中。

当发起自然语言查询时,自然语言到SQL语言转义(基于大语言模型的NL2SQL)功能执行以下操作:

  1. 将自然语言问题转换为向量。

  2. schema_index表中进行向量相似度搜索,找到与问题相关的表和列。

  3. 结合检索到的元数据信息,生成可执行的SQLDSL查询语句。

构建模式选型

PolarDB for AI提供两种模式接入外部元数据,可根据业务场景和数据源特点选择合适的方案。

特性

schema_info模式(手动精细控制)

schema_meta模式(DDL自动解析)

适用场景

需要精细控制元数据(如补充注释、提供特定示例值),以提升NL2SQL转换准确率。

适用于快速接入已有的大量数据表,且源数据库支持导出标准的DDL或索引定义。

支持数据源

理论上支持任何数据源,因为所有元数据均为手动录入。

支持Hive、MySQL、PostgreSQL等关系型数据库,以及Elasticsearch。

优点

元数据信息可控性强,有助于提升复杂查询的准确性。

自动化程度高,接入快,维护成本低,适合大规模批量导入。

缺点

手动配置工作量大,流程相对繁琐。

依赖DDL解析的准确性,可能丢失DDL之外的元信息(如业务逻辑、特定示例值)。

准备工作

开始前,请确保已完成以下准备:

  • 增加AI节点,并设置AI节点的连接数据库账号:开启PolarDB for AI功能

    说明
    • 若您在购买集群时已添加AI节点,则可以直接为AI节点设置连接数据库的账号。

    • AI节点的连接数据库账号需具有读写权限,以确保能够顺利读取和写入目标数据库。

  • 使用集群地址连接PolarDB集群:登录PolarDB for AI

    重要
    • 使用命令行连接集群时,需增加-c选项。

    • 在使用DMS体验和使用PolarDB for AI功能时,DMS默认使用PolarDB集群的主地址进行连接,无法将SQL语句路由至AI节点。因此,您需要手动将连接地址修改为集群地址

通过手动提供元数据构建Data-Agent(schema_info模式)

此模式通过schema_info表管理外部数据库的元数据,适用于需要精细控制元数据以提升准确率的场景。

步骤一:创建元数据表schema_info

此表用于记录外部数据源的表名、列名、注释、数据类型等信息。

CREATE TABLE schema_info (
    id INT AUTO_INCREMENT PRIMARY KEY COMMENT '自增主键',
    table_name VARCHAR(255) NOT NULL COMMENT '表名',
    table_comment TEXT COMMENT '表注释',
    column_name VARCHAR(255) NOT NULL COMMENT '列名',
    column_comment TEXT COMMENT '列注释',
    data_type VARCHAR(255) COMMENT '列数据类型(可缺省)',
    sample_values TEXT COMMENT '示例值',
    is_primary INT COMMENT '是否主键(1/0)',
    is_foreign INT COMMENT '是否外键(1/0)',
    ext TEXT COMMENT '外键信息',
    db_type VARCHAR(128) COMMENT '关系型数据库语言类型(如 MySQL, PostgreSQL等)'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库元数据信息表';

步骤二:准备并插入元数据

按照以下格式要求,将元数据插入schema_info表。

  • db_type:指明源数据库的类型,如MySQLPostgreSQL等。同一逻辑库下的所有表,其db_type必须一致。

  • sample_values:提供示例值,多个值之间使用英文逗号(,)分隔。为保证效果,字段总长度不应超过100个字符。

  • is_foreign:是否为外键。值为1时,表示该列为外键。

  • ext:当is_foreign1时,此列需填写外键关联信息。格式为:<database_name>.<table_name>.<column_name>

示例: 为customersorders两张表插入元数据。

-- 为 customers 表插入元数据
INSERT INTO schema_info (table_name, table_comment, column_name, column_comment, data_type, sample_values, is_primary, is_foreign, ext, db_type) 
VALUES
('customers', '客户信息表', 'id', '客户唯一标识', 'INT', '1,2,3', 1, 0, NULL, 'PostgreSQL'),
('customers', '客户信息表', 'name', '客户姓名', 'VARCHAR(100)', '张三,李四,王五', 0, 0, NULL, 'PostgreSQL'),
('customers', '客户信息表', 'email', '客户邮箱', 'VARCHAR(100) UNIQUE', 'zhangsan@example.com', 0, 0, NULL, 'PostgreSQL');

-- 为 orders 表插入元数据
INSERT INTO schema_info (table_name, table_comment, column_name, column_comment, data_type, sample_values, is_primary, is_foreign, ext, db_type) 
VALUES
('orders', '订单信息表', 'order_id', '订单唯一标识', 'INT', '1001,1005,1003', 1, 0, NULL, 'PostgreSQL'),
('orders', '订单信息表', 'order_date', '订单日期', 'DATE', '2023-01-01,2023-05-06', 0, 0, NULL, 'PostgreSQL'),
('orders', '订单信息表', 'customer_id', '关联客户ID', 'INT', '1,2,3', 0, 1, 'dbname.customers.id', 'PostgreSQL'),
('orders', '订单信息表', 'total_amount', '订单总金额', 'DECIMAL(10,2)', '99.99,101.81', 0, 0, NULL, 'PostgreSQL');

步骤三:构建检索索引

  1. 此步骤创建schema_index表,并调用AI模型将schema_info中的元数据处理成向量,填充到索引表中。该任务为异步执行。

    -- 创建用于存储向量索引的表
    /*polar4ai*/CREATE TABLE schema_index(
      id integer, 
      table_name varchar, 
      table_comment text_ik_max_word, 
      table_ddl text_ik_max_word, 
      column_names text_ik_max_word, 
      column_comments text_ik_max_word, 
      sample_values text_ik_max_word, 
      vecs vector_768,
      ext text_ik_max_word, 
      database_service text_ik_max_word, 
      PRIMARY key (id)
    );
    
    -- 启动异步任务,从 schema_info 构建索引
    /*polar4ai*/SELECT * FROM PREDICT (
    MODEL _polar4ai_text2vec, SELECT ''
    ) WITH (mode='async', resource='schema_info', to_sample = 1) 
    INTO schema_index;
    说明

    to_sample = 1:表示系统将从schema_info.sample_values列中提取示例值并进行向量化,以增强基于值的查询理解能力。设置为0则不使用。

  2. 执行成功后,命令会返回一个任务ID(例如b3b8fd1e-b886-11f0-9f89-97664bebacb7)。记录此ID,用于后续查询任务状态。

步骤四:验证任务状态

使用上一步获取的task_id查询索引构建任务的进度和结果。当状态变为finish时,表示索引构建完成。

/*polar4ai*/SHOW TASK `此处替换为上一步获取的task_id`;

步骤五:使用Data-Agent进行查询

索引构建完成后,即可通过自然语言到SQL语言转义(基于大语言模型的NL2SQL)功能,使用自然语言进行数据查询。

/*polar4ai*/SELECT * FROM PREDICT (MODEL _polar4ai_nl2sql, select '最多订单的10位客户') WITH (basic_index_name='schema_index');

通过自动解析DDL构建Data-Agent(schema_meta模式)

此模式通过解析DDLElasticsearch的索引定义,自动提取元数据,适用于批量接入数据源的场景。在该模式下,您可以通过输入建表语句来替代数据库中真实存在的表格,并借助AI智能解析构建Data-Agent系统中的检索索引表。

步骤一:创建DDL存储表schema_meta

此表用于存放从外部数据源导出的建表语句(DDL)或索引定义。

CREATE TABLE IF NOT EXISTS schema_meta (
    db_name VARCHAR(255) COMMENT '数据库名',
    create_table_statement TEXT COMMENT '建表或创建索引的语句'
) COMMENT='存放外部数据库的DDL或索引定义';

步骤二:准备并导入DDL/索引定义

将外部数据源的DDL或索引定义语句插入schema_meta表。该模式下用户可通过输入建表语句以代替库中真实存在的表格,并通过AI智能解析构建Data Agent系统中的检索索引表。

  • 对于关系型数据库(如Hive、MySQL、PostgreSQL等),建表语句需以CREATE开头。

  • 对于Elasticsearch,创建索引的语句需以PUT开头。

示例

-- 关系型数据库 DDL 示例
INSERT INTO schema_meta (db_name, create_table_statement) VALUES 
('my_hive_db', "CREATE TABLE dwd.dwd_col_df_court_case_detail (
    case_id BIGINT COMMENT '案件ID',
    created_date STRING COMMENT '创建时间(yyyy-mm-dd)'
)
COMMENT '案件订单表'
PARTITIONED BY (dt STRING)
STORED AS ORC;");

-- Elasticsearch 索引定义示例
INSERT INTO schema_meta (db_name, create_table_statement) VALUES
('my_es_cluster', 'PUT /product_info
{
  "settings": {"number_of_shards": 5, "number_of_replicas": 1},
  "mappings" : {
    "properties": {
      "productName": {"type": "text", "analyzer": "ik_smart"},
      "describe": {"type": "text", "analyzer": "ik_smart"}
    }
  }
}');
(可选)使用脚本批量导入

如需从外部数据源批量导入大量表的DDL,请单击查看并参考以下Python脚本。

从关系型数据库批量导入DDLPython脚本示例

此处以selectDB为例,为您展示如何批量导入至PolarDB for AIschema_meta表中。

  1. 安装依赖库:pip install pymysql

  2. 将以下代码保存在您的业务环境中。

    selectdb_data_transfer.py

    from mysql_connector import MysqlConnector
    
    import json
    
    def parse_ddl_from_json(file_name: str) -> list[str]:
        ddl_statements = []
        with open(file_name, 'r', encoding='utf-8') as f:
            # 使用 json.load() 来解析 JSON 文件内容为一个 Python 对象
            data = json.load(f)
    
            # 遍历列表中的每一个字典
            for table_info in data:
                # 从字典中获取 'Create Table' 键对应的值,即 DDL 语句
                create_table_sql = table_info.get('Create Table')
                if create_table_sql:
                    ddl_statements.append(create_table_sql)
    
        return ddl_statements
    
    def insert_into_schema_meta(db_name, ddls: list[str], connector: MysqlConnector):
        """
        通过connector连接polardb
        """
        insert_sql = """insert into schema_meta (db_name, create_table_statement) values (%s, %s)"""
        for i, ddl in enumerate(ddls):
            rows = connector.execute_update(insert_sql, (db_name, ddl))
            if rows == 0:
                print('insert failed: ', i)
                break
    
    def show_index(connector: MysqlConnector, db_name):
        table_names = connector.execute_query(
            f"select table_name FROM INFORMATION_SCHEMA.TABLES WHERE table_schema = '{db_name}'")
        table_ddls = []
        for result in table_names:
            # 请根据关系性数据库类型,修改返回的参数值。例如,PolarDB应为TABLE_NAME。
            table_name = result['table_name']
            table_ddl = connector.execute_query(f"show create table {table_name}")[0]
            table_ddls.append(table_ddl)
        json.dump(table_ddls, open('table_ddls.json', 'w'), indent=4)
    
    
    if __name__ == '__main__':
        selectdb_host = "<SelectDB连接地址>"
        selectdb_user = "<SelectDB用户名>"
        selectdb_password = "<SelectDB密码>"
        selectdb_database = "<SelectDB数据库>"
        selectdb_port = <SelectDB端口>
    
        polar_host = "<PolarDB连接地址>"
        polar_user = "<PolarDB用户名>"
        polar_password = "<PolarDB密码>"
        polar_database = "<PolarDB数据库>"
        polar_port = <PolarDB端口>
    
        # # 获取所有表的DDL
        # selectdb_connector = MysqlConnector(selectdb_host, selectdb_port, selectdb_user, selectdb_password, selectdb_database)
        # show_index(selectdb_connector, selectdb_database)
        
        # # 解析.json, 获得DDL语句
        # results = parse_ddl_from_json('table_ddls.json')
        # 
        # # 连接数据库
        # polar_mysql_connector = MysqlConnector(polar_host, polar_port, polar_user, polar_password, polar_database)
        # # 批量导入至PolarDB中
        # insert_into_schema_meta(polar_database, results, polar_mysql_connector)

    mysql_connector.py

    import pymysql
    from pymysql import Error
    from pymysql.cursors import DictCursor
    
    class MysqlConnector:
        def __init__(self, host, port, user, password, database, charset='utf8mb4'):
            """
            初始化数据库连接
            :param host: MySQL 主机地址
            :param user: 用户名
            :param password: 密码
            :param database: 数据库名
            :param port: 端口号,默认3306
            :param charset: 字符集,默认 utf8mb4
            """
            self.connection = pymysql.connect(
                host=host,
                user=user,
                password=password,
                database=database,
                port=port,
                charset=charset,
                cursorclass=DictCursor  # 结果以字典形式返回
            )
    
        def execute_query(self, sql, params=None):
            """
            执行 SELECT 查询语句
            :param sql: SQL 语句
            :param params: 参数字典或元组
            :return: 查询结果列表(每个元素为字典)
            """
            try:
                with self.connection.cursor() as cursor:
                    cursor.execute(sql, params)
                    result = cursor.fetchall()
                    return result
            except Error as e:
                print(f"[ERROR] 查询执行失败: {e}")
                return None
    
        def execute_update(self, sql, params=None):
            """
            执行 UPDATE / INSERT / DELETE 等语句
            :param sql: SQL 语句
            :param params: 参数字典或元组
            :return: 受影响的行数
            """
            try:
                with self.connection.cursor() as cursor:
                    cursor.execute(sql, params)
                self.connection.commit()
                return cursor.rowcount
            except Error as e:
                print(f"[ERROR] 更新执行失败: {e}")
                self.connection.rollback()
                return 0
        def execute_update_multirows(self, sql, params_list=None):
            """
            执行 UPDATE / INSERT / DELETE 等语句
            :param sql: SQL 语句
            :param params: 参数字典或元组
            :return: 受影响的行数
            """
            try:
                with self.connection.cursor() as cursor:
                    cursor.executemany(sql, params_list)
                self.connection.commit()
                return cursor.rowcount
            except Error as e:
                print(f"[ERROR] 更新执行失败: {e}")
                self.connection.rollback()
                return 0
    
        def begin_transaction(self):
            """ 开始事务 """
            try:
                self.connection.begin()
            except Error as e:
                print(f"[ERROR] 事务开始失败: {e}")
    
        def commit_transaction(self):
            """ 提交事务 """
            try:
                self.connection.commit()
            except Error as e:
                print(f"[ERROR] 事务提交失败: {e}")
    
        def rollback_transaction(self):
            """ 回滚事务 """
            try:
                self.connection.rollback()
            except Error as e:
                print(f"[ERROR] 事务回滚失败: {e}")
    
        def close(self):
            """ 关闭数据库连接 """
            if self.connection:
                self.connection.close()
    
        def __enter__(self):
            return self
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()
    
  3. 修改脚本中的SelectDBPolarDB的连接参数。

  4. 运行脚本。建议您首先获取所有表的DDL,以确认其正确性,然后再进行批量导入。

Elasticsearch批量导入索引定义的Python脚本示例

  1. 将以下代码保存为es_data_transfer.py

    import base64
    import http.client
    import json
    ## 集群host
    ClusterHost = "<elasticsearch-host>:<elasticsearch-port>"
    ## 集群用户名
    ClusterUserName = "<elasticsearch-username>"
    ## 集群密码
    ClusterPassword = "<elasticsearch-password>"
    DEFAULT_REPLICAS = 0
    
    def httpRequest(method, host, endpoint, params="", username="", password=""):
        conn = http.client.HTTPConnection(host)
        headers = {}
        if username:
            auth_str = f"{username}:{password}".encode('utf-8')
            base64string = base64.b64encode(auth_str).decode('utf-8')
            headers["Authorization"] = "Basic {}".format(base64string)
        if method == "GET":
            headers["Content-Type"] = "application/x-www-form-urlencoded"
            conn.request(method=method, url=endpoint, headers=headers)
        else:
            headers["Content-Type"] = "application/json"
            if params:
                conn.request(method=method, url=endpoint, body=params.encode('utf-8'), headers=headers)
            else:
                conn.request(method=method, url=endpoint, headers=headers)
        response = conn.getresponse()
        res = response.read().decode('utf-8')
        return res
    
    def httpGet(host, endpoint, username="", password=""):
        return httpRequest("GET", host, endpoint, "", username, password)
    def httpPost(host, endpoint, params, username="", password=""):
        return httpRequest("POST", host, endpoint, params, username, password)
    def httpPut(host, endpoint, params, username="", password=""):
        return httpRequest("PUT", host, endpoint, params, username, password)
    def getIndices(host, username="", password=""):
        endpoint = "/_cat/indices"
        indicesResult = httpGet(ClusterHost, endpoint, ClusterUserName, ClusterPassword)
        indicesList = indicesResult.split("\n")
        indexList = []
        for indices in indicesList:
            if (indices.find("open") > 0):
                indexList.append(indices.split()[2])
        return indexList
    def getSettings(index, host, username="", password=""):
        endpoint = "/" + index + "/_settings"
        indexSettings = httpGet(host, endpoint, username, password)
        settingsDict = json.loads(indexSettings)
        ## 分片数默认和源集群索引保持一致。
        number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
        ## 副本数默认为0。
        number_of_replicas = DEFAULT_REPLICAS
        newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
        return newSetting
    def getMapping(index, host, username="", password=""):
        endpoint = "/" + index + "/_mapping"
        indexMapping = httpGet(host, endpoint, username, password)
        mappingDict = json.loads(indexMapping)
        mappings = json.dumps(mappingDict[index]["mappings"])
        newMapping = "\"mappings\" : " + mappings
        return newMapping
    def createIndexStatement(IndexName):
        settingStr = getSettings(IndexName, ClusterHost, ClusterUserName, ClusterPassword)
        mappingStr = getMapping(IndexName, ClusterHost, ClusterUserName, ClusterPassword)
        createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
        return createstatement
    def createIndex(IndexName, newIndexName=""):
        if (newIndexName == "") :
            newIndexName = IndexName
        createstatement = createIndexStatement(IndexName)
        print ("PUT /" + newIndexName + "\n" + createstatement)
    
    indexList = getIndices(ClusterHost, ClusterUserName, ClusterPassword)
    systemIndex = []
    for index in indexList:
        if (index.startswith(".")):
            systemIndex.append(index)
        else :
            createIndex(index, index)
    # # 取消注释可检查被自动识别和省略的系统索引
    # if (len(systemIndex) > 0) :
    #     for index in systemIndex:
    #         print (index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")
  2. 修改脚本中的Elasticsearch的连接参数。

  3. 运行脚本,将导出Elasticsearch中所有索引的创建语句,示例如下:

    PUT /product_info
    {
    "settings": {"number_of_shards": 5, "number_of_replicas": 0},
    "mappings" : {"properties": {"annual_rate": {"type": "keyword"}, "describe": {"type": "text", "analyzer": "ik_smart"}, "productName": {"type": "text", "analyzer": "ik_smart"}}}
    }
    PUT /user_info
    {
    "settings": {"number_of_shards": 5, "number_of_replicas": 0},
    "mappings" : {"properties": {"address": {"type": "text", "analyzer": "ik_smart"}, "describe": {"type": "text", "analyzer": "ik_smart"}, "userName": {"type": "text", "analyzer": "ik_smart"}}}
    }
  4. 将其保存并插入到PolarDB for AIschema_meta表中。

步骤三:构建检索索引

  1. 删除schema_index表,然后启动异步任务,从schema_meta表解析DDL并构建向量索引。

    重要

    以下操作将删除schema_index表。如果之前通过其他方式构建了索引,这些数据将会丢失,因此请谨慎操作。此操作仅用于演示示例效果。

    -- 1. 删除索引表
    /*polar4ai*/DROP TABLE schema_index;
    -- 2. 创建用于存储向量索引的表
    /*polar4ai*/CREATE TABLE schema_index(
      id integer, 
      table_name varchar, 
      table_comment text_ik_max_word,
      table_ddl text_ik_max_word, 
      column_names text_ik_max_word, 
      column_comments text_ik_max_word, 
      sample_values text_ik_max_word, 
      vecs vector_768,
      ext text_ik_max_word, 
      database_service text_ik_max_word, 
      PRIMARY key (id)
    );
    -- 3.启动异步任务,从 schema_meta 构建索引
    /*polar4ai*/SELECT * FROM PREDICT (
    MODEL _polar4ai_text2vec, SELECT ''
    ) WITH (mode='async', resource='schema_meta', data_service = 'ES') 
    INTO schema_index;
    说明

    data_service = 'ES':在处理Elasticsearch的索引定义时,必须显式指定此参数。

  2. 执行成功后,命令会返回一个任务ID(例如b3b8fd1e-b886-11f0-9f89-97664bebacb7)。记录此ID,用于后续查询任务状态。

步骤四:验证任务状态

使用上一步获取的task_id查询索引构建任务的进度和结果。当状态变为finish时,表示索引构建完成。

/*polar4ai*/SHOW TASK `此处替换为上一步获取的task_id`;

步骤五:使用Data-Agent进行查询

索引构建完成后,即可通过自然语言到SQL语言转义(基于大语言模型的NL2SQL)功能,使用自然语言进行数据查询。

/*polar4ai*/SELECT * FROM PREDICT (MODEL _polar4ai_nl2sql, select '短期高收益产品') WITH (basic_index_name='schema_index');

常见问题

验证任务状态(SHOW TASK)结果显示fail应如何处理?

任务失败通常由元数据格式错误或权限问题导致。检查以下项目:

  1. schema_info模式ext字段的JSON格式是否正确,以及db_type是否一致?

  2. schema_meta模式:DDL或索引定义的语法是否正确,是否遵循CREATEPUT开头的规则?

  3. 检查执行PREDICT的用户是否具有源表schema_info/schema_meta的读取权限和目标表schema_index的写入权限。

NL2SQL的转换结果不准确,如何优化?

结果的准确性依赖元数据质量。可尝试以下优化方法:

  1. 提供有效注释:在table_commentcolumn_comment中描述表和列的业务含义、枚举值说明等。

  2. 提供示例值:在sample_values中提供有代表性的数据,帮助模型理解列内容的格式和范围。

  3. 明确外键关系:在schema_info模式中正确设置is_foreignext字段,此操作对多表关联查询至关重要。