本文将介绍如何使用 PyMySQL 库和 OceanBase 数据库构建一个应用程序,实现基本的数据库操作,包括创建表、插入数据、查询数据和删除表等。
前提条件
您已安装 Python 3.x 和 pip。
您已安装 OceanBase 数据库并且创建了 MySQL 模式租户。
操作步骤
检查
Python
和pip
的版本。安装 PyMySQL 库。
获取 OceanBase 数据库连接信息。
修改
config.py
文件中的数据库连接信息。运行
main.py
文件。
步骤一:检查 Python 和 pip 的版本
打开命令提示符或 PowerShell 终端,运行python --version
和pip --version
命令,确保 Python 和 pip 正常安装。
示例如下:
PS C:\Windows\system32> python --version
Python 3.7.0
PS C:\Windows\system32> pip --version
pip 22.3.1 from d:\python\python37\lib\site-packages\pip (python 3.7)
步骤二:安装 PyMySQL 库
打开命令提示符或 PowerShell 终端,运行以下命令,安装 PyMySQL 库。
运行以下命令,进入代码的
python-pymysql
目录。示例如下:
cd python-pymysql
运行以下命令,安装项目所需的 Python 库。
示例如下:
pip install -r requirements.txt
您也可以直接打开命令提示符或 PowerShell 终端运行pip install pymysql
命令安装 PyMySQL 库。
步骤三:获取 OceanBase 数据库连接信息
联系 OceanBase 数据库部署人员或者管理员获取相应的数据库连接串。
obclient -h$host -P$port -u$user_name -p$password -D$database_name
参数说明:
$host
:OceanBase 数据库连接的域名。$port
:OceanBase 数据库连接端口,MySQL 模式租户默认是 3306。$database_name
:需要访问的数据库名称。$user_name
:租户的连接账号。$password
:提供账户密码。
更多连接串的信息,请参见 获取连接参数。
示例如下:
obclient -hxxx.xxx.xxx.xxx -P3306 -utest_user001 -p****** -Dtest
步骤四:修改 config.py 文件中的数据库连接信息
根据 步骤三:获取 OceanBase 数据库连接信息 中的信息修改项目文件python-pymysql/config.py
中的数据库连接信息。
进入
python-pymysql
项目文件夹。修改
config.py
文件中的数据库连接信息。在 Windows 环境下,使用文本编辑器打开
config.py
文件,修改文件中的数据库连接信息,确保与实际情况相符。在 Linux 环境下,可以使用
vi config.py
或者vim config.py
命令编辑config.py
文件,修改文件中的数据库连接信息,确保与实际情况相符。
config.py
文件中的数据库连接信息示例如下:DB_CONFIG = { 'host': '10.10.10.1', 'port': 3306, 'user': 'test_user001', 'password': '******', 'database': 'test', 'charset': 'utf8mb4' }
步骤五:运行 main.py 文件
打开命令提示符或 PowerShell 终端,运行main.py
文件,查询数据并输出结果。
进入
python-pymysql
项目目录下。示例如下:
cd D:\demo\demo\python-pymysql
运行
main.py
文件。示例如下:
python main.py
返回结果如下:
2023-11-10 16:56:48,021 - INFO - Start executing the script 2023-11-10 16:56:48,021 - INFO - Start creating the table 2023-11-10 16:56:48,281 - INFO - Table creation successful 2023-11-10 16:56:48,281 - INFO - Start inserting data 2023-11-10 16:56:48,540 - INFO - Data insertion successful (1, 'John', 20) (2, 'Lucy', 25) (3, 'Tom', 30) 2023-11-10 16:56:48,737 - INFO - Start dropping the table 2023-11-10 16:56:48,999 - INFO - Table dropped successfully 2023-11-10 16:56:48,999 - INFO - Script execution completed
项目代码介绍
点击 python-pymysql 下载项目代码,是一个名称为python-pymysql.zip
的压缩包。
解压后,得到一个名为python-pymysql
的文件夹。目录结构如下所示:
python-pymysql
├── config.py
├── test_sql.py
├── main.py
└── requirements.txt
文件说明:
config.py
:用于存储数据库连接信息。test_sql.py
:用于存储 SQL 语句。main.py
:主程序入口,用于执行数据库的基本操作,包括创建表、插入数据、查询数据和删除表。requirements.txt
:用于存储项目所需要的 Python 包及其版本信息。说明本文获取的代码中只列出了 PyMySQL 库的版本要求,可以通过
sudo pip install -r requirements.txt
命令安装,执行以上命令后,会自动安装所需的库。
config.py 代码介绍
本文获取的config.py
文件中
的代码定义了数据库连接信息。数据库连接信息主要包括以下几个部分:
指定连接数据库的 IP 地址、端口号、用户名、密码、数据库名称和字符集。
代码如下:
DB_CONFIG = {
'host': '$host',
'port': $port,
'user': '$user_name',
'password': '$password',
'database': '$database_name',
'charset': 'utf8mb4'
}
参数解释:
host
:OceanBase 数据库连接的域名。port
:提供 OceanBase 数据库连接端口,默认端口是 3306。user
:提供租户的连接账户。password
:提供账户密码。database
:需要连接的数据库名。charset
:连接数据库时使用的字符集。
这里的参数值是根据具体环境和数据库设置而定的,需要根据实际情况进行修改。
test_sql.py 代码介绍
本文获取的test_sql.py
文件中的代码定义了数据库操作的 SQL 语句,包括创建表、插入数据、查询数据和删除表,这些 SQL 语句可以通过 PyMySQL 连接数据库后执行,以实现相应的功能。
该文件中的代码主要包括以下几个部分:
创建表的 SQL 语句。
定义创建表
test_pymysql
的 SQL 语句,在表中定义了三个字段,分别为id
、name
和age
,其中id
是自增长的主键。代码如下:
CREATE_TABLE_SQL = ''' CREATE TABLE test_pymysql ( id INT(11) NOT NULL AUTO_INCREMENT, name VARCHAR(128) NOT NULL, age INT(11) NOT NULL, PRIMARY KEY (id) ) '''
插入数据的 SQL 语句。
定义一个向
test_pymysql
表插入数据的 SQL 语句,插入的数据将包含三条,每条数据包含两个字段:name
和age
。每个字段的值将在执行 SQL 语句时,通过占位符%s
的形式被传入。代码如下:
INSERT_DATA_SQL = ''' INSERT INTO test_pymysql (name, age) VALUES (%s, %s), (%s, %s), (%s, %s) '''
查询数据的 SQL 语句。
定义查询数据的 SQL 语句,从
test_pymysql
表中查询所有数据。代码如下:
SELECT_DATA_SQL = ''' SELECT * FROM test_pymysql '''
删除表的 SQL 语句。
定义删除表的 SQL 语句,将
test_pymysql
表删除。代码如下:
DROP_TABLE_SQL = ''' DROP TABLE test_pymysql '''
main.py 代码介绍
本文获取的main.py
文件中的代码通过调用pymysql
模块,连接 MySQL 数据库,以及调用logging
模块,输出日志信息,实现创建表、插入数据、查询数据和删除表的操作。
main.py
文件中的代码主要包括以下几个部分:
导入所需的模块。
导入
logging
模块。导入
pymysql
模块。导入
config.py
模块,其中定义了数据库连接信息。导入
test_sql.py
模块,其中定义了数据库操作的 SQL 语句。
代码如下:
import logging import pymysql from config import DB_CONFIG from test_sql import CREATE_TABLE_SQL, INSERT_DATA_SQL, SELECT_DATA_SQL, DROP_TABLE_SQL
设置日志记录的级别和格式,并输出一条 INFO 级别的日志信息,表示开始执行脚本。
代码如下:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.info('Start executing the script')
定义创建表的函数。
定义一个名为
create_table()
的函数,输出一条 INFO 级别的日志信息,表示开始创建表。使用with
语句管理数据库连接和游标对象的生命周期,保证了数据库连接和游标对象的安全关闭,避免了内存泄漏等问题。执行创建表的 SQL 语句,提交事务并输出日志信息,或回滚事务并输出错误日志信息。代码如下:
def create_table(): logging.info('Start creating the table') with pymysql.connect(**DB_CONFIG) as conn: with conn.cursor() as cursor: try: cursor.execute(CREATE_TABLE_SQL) conn.commit() logging.info('Table creation successful') except Exception as e: conn.rollback() logging.error('Table creation failed, Reason:%s' % e)
定义插入数据的函数。
定义一个名为
insert_data()
的函数,输出一条 INFO 级别的日志信息,表示开始插入数据。使用with
语句管理数据库连接和游标对象的生命周期,保证了数据库连接和游标对象的安全关闭,避免了内存泄漏等问题。执行插入数据的 SQL 语句,提交事务并输出日志信息,或回滚事务并输出错误日志信息。代码如下:
def insert_data(): logging.info('Start inserting data') with pymysql.connect(**DB_CONFIG) as conn: with conn.cursor() as cursor: try: data = [('John', 20), ('Lucy', 25), ('Tom', 30)] flattened_data = [d for item in data for d in item] cursor.executemany(INSERT_DATA_SQL, [flattened_data]) conn.commit() logging.info('Data insertion successful') except Exception as e: conn.rollback() logging.error('Data insertion failed, Reason:%s' % e)
定义查询数据的函数。
定义一个名为
select_data()
的函数,用于从数据库中查询数据。使用with
语句管理数据库连接和游标对象的生命周期,保证了数据库连接和游标对象的安全关闭,避免了内存泄漏等问题。使用execute()
方法执行SELECT_DATA_SQL
定义的 SQL 语句查询数据。使用fetchall()
方法获取查询结果,并通过for
循环逐行输出结果。代码如下:
def select_data(): with pymysql.connect(**DB_CONFIG) as conn: with conn.cursor() as cursor: cursor.execute(SELECT_DATA_SQL) result = cursor.fetchall() for row in result: print(row)
定义删除表的函数。
定义一个名为
drop_table()
函数,该函数使用了预先定义的数据库连接信息(DB_CONFIG
)和删除表的 SQL 语句(DROP_TABLE_SQL
)。函数会执行删除表的操作,并打印相应的日志信息表示操作成功或失败。如果删除表操作失败,会打印出错误信息。代码如下:
def drop_table(): logging.info('Start dropping the table') with pymysql.connect(**DB_CONFIG) as conn: with conn.cursor() as cursor: try: cursor.execute(DROP_TABLE_SQL) conn.commit() logging.info('Table dropped successfully') except Exception as e: conn.rollback() logging.error('Table drop failed, Reason:%s' % e)
定义程序的入口点,主要用于执行数据库操作的函数。
首先判断当前模块是否作为主程序运行,如果是,则执行以下操作:
调用
create_table()
函数,创建数据库表。调用
insert_data()
函数,向表中插入数据。调用
select_data()
函数,从表中查询数据。调用
drop_table()
函数,删除数据库表。
代码如下:
if __name__ == '__main__': create_table() insert_data() select_data() drop_table()
输出一条 INFO 级别的日志信息,表示脚本执行完成。
代码如下:
logging.info('Script execution completed')
完整的代码展示
config.py
# Database Connection
DB_CONFIG = {
'host': '$host',
'port': $port,
'user': '$user_name',
'password': '$password',
'database': '$database_name',
'charset': 'utf8mb4'
}
test_sql.py
Create table
CREATE_TABLE_SQL = '''
CREATE TABLE test_pymysql (
id INT(11) NOT NULL AUTO_INCREMENT,
name VARCHAR(128) NOT NULL,
age INT(11) NOT NULL,
PRIMARY KEY (id)
)
'''
Insert data
INSERT_DATA_SQL = '''
INSERT INTO test_pymysql (name, age) VALUES
(%s, %s),
(%s, %s),
(%s, %s)
'''
Query data
SELECT_DATA_SQL = '''
SELECT * FROM test_pymysql
'''
Delete table
DROP_TABLE_SQL = '''
DROP TABLE test_pymysql
'''
main.py
import logging
import pymysql
from config import DB_CONFIG
from test_sql import CREATE_TABLE_SQL, INSERT_DATA_SQL, SELECT_DATA_SQL, DROP_TABLE_SQL
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info('Start executing the script')
Create table
def create_table():
logging.info('Start creating the table')
with pymysql.connect(**DB_CONFIG) as conn:
with conn.cursor() as cursor:
try:
cursor.execute(CREATE_TABLE_SQL)
conn.commit()
logging.info('Table creation successful')
except Exception as e:
conn.rollback()
logging.error('Table creation failed, Reason:%s' % e)
Insert data
def insert_data():
logging.info('Start inserting data')
with pymysql.connect(**DB_CONFIG) as conn:
with conn.cursor() as cursor:
try:
data = [('John', 20), ('Lucy', 25), ('Tom', 30)]
flattened_data = [d for item in data for d in item]
cursor.executemany(INSERT_DATA_SQL, [flattened_data])
conn.commit()
logging.info('Data insertion successful')
except Exception as e:
conn.rollback()
logging.error('Data insertion failed, Reason:%s' % e)
Query data
def select_data():
with pymysql.connect(**DB_CONFIG) as conn:
with conn.cursor() as cursor:
cursor.execute(SELECT_DATA_SQL)
result = cursor.fetchall()
for row in result:
print(row)
Delete table
def drop_table():
logging.info('Start dropping the table')
with pymysql.connect(**DB_CONFIG) as conn:
with conn.cursor() as cursor:
try:
cursor.execute(DROP_TABLE_SQL)
conn.commit()
logging.info('Table dropped successfully')
except Exception as e:
conn.rollback()
logging.error('Table drop failed, Reason:%s' % e)
if __name__ == '__main__':
create_table()
insert_data()
select_data()
drop_table()
logging.info('Script execution completed')
相关文档
更多连接 OceanBase 数据库的信息,请参见 连接方式概述。