通过SQLAlchemy导入DataFrame数据
本文主要介绍如何使用SQLAlchemy将Python DataFrame的数据导入至AnalyticDB for MySQL。
前提条件
已安装Python环境,且Python版本为3.7及以上版本。
已创建AnalyticDB for MySQL集群的数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号、授予普通账号相应的库表权限并将RAM用户绑定到普通账号上。
已将运行Python应用程序的服务器IP地址添加至AnalyticDB for MySQL集群的白名单中。
注意事项
通过SQLAlchemy在AnalyticDB for MySQL中创建表时,需要配置checkfirst=False
参数(例如 metadata.create_all(engine,checkfirst=False)
),否则会报错。
准备工作
进入SQL开发页面。
企业版、基础版或湖仓版:
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。
在左侧导航栏,单击。
在SQLConsole窗口,选择XIHE引擎和Interactive型资源组。
数仓版:连接集群。
创建数据库。
CREATE DATABASE demo_db;
操作步骤
本示例涵盖了自动生成动态随机数据、构造静态数据、创建数据库连接、创建表结构和写入数据的过程,为您演示如何将数据导入AnalyticDB for MySQL的内表。
执行以下代码,下载Python依赖。
pip install pandas pip install pymysql
替换以下代码中的相关配置参数并执行,将DataFrame数据导入至AnalyticDB for MySQL内表。
import pandas as pd from sqlalchemy import create_engine, MetaData, Table, Column, String, Integer, Float, DateTime, inspect import pytz import datetime import random # 定义一个函数用于动态生成随机数据 def generate_random_book_data(num_records=4): """ Define a function to dynamically generate random book data parameter: num_records (int): The number of records to generate, default is 4. redsponse: pd.DataFrame: A Pandas DataFrame containing random book data. """ book_titles = [ "The Great Gatsby", "To Kill a Mockingbird", "1984", "Pride and Prejudice", "Moby Dick", "War and Peace", "The Catcher in the Rye", "Brave New World", "The Hobbit", "Crime and Punishment" ] timezones = ["America/New_York", "America/Chicago", "Europe/London", "Europe/Dublin", "Asia/Tokyo"] min_publication_year = 1700 max_publication_year = 2023 min_pages = 50 max_pages = 1000 current_year = datetime.datetime.now().year records = [] for i in range(num_records): book_title = random.choice(book_titles) publication_year = random.randint(min_publication_year, max_publication_year) pages = random.randint(min_pages, max_pages) timezone = random.choice(timezones) publication_date_local = datetime.datetime( publication_year, random.randint(1, 12), random.randint(1, 28), random.randint(0, 23), random.randint(0, 59), random.randint(0, 59) ) publication_date_utc = pytz.timezone(timezone).localize(publication_date_local).astimezone(pytz.utc) ebook_min_year = publication_year + 1 ebook_max_year = min(current_year + 10, publication_year + 50) if ebook_min_year > ebook_max_year: ebook_release_year = ebook_min_year else: ebook_release_year = random.randint(ebook_min_year, ebook_max_year) ebook_release = datetime.datetime( ebook_release_year, random.randint(1, 12), random.randint(1, 28), random.randint(0, 23), random.randint(0, 59), random.randint(0, 59) ) record = { "book_title": book_title, "publication_year": publication_year, "pages": pages, "publication_date": publication_date_utc, "ebook_release": ebook_release, } records.append(record) dataframe = pd.DataFrame(records) dataframe.index = [f"B{random.randint(1000, 9999)}" for _ in range(num_records)] dataframe.index.name = "book_id" return dataframe.reset_index() if __name__ == '__main__': # 配置AnalyticDB for MySQL连接信息 # AnalyticDB for MySQL数据库账号 username = "user" # AnalyticDB for MySQL数据库账号的密码 password = "password****" # AnalyticDB for MySQL的连接地址 host = "amv-t4ny9j5****.ads.aliyuncs.com" # AnalyticDB for MySQL的端口号,默认为3306 port = 3306 # AnalyticDB for MySQL的数据库名称 database = "demo_db" # AnalyticDB for MySQL的表名称,若该表不存在,则自动创建,且会在创建时定义book_id为主键 table_name = "test_dataframe_insert_table" connection_string = f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}" # 创建SQLAlchemy连接 try: engine = create_engine(connection_string) print("The connection with ADB MySQL was created successfully!") except Exception as e: print(f"Failed to create connection with ADB MySQL!: {e}") exit() # 构建示例数据 records_fixed = [ { "book_title": "The Great Gatsby", "publication_year": 1925, "pages": 218, "publication_date": pytz.timezone("America/New_York") .localize(datetime.datetime(1925, 4, 10, 9, 0, 0)) .astimezone(pytz.utc), "ebook_release": datetime.datetime(2000, 6, 15, 10, 0, 0), }, { "book_title": "To Kill a Mockingbird", "publication_year": 1960, "pages": 281, "publication_date": pytz.timezone("America/Chicago") .localize(datetime.datetime(1960, 7, 11, 15, 30, 0)) .astimezone(pytz.utc), "ebook_release": datetime.datetime(2002, 8, 20, 12, 0, 0), }, { "book_title": "1984", "publication_year": 1949, "pages": 328, "publication_date": pytz.timezone("Europe/London") .localize(datetime.datetime(1949, 6, 8, 11, 0, 0)) .astimezone(pytz.utc), "ebook_release": datetime.datetime(1998, 5, 1, 9, 0, 0), }, { "book_title": "Pride and Prejudice", "publication_year": 1813, "pages": 432, "publication_date": pytz.timezone("Europe/Dublin") .localize(datetime.datetime(1813, 1, 28, 14, 0, 0)) .astimezone(pytz.utc), "ebook_release": datetime.datetime(2003, 12, 10, 14, 30, 0), }, ] # 转换为Pandas DataFrame dataframe_fixed = pd.DataFrame( records_fixed, columns=[ "book_title", "publication_year", "pages", "publication_date", "ebook_release", ], index=pd.Index(["B1001", "B1002", "B1003", "B1004"], name="book_id"), ) # 将索引列添加为DataFrame的列 dataframe_fixed.reset_index(inplace=True) # 定义表结构 metadata = MetaData() books_table = Table( table_name, metadata, Column("book_id", String(10), primary_key=True), Column("book_title", String(255)), Column("publication_year", Integer), Column("pages", Integer), Column("publication_date", DateTime), Column("ebook_release", DateTime) ) try: inspector = inspect(engine) existing_tables = inspector.get_table_names() if table_name in existing_tables: print(f"Table {table_name} already exists, no need to recreate!") else: metadata.create_all(engine,checkfirst=False) print(f"Table {table_name} Successfully created!") except Exception as e: print(f"Error of checking or creating table:{e}") # 写入静态数据 try: dataframe_fixed.to_sql(name=table_name, con=engine, if_exists="append", index=False) print(f"Static data is successfully written to the table {table_name}") except Exception as e: print(f"Error of writing static data::{e}") # 生成动态数据并写入表 dataframe_dynamic = generate_random_book_data(100000) try: dataframe_dynamic.to_sql(name=table_name, con=engine, if_exists="append", index=False) print(f"Dynamic data is successfully written to the table {table_name}") except Exception as e: print(f"Error writing dynamic data:{e}") finally: # 关闭连接 engine.dispose()
参数说明:
参数
说明
username
AnalyticDB for MySQL数据库账号。
password
AnalyticDB for MySQL数据库账号的密码。
host
AnalyticDB for MySQL的连接地址。
port
AnalyticDB for MySQL的端口号,默认为3306。
database
AnalyticDB for MySQL的数据库名称。
table_name
AnalyticDB for MySQL的表名称。若该表不存在,则自动创建,且会在创建时定义book_id为主键。
说明如果您想了解DataFrame数据的完整导入流程,请参见SQLAlchemy官方文档。
在AnalyticDB for MySQL中查询数据是否导入成功。
SELECT * FROM `demo_db`.`test_dataframe_insert_table` LIMIT 20;
返回结果如下:
+---------+------------------------+------------------+-------+---------------------+---------------------+ | book_id | book_title | publication_year | pages | publication_date | ebook_release | +---------+------------------------+------------------+-------+---------------------+---------------------+ | B1752 | 1984 | 1861 | 493 | 1861-11-08 00:40:32 | 1906-08-15 08:12:56 | | B5420 | To Kill a Mockingbird | 1856 | 175 | 1856-02-26 00:50:57 | 1894-02-12 22:42:06 | | B4033 | Crime and Punishment | 1886 | 675 | 1886-11-20 08:23:29 | 1912-02-14 04:32:22 | | B3597 | Moby Dick | 1719 | 779 | 1719-10-05 02:15:13 | 1734-02-21 06:53:00 | | B4708 | To Kill a Mockingbird | 1811 | 951 | 1811-05-08 06:31:02 | 1830-11-03 22:23:42 | | B9493 | The Great Gatsby | 1976 | 843 | 1976-10-23 10:57:41 | 1986-06-11 22:18:59 | | B8066 | Crime and Punishment | 1750 | 689 | 1750-07-01 12:45:52 | 1766-10-06 00:35:15 | | B6823 | 1984 | 1921 | 462 | 1921-04-12 18:48:16 | 1953-09-14 03:30:51 | | B3875 | War and Peace | 1927 | 893 | 1927-03-12 19:08:28 | 1950-12-14 06:05:49 | | B3528 | The Hobbit | 1713 | 774 | 1713-11-05 13:59:30 | 1734-08-12 13:58:10 | | B7133 | Moby Dick | 1835 | 347 | 1835-07-02 15:03:11 | 1838-03-27 22:29:10 | | B1140 | War and Peace | 1837 | 496 | 1837-11-27 00:27:22 | 1881-08-10 13:37:35 | | B3244 | The Catcher in the Rye | 1842 | 739 | 1842-08-02 13:17:45 | 1876-10-07 07:35:05 | | B4300 | Brave New World | 1764 | 654 | 1764-06-26 08:56:17 | 1775-03-15 20:47:09 | | B4958 | The Catcher in the Rye | 1867 | 321 | 1867-12-21 16:55:14 | 1907-09-15 14:54:07 | | B6145 | Brave New World | 1844 | 872 | 1844-07-17 20:51:49 | 1866-08-06 14:17:23 | | B5163 | The Catcher in the Rye | 1798 | 952 | 1798-05-16 02:48:41 | 1813-09-12 15:07:39 | | B1310 | 1984 | 1975 | 588 | 1975-02-27 07:03:15 | 1985-03-09 23:27:13 | | B1857 | Crime and Punishment | 1990 | 59 | 1990-02-28 07:02:26 | 2015-10-19 08:21:48 | | B3155 | To Kill a Mockingbird | 1928 | 236 | 1928-05-17 13:04:56 | 1958-11-01 16:19:38 | +---------+------------------------+------------------+-------+---------------------+---------------------+