PyMySQL 连接 OceanBase 数据库示例程序

本文将介绍如何使用 PyMySQL 库和 OceanBase 数据库构建一个应用程序,实现基本的数据库操作,包括创建表、插入数据、查询数据和删除表等。

image.png点击下载 python-pymysql 示例工程

前提条件

  • 您已安装 Python 3.x 和 pip。

  • 您已安装 OceanBase 数据库并且创建了 MySQL 模式租户。

操作步骤

  1. 检查Pythonpip的版本。

  2. 安装 PyMySQL 库。

  3. 获取 OceanBase 数据库连接信息。

  4. 修改config.py文件中的数据库连接信息。

  5. 运行main.py文件。

步骤一:检查 Python 和 pip 的版本

打开命令提示符或 PowerShell 终端,运行python --versionpip --version命令,确保 Python 和 pip 正常安装。

示例如下:

PS C:\Windows\system32> python --version
Python 3.7.0
PS C:\Windows\system32> pip --version
pip 22.3.1 from d:\python\python37\lib\site-packages\pip (python 3.7)

步骤二:安装 PyMySQL 库

打开命令提示符或 PowerShell 终端,运行以下命令,安装 PyMySQL 库。

  1. 运行以下命令,进入代码的python-pymysql目录。

    示例如下:

    cd python-pymysql
  2. 运行以下命令,安装项目所需的 Python 库。

    示例如下:

    pip install -r requirements.txt
说明

您也可以直接打开命令提示符或 PowerShell 终端运行pip install pymysql命令安装 PyMySQL 库。

步骤三:获取 OceanBase 数据库连接信息

联系 OceanBase 数据库部署人员或者管理员获取相应的数据库连接串。

obclient -h$host -P$port -u$user_name -p$password -D$database_name

参数说明:

  • $hostOceanBase 数据库连接的域名。

  • $port:OceanBase 数据库连接端口,MySQL 模式租户默认是 3306。

  • $database_name需要访问的数据库名称。

  • $user_name:租户的连接账号。

  • $password提供账户密码。

更多连接串的信息,请参见 获取连接参数

示例如下:

obclient -hxxx.xxx.xxx.xxx -P3306 -utest_user001 -p****** -Dtest

步骤四:修改 config.py 文件中的数据库连接信息

根据 步骤三:获取 OceanBase 数据库连接信息 中的信息修改项目文件python-pymysql/config.py中的数据库连接信息。

  1. 进入python-pymysql项目文件夹。

  2. 修改config.py文件中的数据库连接信息。

    • 在 Windows 环境下,使用文本编辑器打开config.py文件,修改文件中的数据库连接信息,确保与实际情况相符。

    • 在 Linux 环境下,可以使用vi config.py或者vim config.py命令编辑config.py文件,修改文件中的数据库连接信息,确保与实际情况相符。

    config.py文件中的数据库连接信息示例如下:

    DB_CONFIG = {
        'host': '10.10.10.1',
        'port': 3306,
        'user': 'test_user001',
        'password': '******',
        'database': 'test',
        'charset': 'utf8mb4'
    }

步骤五:运行 main.py 文件

打开命令提示符或 PowerShell 终端,运行main.py文件,查询数据并输出结果。

  1. 进入python-pymysql项目目录下。

    示例如下:

    cd D:\demo\demo\python-pymysql
  2. 运行main.py文件。

    示例如下:

    python main.py

    返回结果如下:

    2023-11-10 16:56:48,021 - INFO - Start executing the script
    2023-11-10 16:56:48,021 - INFO - Start creating the table
    2023-11-10 16:56:48,281 - INFO - Table creation successful
    2023-11-10 16:56:48,281 - INFO - Start inserting data
    2023-11-10 16:56:48,540 - INFO - Data insertion successful
    (1, 'John', 20)
    (2, 'Lucy', 25)
    (3, 'Tom', 30)
    2023-11-10 16:56:48,737 - INFO - Start dropping the table
    2023-11-10 16:56:48,999 - INFO - Table dropped successfully
    2023-11-10 16:56:48,999 - INFO - Script execution completed

项目代码介绍

点击 python-pymysql 下载项目代码,是一个名称为python-pymysql.zip的压缩包。

解压后,得到一个名为python-pymysql的文件夹。目录结构如下所示:

python-pymysql
├── config.py
├── test_sql.py
├── main.py
└── requirements.txt

文件说明:

  • config.py:用于存储数据库连接信息。

  • test_sql.py:用于存储 SQL 语句。

  • main.py:主程序入口,用于执行数据库的基本操作,包括创建表、插入数据、查询数据和删除表。

  • requirements.txt:用于存储项目所需要的 Python 包及其版本信息。

    说明

    本文获取的代码中只列出了 PyMySQL 库的版本要求,可以通过sudo pip install -r requirements.txt命令安装,执行以上命令后,会自动安装所需的库。

config.py 代码介绍

本文获取的config.py文件中

的代码定义了数据库连接信息。数据库连接信息主要包括以下几个部分:

指定连接数据库的 IP 地址、端口号、用户名、密码、数据库名称和字符集。

代码如下:

DB_CONFIG = {
    'host': '$host',
    'port': $port,
    'user': '$user_name',
    'password': '$password',
    'database': '$database_name',
    'charset': 'utf8mb4'
}

参数解释:

  • hostOceanBase 数据库连接的域名。

  • port:提供 OceanBase 数据库连接端口,默认端口是 3306。

  • user提供租户的连接账户。

  • password提供账户密码。

  • database:需要连接的数据库名。

  • charset:连接数据库时使用的字符集。

重要

这里的参数值是根据具体环境和数据库设置而定的,需要根据实际情况进行修改。

test_sql.py 代码介绍

本文获取的test_sql.py文件中的代码定义了数据库操作的 SQL 语句,包括创建表、插入数据、查询数据和删除表,这些 SQL 语句可以通过 PyMySQL 连接数据库后执行,以实现相应的功能。

该文件中的代码主要包括以下几个部分:

  1. 创建表的 SQL 语句。

    定义创建表test_pymysql的 SQL 语句,在表中定义了三个字段,分别为idnameage,其中id是自增长的主键。

    代码如下:

    CREATE_TABLE_SQL = '''
    CREATE TABLE test_pymysql (
    id INT(11) NOT NULL AUTO_INCREMENT,
    name VARCHAR(128) NOT NULL,
    age INT(11) NOT NULL,
    PRIMARY KEY (id)
    )
    '''
  2. 插入数据的 SQL 语句。

    定义一个向test_pymysql表插入数据的 SQL 语句,插入的数据将包含三条,每条数据包含两个字段:nameage。每个字段的值将在执行 SQL 语句时,通过占位符%s的形式被传入。

    代码如下:

    INSERT_DATA_SQL = '''
    INSERT INTO test_pymysql (name, age) VALUES 
    (%s, %s),
    (%s, %s),
    (%s, %s)
    '''
  3. 查询数据的 SQL 语句。

    定义查询数据的 SQL 语句,从test_pymysql表中查询所有数据。

    代码如下:

    SELECT_DATA_SQL = '''
    SELECT * FROM test_pymysql
    '''
  4. 删除表的 SQL 语句。

    定义删除表的 SQL 语句,将test_pymysql表删除。

    代码如下:

    DROP_TABLE_SQL = '''
    DROP TABLE test_pymysql
    '''

main.py 代码介绍

本文获取的main.py文件中的代码通过调用pymysql模块,连接 MySQL 数据库,以及调用logging模块,输出日志信息,实现创建表、插入数据、查询数据和删除表的操作。

main.py文件中的代码主要包括以下几个部分:

  1. 导入所需的模块。

    1. 导入logging模块。

    2. 导入pymysql模块。

    3. 导入config.py模块,其中定义了数据库连接信息。

    4. 导入test_sql.py模块,其中定义了数据库操作的 SQL 语句。

    代码如下:

    import logging
    import pymysql
    from config import DB_CONFIG
    from test_sql import CREATE_TABLE_SQL, INSERT_DATA_SQL, SELECT_DATA_SQL, DROP_TABLE_SQL
  2. 设置日志记录的级别和格式,并输出一条 INFO 级别的日志信息,表示开始执行脚本。

    代码如下:

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logging.info('Start executing the script')
  3. 定义创建表的函数。

    定义一个名为create_table()的函数,输出一条 INFO 级别的日志信息,表示开始创建表。使用with语句管理数据库连接和游标对象的生命周期,保证了数据库连接和游标对象的安全关闭,避免了内存泄漏等问题。执行创建表的 SQL 语句,提交事务并输出日志信息,或回滚事务并输出错误日志信息。

    代码如下:

    def create_table():
        logging.info('Start creating the table')
        with pymysql.connect(**DB_CONFIG) as conn:
            with conn.cursor() as cursor:
                try:
                    cursor.execute(CREATE_TABLE_SQL)
                    conn.commit()
                    logging.info('Table creation successful')
                except Exception as e:
                    conn.rollback()
                    logging.error('Table creation failed, Reason:%s' % e)
  4. 定义插入数据的函数。

    定义一个名为insert_data()的函数,输出一条 INFO 级别的日志信息,表示开始插入数据。使用with语句管理数据库连接和游标对象的生命周期,保证了数据库连接和游标对象的安全关闭,避免了内存泄漏等问题。执行插入数据的 SQL 语句,提交事务并输出日志信息,或回滚事务并输出错误日志信息。

    代码如下:

    def insert_data():
        logging.info('Start inserting data')
        with pymysql.connect(**DB_CONFIG) as conn:
            with conn.cursor() as cursor:
                try:
                    data = [('John', 20), ('Lucy', 25), ('Tom', 30)]
                    flattened_data = [d for item in data for d in item]
                    cursor.executemany(INSERT_DATA_SQL, [flattened_data])
                    conn.commit()
                    logging.info('Data insertion successful')
                except Exception as e:
                    conn.rollback()
                    logging.error('Data insertion failed, Reason:%s' % e)
  5. 定义查询数据的函数。

    定义一个名为select_data()的函数,用于从数据库中查询数据。使用with语句管理数据库连接和游标对象的生命周期,保证了数据库连接和游标对象的安全关闭,避免了内存泄漏等问题。使用execute()方法执行SELECT_DATA_SQL定义的 SQL 语句查询数据。使用fetchall()方法获取查询结果,并通过for循环逐行输出结果。

    代码如下:

    def select_data():
        with pymysql.connect(**DB_CONFIG) as conn:
            with conn.cursor() as cursor:
                cursor.execute(SELECT_DATA_SQL)
                result = cursor.fetchall()
                for row in result:
                    print(row)
  6. 定义删除表的函数。

    定义一个名为drop_table()函数,该函数使用了预先定义的数据库连接信息(DB_CONFIG)和删除表的 SQL 语句(DROP_TABLE_SQL)。函数会执行删除表的操作,并打印相应的日志信息表示操作成功或失败。如果删除表操作失败,会打印出错误信息。

    代码如下:

    def drop_table():
        logging.info('Start dropping the table')
        with pymysql.connect(**DB_CONFIG) as conn:
            with conn.cursor() as cursor:
                try:
                    cursor.execute(DROP_TABLE_SQL)
                    conn.commit()
                    logging.info('Table dropped successfully')
                except Exception as e:
                    conn.rollback()
                    logging.error('Table drop failed, Reason:%s' % e)
  7. 定义程序的入口点,主要用于执行数据库操作的函数。

    首先判断当前模块是否作为主程序运行,如果是,则执行以下操作:

    1. 调用create_table()函数,创建数据库表。

    2. 调用insert_data()函数,向表中插入数据。

    3. 调用select_data()函数,从表中查询数据。

    4. 调用drop_table()函数,删除数据库表。

    代码如下:

    if __name__ == '__main__':
        create_table()
        insert_data()
        select_data()
        drop_table()
  8. 输出一条 INFO 级别的日志信息,表示脚本执行完成。

    代码如下:

    logging.info('Script execution completed')

完整的代码展示

config.py

# Database Connection
DB_CONFIG = {
    'host': '$host',
    'port': $port,
    'user': '$user_name',
    'password': '$password',
    'database': '$database_name',
    'charset': 'utf8mb4'
}

test_sql.py

Create table
CREATE_TABLE_SQL = '''
CREATE TABLE test_pymysql (
  id INT(11) NOT NULL AUTO_INCREMENT,
  name VARCHAR(128) NOT NULL,
  age INT(11) NOT NULL,
  PRIMARY KEY (id)
  )
'''

Insert data
INSERT_DATA_SQL = '''
INSERT INTO test_pymysql (name, age) VALUES 
(%s, %s),
(%s, %s),
(%s, %s)
'''

Query data
SELECT_DATA_SQL = '''
SELECT * FROM test_pymysql
'''

Delete table
DROP_TABLE_SQL = '''
DROP TABLE test_pymysql
'''

main.py

import logging
import pymysql
from config import DB_CONFIG
from test_sql import CREATE_TABLE_SQL, INSERT_DATA_SQL, SELECT_DATA_SQL, DROP_TABLE_SQL

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info('Start executing the script')

Create table 
def create_table():
    logging.info('Start creating the table')
    with pymysql.connect(**DB_CONFIG) as conn:
        with conn.cursor() as cursor:
            try:
                cursor.execute(CREATE_TABLE_SQL)
                conn.commit()
                logging.info('Table creation successful')
            except Exception as e:
                conn.rollback()
                logging.error('Table creation failed, Reason:%s' % e)

Insert data
def insert_data():
    logging.info('Start inserting data')
    with pymysql.connect(**DB_CONFIG) as conn:
        with conn.cursor() as cursor:
            try:
                data = [('John', 20), ('Lucy', 25), ('Tom', 30)]
                flattened_data = [d for item in data for d in item]
                cursor.executemany(INSERT_DATA_SQL, [flattened_data])
                conn.commit()
                logging.info('Data insertion successful')
            except Exception as e:
                conn.rollback()
                logging.error('Data insertion failed, Reason:%s' % e)

Query data
def select_data():
    with pymysql.connect(**DB_CONFIG) as conn:
        with conn.cursor() as cursor:
            cursor.execute(SELECT_DATA_SQL)
            result = cursor.fetchall()
            for row in result:
                print(row)

Delete table
def drop_table():
    logging.info('Start dropping the table')
    with pymysql.connect(**DB_CONFIG) as conn:
        with conn.cursor() as cursor:
            try:
                cursor.execute(DROP_TABLE_SQL)
                conn.commit()
                logging.info('Table dropped successfully')
            except Exception as e:
                conn.rollback()
                logging.error('Table drop failed, Reason:%s' % e)

if __name__ == '__main__':
    create_table()
    insert_data()
    select_data()
    drop_table()

logging.info('Script execution completed')

相关文档