使用Python连接池DBUtils连接数据库

如果您的应用侧主要使用Python语言,且数据库连接创建频繁(例如短连接场景)或连接数量较大(大于MySQL数据库的连接数限制),您可以使用Python连接池DBUtils连接数据库,降低连接建立频率以减少数据库主线程的开销。

前提条件

  • 应用服务器已安装Python环境且Python版本在3.7至3.12。

  • 已将服务器IP地址添加至RDS实例白名单中,详情请参见设置IP白名单

    说明

    若您的应用程序部署在阿里云ECS服务器上,且ECS与RDS已实现内网互通(ECS与RDS实例的地域、VPC均相同),则无需设置IP白名单。

准备工作

  • 安装DBUtils:您需要通过以下命令在服务器上安装Python连接池DBUtils,本教程中DBUtils版本为3.1.0,更多版本及信息请参见DBUtils

    pip install DBUtils==3.1.0
  • 安装PyMySQL:您需要通过以下命令安装PyMySQL,以支持服务器连接数据库。

    pip install pymysql

使用DBUtils连接数据库

  1. 导入相关模块:在使用DBUtils连接数据库前,您需要通过以下代码导入相关模块。

    from dbutils.pooled_db import PooledDB
    import importlib
    import pymysql
  2. 构建DBUtilsDemo:为了方便您后续的方法调用与管理,建议您新建一个DBUtilsDemo类,在__init__方法中设置连接池参数,并向该类中添加_connect_close方法以便从连接池中获取与归还数据库连接。

    class DBUtilsDemo:
        def __init__(self, url, user, password, database):
            db_config = {
                "host": url,
                "port": 3306,
                "user": user,
                "db": database,
                "password": password,
                "charset": "utf8",
                "connect_timeout": 3,
                "read_timeout": 10,
                "write_timeout": 10
            }
            '''
              DBUtils连接池常见参数的详细说明请参见后文,以下为参数简要说明:
                mincached: 池中闲置连接的初始数量(0表示启动时不会创建连接)。
                maxcached: 池中闲置连接的最大数量(0或None表示连接池大小没有限制)。
                maxshared: 共享连接的最大数量(0或None表示所有连接都是专用的)。
                maxconnections: 一般允许的最大连接数量(0或None表示连接不受限制)。
                blocking: 在超过最大连接数量时的行为。
                maxusage: 单个连接的最大重用次数(0或None表示无限重用)。
            '''
            self.pooled = PooledDB(pymysql, maxcached=20, maxshared=0, maxconnections=100, maxusage=20, **db_config)
    
        # 从连接池获取连接
        def _connect(self):
            try:
                r = self.pooled.connection()
                return r
            except Exception as e:
                print("Failed to connect:" + str(e))
    
        # 归还连接到连接池
        def _close(self, conn, stmt):
            if stmt:
                stmt.close()
            if conn:
                conn.close()
  3. __main__函数中初始化DBUtilsDemo类并连接数据库:DBUtilsDemo类构造完成后,您可以在__main__函数中初始化该类,并获取数据库连接。

    if __name__ == '__main__':
        # RDS MySQL实例连接地址,需要根据服务器情况选择RDS内网/外网连接地址。
        url = 'rm-bp**************.mysql.rds.aliyuncs.com'
        # 用户名,根据实际情况做替换。
        user = 'dbuser'
        # 密码,根据实际情况做替换。
        password = '****'
        # 连接的数据库名称,根据实际情况做替换。
        database = 'dbtest'
    
        # 获取连接池对象
        poolUtils = DBUtilsDemo(url, user, password, database)

连接池常见参数配置

在使用DBUtils连接数据库时,建议您根据下述内容为连接池设置合适的参数(在__init__方法调用的PooledDB函数中设置),使数据库的运行更稳定高效。

重要

为了最大程度地避免潜在的风险和不确定性,在将新的参数值用于生产环境前,建议您至少进行一轮完整的功能测试和性能测试,以确保系统稳定性和可靠性。

推荐配置的参数

推荐您在使用DBUtils连接池时设置以下参数,降低数据库运行风险。

参数名

含义

默认值

推荐值

说明

maxcached

池中闲置连接的最大数量。

0

20

  • 0或None表示连接池大小没有限制。

  • 适当配置该参数可以预留一定数量的连接,快速处理突发的数据库请求。

maxusage

单个连接的最大重用次数。

None

10~20

  • 0或None表示无限重用,当达到连接的最大重用次数时,连接将自动重置(关闭并重新打开)。

  • 适当配置该参数可以防止空闲连接一直存活占用资源。

  • 建议根据应用程序的实际需求以及数据库的处理能力决定该参数配置。

connect_timeout

连接到数据库的超时时间()。

10

3

  • 最小值1,最大值31536000

  • 建议设置1~10秒之间,取决于网络质量高低,以及应用端与服务端的距离。

read_timeout

从数据库读取的超时时间()。

None

10~60

  • 默认值None表示没有超时限制。

  • 该参数不宜设置过短。针对读取SQL执行时间明显过长而导致超时的问题,应当首先检查SQL或者数据库本身,而不是优先调整该参数。

write_timeout

写入数据库的超时时间()。

None

10~60

  • 默认值None表示没有超时限制。

  • 该参数不宜设置过短。针对写入SQL执行时间明显过长而导致超时的问题,应当首先检查SQL或者数据库本身,而不是优先调整该参数。

可选择配置的参数

使用DBUtils连接池时,您可以选择性地配置以下参数,提升数据库性能。

参数名

含义

默认值

推荐值

说明

maxconnections

连接池允许的最大连接数量。

0

100

  • 0或None表示连接不受限制。

  • 需要根据数据库的并发处理能力和负载情况来合理设置该值。

  • 通常情况下,建议将该值设置为数据库能够同时处理的最大连接数。

保持默认配置的参数

对于以下常见的连接池参数,您可以选择直接使用默认配置或根据自身需求调整参数。

参数名

含义

默认值

推荐值

说明

mincached

池中闲置连接的初始数量。

0

/

  • 0表示启动时不会创建连接。

  • 推荐默认配置,即启动时不创建连接,等需要使用时再创建。

maxshared

共享连接的最大数量。

0

/

  • 0或None表示所有连接都是专用的。

  • 当到达最大数量时,如果连接已经被声明为可共享,则保持可共享不变。

blocking

超过最大连接数量时的处理方法。

False

/

  • 设置为True时,会阻塞等待直到连接数减少。

  • 设置为False时,数据库会报告错误。

setsession

连接被创建时执行的 SQL 语句

None

/

该参数可以用来设置会话级别的参数或者执行其他初始化操作。

reset

连接被放回连接池时,是否需要重置连接状态。

True

/

  • 设置为True时,表示需要重置。

  • 设置为False时,表示不需要重置。

failures

数据库连接失败时的重连次数。

None

/

/

ping

连接池在空闲连接时执行的心跳检测操作。

1

/

该参数用来验证连接可用性。

后续数据库操作

您可以在DBUtilsDemo类中添加自定义方法处理读写请求,满足对数据库的操作需求。

读请求处理

1. 添加自定义方法

您需要在DBUtilsDemo类中添加自定义方法处理数据库读请求,本文以查询单条记录与查询多条记录为例。

  • 查询单条记录

    def select_row(self, sql):
        connection = self._connect()
        statement = None
        try:
            statement = connection.cursor()
            statement.execute(sql)
            row = statement.fetchone()
            return row
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)
  • 查询多条记录

    def select_rows(self, sql):
        connection = self._connect()
        statement = None
        try:
            statement = connection.cursor()
            statement.execute(sql)
            rows = statement.fetchall()
            return rows
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)

2. 调用自定义方法

您需要在__main__函数中调用该方法进行数据查询。

if __name__ == '__main__':
    # 配置连接参数并连接数据库
    url = 'rm-bp**************.mysql.rds.aliyuncs.com'
    user = 'dbuser'
    password = '****'
    database = 'dbtest'
    poolUtils = DBUtilsDemo(url, user, password, database)

    # 查询单条记录
    row = poolUtils.select_row("select * from tb where id = 'i001' limit 1")
    print(row)

    # 查询多条记录
    rows = poolUtils.select_rows("select * from tb")
    print(rows)

写请求处理

1. 添加自定义方法

您需要在DBUtilsDemo类中添加自定义方法处理数据库写请求(包括INSERTUPDATEDELETECREATE TABLE等命令),本文以带参数写请求与不带参数写请求为例。

  • 带参数写请求处理

    def upsert_data_prams(self, sql_upsert, params):
        connection = self._connect()
        statement = None
        try:
            statement = connection.cursor()
            statement.execute(sql_upsert, params)
            connection.commit()
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)
  • 不带参数写请求处理

    def upsert_data(self, sql_upsert):
        connection = self._connect()
        statement = None
        try:
            statement = connection.cursor()
            statement.execute(sql_upsert)
            connection.commit()
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)

2. 调用自定义方法

您需要在__main__函数中调用该方法执行SQL语句。

if __name__ == '__main__':
    # 配置连接参数并连接数据库
    url = 'rm-bp**************.mysql.rds.aliyuncs.com'
    user = 'dbuser'
    password = '****'
    database = 'dbtest'
    poolUtils = DBUtilsDemo(url, user, password, database)

    # 不带参数写请求处理
    poolUtils.upsert_data("insert into tb(id,name,address) values ('i001','n001','a001')")

    # 带参数写请求处理
    params = ['i002', 'n002', 'a002']
    poolUtils.upsert_data_prams("insert into tb(id,name,address) values (%s,%s,%s)", params)

相关文档