基于Python的应用开发(mysql-connector-python)

更新时间:
复制为 MD 格式

mysql-connector-pythonMySQL官方提供的Python连接器,不依赖C语言标准函数库,使用更便捷。本文介绍如何在Python中通过mysql-connector-python连接Lindorm宽表引擎。

前提条件

  • 已安装Python环境,且Python版本为3.8及以上版本。

  • 已开通MySQL协议兼容功能。如何开通,请参见开通MySQL协议兼容功能

  • 已将客户端IP添加至白名单,具体操作请参见设置白名单

操作步骤

  1. 安装8.0.11版本的mysql-connector-python。您也可以通过pip install mysql-connector-python==8.0.11语句直接通过pip进行安装。

  2. 创建连接并配置连接参数。

    connection = mysql.connector.connect(host='<MySQL兼容地址>', port=33060, user='<用户名>', passwd='<密码>', database='<数据库名>')
    cursor = connection.cursor(prepared=True)
    

    参数说明

    参数

    说明

    host

    Lindorm宽表引擎的Lindorm 宽表SQL地址,需去掉末尾的冒号及端口号:33060。如何获取,请参见查看连接地址

    重要

    如果应用部署在ECS,且ECSLindorm实例部署在同一专有网络,建议您通过专有网络访问Lindorm实例,否则请使用公网访问。通过公网连接Lindorm前需在控制台开通公网地址,开通方式请参见开通步骤

    port

    Lindorm宽表引擎MySQL协议的端口,固定为33060

    user

    如果您忘记用户密码,可以通过Lindorm宽表引擎的集群管理系统修改密码。具体操作,请参见修改用户密码

    passwd

    database

    需要连接的数据库名称。默认连接default数据库。

  3. 通过宽表SQL语法使用Lindorm宽表引擎。以创建表为例。

    sql_create_table = ("create table if not exists test_python(c1 integer, c2 integer, c3 varchar, primary key(c1))")
    print(sql_create_table)
    cursor.execute(sql_create_table)
    

代码示例

Python中使用mysql-connector-python连接Lindorm宽表引擎时,支持两种模式:

  • 直接连接模式:适用于单次操作或低频访问场景,每次操作需独立建立和关闭连接。

  • 连接池模式:适用于高频访问场景,通过复用连接提升性能并减少资源开销。

直接连接模式

示例代码如下:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import json

import mysql.connector

# 打开数据库连接。
# host是Lindorm宽表引擎MySQL协议的连接地址。
# port是Lindorm宽表引擎MySQL协议的端口,一般为33060。
# user是Lindorm宽表引擎的用户账号。
# passwd是Lindorm宽表引擎的用户账号对应的密码。
# database是Lindorm宽表引擎中的数据库名。


connection = mysql.connector.connect(host='ld-bp1hn6yq0yb34****-proxy-sql-lindorm-public.lindorm.rds.aliyuncs.com', port=33060,user='root', passwd='test',database='default')

# 创建cursor,注意prepared=True
cursor = connection.cursor(prepared=True)

# 创建表
sql_create_table = ("create table if not exists test_python(c1 integer, c2 integer, c3 varchar, primary key(c1))")
print(sql_create_table)
cursor.execute(sql_create_table)

# 插入数据
sql_upsert = "upsert into test_python(c1, c2, c3) values(?, ?, ?)"
print(sql_upsert)

# 执行单条插入语句
cursor.execute(sql_upsert, (1, 1, '1'))
cursor.execute(sql_upsert, (2, 2, json.dumps({"key": "value2"})))

# 执行 batch 写入, 一次写入 2 行数据
sql_upsert_batch = ("upsert into test_python(c1, c2, c3) values(?, ?, ?), (?, ?, ?)")
cursor.execute(sql_upsert_batch, (3, 3, '3' , 4, 4, json.dumps({"key": "value4"})))

# 删除数据
sql_delete = "delete from test_python where c1 = ?"
print(sql_delete)
cursor.execute(sql_delete, (3,))

# 修改数据
sql_update = "upsert into test_python(c1, c2, c3) values(?, ?, ?)"
print(sql_update)
cursor.execute(sql_update, (1, 2, '2'))

# 查询特定数据
sql_select = "select * from test_python where c1 = ?"
print(sql_select)
cursor.execute(sql_select, (4,))
rows = cursor.fetchall()
print(rows)

# 查询表中所有数据
sql_select_all = "select * from test_python"
print(sql_select_all)
cursor.execute(sql_select_all)
rows = cursor.fetchall()
print(rows)

# 关闭 cursor
cursor.close()

# 关闭连接
connection.close()
重要

如果写入的字符串中含有特殊字符(例如JSON字符串中的双引号),建议您通过prepared=True的方式写入数据(上述示例代码所示方式),避免写入Lindorm的数据被添加特殊的转义字符。

连接池模式

示例代码如下:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import json

import mysql.connector
from mysql.connector import pooling

# 创建连接池
# pool_name是连接池的名称
# pool_size是连接池的大小,即连接池中最多包含多少个连接,建议按照业务实际情况进行修改
# host是Lindorm宽表引擎MySQL协议的连接地址。
# port是Lindorm宽表引擎MySQL协议的端口,一般为33060。
# user是Lindorm宽表引擎的用户账号。
# passwd是Lindorm宽表引擎的用户账号对应的密码。
# database是Lindorm宽表引擎中的数据库名。
connection_pool = pooling.MySQLConnectionPool(
    pool_name="mypool",
    pool_size=20,
    host='11.166.XX.X',
    port=33060,
    user='root',
    password='root',
    database='default',
)

# 从连接池中获取连接
connection = connection_pool.get_connection()

# 创建 cursor,注意prepared=True
cursor = connection.cursor(prepared=True)

# 删除表
sql_drop_table = "drop table if exists test_python"
print(sql_drop_table)
cursor.execute(sql_drop_table)

# 创建表
sql_create_table = ("create table test_python(c1 integer, c2 integer, c3 varchar, primary key(c1))")
print(sql_create_table)
cursor.execute(sql_create_table)

# 执行单条插入语句
sql_upsert = "insert into test_python(c1, c2, c3) values(?, ?, ?)"
print(sql_upsert)
cursor.execute(sql_upsert, (1, 1, '1'))
cursor.execute(sql_upsert, (2, 2, '2'))


# 执行 batch 写入,一次写入 3 行数据
sql_upsert_batch = "insert into test_python(c1, c2, c3) values(?, ?, ?), (?, ?, ?), (?, ?, ?)"
cursor.execute(sql_upsert_batch, (3, 3, '3', 4, 4, '4', 5, 5, '5'))

# 删除数据
sql_delete = "delete from test_python where c1 = ?"
print(sql_delete)
cursor.execute(sql_delete, (3,))

# 修改数据
sql_update = "upsert into test_python(c1, c2, c3) values(?, ?, ?)"
print(sql_update)
cursor.execute(sql_update, (1, 2, '2'))

# 查询特定数据
sql_select = "select * from test_python where c1 = ?"
print(sql_select)
cursor.execute(sql_select, (4,))
rows = cursor.fetchall()
print(rows)

# 查询表中所有数据
sql_select_all = "select * from test_python"
print(sql_select_all)
cursor.execute(sql_select_all)
rows = cursor.fetchall()
print(rows)

# 关闭 cursor
cursor.close()

# 关闭连接,连接会返回到连接池中,而不是真正关闭
connection.close()