Import DataFrame data by using SQLAlchemy

This topic describes how to use SQLAlchemy to import Python DataFrame data into AnalyticDB for MySQL.

Prerequisites

Usage notes

When you use SQLAlchemy to create a table in AnalyticDB for MySQL, you must set the checkfirst parameter to False (for example, metadata.create_all(engine, checkfirst=False)). Otherwise, an error is reported.
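
A minimal sketch of this requirement is shown below. The connection string and the example_table definition are placeholder values for illustration only; replace them with your actual configuration.

    from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String

    # Placeholder connection information; replace with your actual values
    engine = create_engine("mysql+pymysql://user:password****@amv-t4ny9j5****.ads.aliyuncs.com:3306/demo_db")
    metadata = MetaData()
    # example_table is a placeholder table definition
    Table("example_table", metadata, Column("id", Integer, primary_key=True), Column("name", String(255)))
    # checkfirst=False is required when creating tables in AnalyticDB for MySQL through SQLAlchemy
    metadata.create_all(engine, checkfirst=False)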

Preparations

  1. Go to the SQL development page.

    • Enterprise Edition, Basic Edition, or Data Lakehouse Edition

      1. Log on to the AnalyticDB for MySQL console. In the upper-left corner, select the region where the cluster resides. In the left-side navigation pane, click Clusters, and then click the ID of the target cluster.

      2. In the left-side navigation pane, choose Job Development > SQL Development.

      3. In the SQLConsole window, select the XIHE engine and an interactive resource group.

    • Data Warehouse Edition: Connect to the cluster.

  2. Create a database.

    CREATE DATABASE demo_db;

Procedure

This example demonstrates how to import data into an AnalyticDB for MySQL internal table. It covers automatically generating dynamic random data, constructing static data, creating a database connection, defining the table schema, and writing data.

  1. Run the following commands to install the Python dependencies.

    pip install pandas
    pip install pymysql
    pip install sqlalchemy
    pip install pytz
  2. Replace the configuration parameters in the following code with your own values and run the code to import the DataFrame data into an AnalyticDB for MySQL internal table.

    import pandas as pd
    from sqlalchemy import create_engine, MetaData, Table, Column, String, Integer, Float, DateTime, inspect
    import pytz
    import datetime
    import random
    
    
    # Define a function that dynamically generates random book data
    def generate_random_book_data(num_records=4):
        """
        Dynamically generate random book data.
        Parameters:
            num_records (int): The number of records to generate. Default: 4.
        Returns:
            pd.DataFrame: A pandas DataFrame containing random book data.
        """
        book_titles = [
            "The Great Gatsby",
            "To Kill a Mockingbird",
            "1984",
            "Pride and Prejudice",
            "Moby Dick",
            "War and Peace",
            "The Catcher in the Rye",
            "Brave New World",
            "The Hobbit",
            "Crime and Punishment"
        ]
        timezones = ["America/New_York", "America/Chicago", "Europe/London", "Europe/Dublin", "Asia/Tokyo"]
        min_publication_year = 1700
        max_publication_year = 2023
        min_pages = 50
        max_pages = 1000
        current_year = datetime.datetime.now().year
    
        records = []
        for i in range(num_records):
            book_title = random.choice(book_titles)
            publication_year = random.randint(min_publication_year, max_publication_year)
            pages = random.randint(min_pages, max_pages)
    
            timezone = random.choice(timezones)
            publication_date_local = datetime.datetime(
                publication_year,
                random.randint(1, 12),
                random.randint(1, 28),
                random.randint(0, 23),
                random.randint(0, 59),
                random.randint(0, 59)
            )
            publication_date_utc = pytz.timezone(timezone).localize(publication_date_local).astimezone(pytz.utc)
    
            ebook_min_year = publication_year + 1
            ebook_max_year = min(current_year + 10, publication_year + 50)
            if ebook_min_year > ebook_max_year:
                ebook_release_year = ebook_min_year
            else:
                ebook_release_year = random.randint(ebook_min_year, ebook_max_year)
    
            ebook_release = datetime.datetime(
                ebook_release_year,
                random.randint(1, 12),
                random.randint(1, 28),
                random.randint(0, 23),
                random.randint(0, 59),
                random.randint(0, 59)
            )
            record = {
                "book_title": book_title,
                "publication_year": publication_year,
                "pages": pages,
                "publication_date": publication_date_utc,
                "ebook_release": ebook_release,
            }
            records.append(record)
    
        dataframe = pd.DataFrame(records)
        dataframe.index = [f"B{random.randint(1000, 9999)}" for _ in range(num_records)]
        dataframe.index.name = "book_id"
        return dataframe.reset_index()
    
    if __name__ == '__main__':
    
        # Configure the AnalyticDB for MySQL connection information
        # Database account of the AnalyticDB for MySQL cluster
        username = "user"
        # Password of the database account
        password = "password****"
        # Endpoint of the AnalyticDB for MySQL cluster
        host = "amv-t4ny9j5****.ads.aliyuncs.com"
        # Port number of the AnalyticDB for MySQL cluster. Default: 3306
        port = 3306
        # Name of the AnalyticDB for MySQL database
        database = "demo_db"
        # Name of the table. If it does not exist, it is created automatically with book_id as the primary key
        table_name = "test_dataframe_insert_table"
        connection_string = f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}"
    
        # Create the SQLAlchemy engine
        try:
            engine = create_engine(connection_string)
            print("The connection with ADB MySQL was created successfully!")
        except Exception as e:
            print(f"Failed to create a connection with ADB MySQL: {e}")
            exit()
    
        # Construct static sample data
        records_fixed = [
            {
                "book_title": "The Great Gatsby",
                "publication_year": 1925,
                "pages": 218,
                "publication_date": pytz.timezone("America/New_York")
                .localize(datetime.datetime(1925, 4, 10, 9, 0, 0))
                .astimezone(pytz.utc),
                "ebook_release": datetime.datetime(2000, 6, 15, 10, 0, 0),
            },
            {
                "book_title": "To Kill a Mockingbird",
                "publication_year": 1960,
                "pages": 281,
                "publication_date": pytz.timezone("America/Chicago")
                .localize(datetime.datetime(1960, 7, 11, 15, 30, 0))
                .astimezone(pytz.utc),
                "ebook_release": datetime.datetime(2002, 8, 20, 12, 0, 0),
            },
            {
                "book_title": "1984",
                "publication_year": 1949,
                "pages": 328,
                "publication_date": pytz.timezone("Europe/London")
                .localize(datetime.datetime(1949, 6, 8, 11, 0, 0))
                .astimezone(pytz.utc),
                "ebook_release": datetime.datetime(1998, 5, 1, 9, 0, 0),
            },
            {
                "book_title": "Pride and Prejudice",
                "publication_year": 1813,
                "pages": 432,
                "publication_date": pytz.timezone("Europe/Dublin")
                .localize(datetime.datetime(1813, 1, 28, 14, 0, 0))
                .astimezone(pytz.utc),
                "ebook_release": datetime.datetime(2003, 12, 10, 14, 30, 0),
            },
        ]
        # Convert to a pandas DataFrame
        dataframe_fixed = pd.DataFrame(
            records_fixed,
            columns=[
                "book_title",
                "publication_year",
                "pages",
                "publication_date",
                "ebook_release",
            ],
            index=pd.Index(["B1001", "B1002", "B1003", "B1004"], name="book_id"),
        )
    
        # Turn the index into a regular DataFrame column
        dataframe_fixed.reset_index(inplace=True)
        # Define the table schema
        metadata = MetaData()
        books_table = Table(
            table_name,
            metadata,
            Column("book_id", String(10), primary_key=True),
            Column("book_title", String(255)),
            Column("publication_year", Integer),
            Column("pages", Integer),
            Column("publication_date", DateTime),
            Column("ebook_release", DateTime)
        )
    
        try:
            inspector = inspect(engine)
            existing_tables = inspector.get_table_names()
            if table_name in existing_tables:
                print(f"Table {table_name} already exists, no need to recreate!")
            else:
                metadata.create_all(engine, checkfirst=False)
                print(f"Table {table_name} created successfully!")
        except Exception as e:
            print(f"Error checking or creating the table: {e}")
    
        # Write the static data
        try:
            dataframe_fixed.to_sql(name=table_name, con=engine, if_exists="append", index=False)
            print(f"Static data is successfully written to the table {table_name}")
        except Exception as e:
            print(f"Error writing static data: {e}")
    
        # Generate dynamic data and write it to the table
        dataframe_dynamic = generate_random_book_data(100000)
        try:
            dataframe_dynamic.to_sql(name=table_name, con=engine, if_exists="append", index=False)
            print(f"Dynamic data is successfully written to the table {table_name}")
        except Exception as e:
            print(f"Error writing dynamic data:{e}")
    
        finally:
            # Close the connection
            engine.dispose()
    

    Parameter descriptions:

    • username: The database account of the AnalyticDB for MySQL cluster.

    • password: The password of the database account.

    • host: The endpoint of the AnalyticDB for MySQL cluster.

    • port: The port number of the AnalyticDB for MySQL cluster. Default value: 3306.

    • database: The name of the AnalyticDB for MySQL database.

    • table_name: The name of the AnalyticDB for MySQL table. If the table does not exist, it is automatically created, and book_id is defined as the primary key.

    Note

    For more information about the complete workflow for importing DataFrame data, see the SQLAlchemy official documentation.
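
    When you import a large DataFrame, such as the 100,000 dynamically generated records in the preceding code, you can write the data in batches. The following is a minimal sketch that could replace the dynamic data write step in the preceding script; it reuses the engine, table_name, and dataframe_dynamic variables defined there and relies on the chunksize and method parameters of pandas to_sql. The batch size of 1,000 rows is an illustrative value, not a recommendation from this topic.

    # Sketch: batch insert for a large DataFrame, reusing variables from the script above.
    # chunksize controls how many rows are sent per batch; method="multi" packs
    # multiple rows into a single INSERT statement. 1000 is an illustrative value.
    dataframe_dynamic.to_sql(
        name=table_name,
        con=engine,
        if_exists="append",
        index=False,
        chunksize=1000,
        method="multi",
    )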

  3. In AnalyticDB for MySQL, query the data to check whether the import was successful.

    SELECT * FROM `demo_db`.`test_dataframe_insert_table` LIMIT 20;

    The following result is returned:

    +---------+------------------------+------------------+-------+---------------------+---------------------+
    | book_id | book_title             | publication_year | pages | publication_date    | ebook_release       |
    +---------+------------------------+------------------+-------+---------------------+---------------------+
    | B1752   | 1984                   |             1861 |   493 | 1861-11-08 00:40:32 | 1906-08-15 08:12:56 |
    | B5420   | To Kill a Mockingbird  |             1856 |   175 | 1856-02-26 00:50:57 | 1894-02-12 22:42:06 |
    | B4033   | Crime and Punishment   |             1886 |   675 | 1886-11-20 08:23:29 | 1912-02-14 04:32:22 |
    | B3597   | Moby Dick              |             1719 |   779 | 1719-10-05 02:15:13 | 1734-02-21 06:53:00 |
    | B4708   | To Kill a Mockingbird  |             1811 |   951 | 1811-05-08 06:31:02 | 1830-11-03 22:23:42 |
    | B9493   | The Great Gatsby       |             1976 |   843 | 1976-10-23 10:57:41 | 1986-06-11 22:18:59 |
    | B8066   | Crime and Punishment   |             1750 |   689 | 1750-07-01 12:45:52 | 1766-10-06 00:35:15 |
    | B6823   | 1984                   |             1921 |   462 | 1921-04-12 18:48:16 | 1953-09-14 03:30:51 |
    | B3875   | War and Peace          |             1927 |   893 | 1927-03-12 19:08:28 | 1950-12-14 06:05:49 |
    | B3528   | The Hobbit             |             1713 |   774 | 1713-11-05 13:59:30 | 1734-08-12 13:58:10 |
    | B7133   | Moby Dick              |             1835 |   347 | 1835-07-02 15:03:11 | 1838-03-27 22:29:10 |
    | B1140   | War and Peace          |             1837 |   496 | 1837-11-27 00:27:22 | 1881-08-10 13:37:35 |
    | B3244   | The Catcher in the Rye |             1842 |   739 | 1842-08-02 13:17:45 | 1876-10-07 07:35:05 |
    | B4300   | Brave New World        |             1764 |   654 | 1764-06-26 08:56:17 | 1775-03-15 20:47:09 |
    | B4958   | The Catcher in the Rye |             1867 |   321 | 1867-12-21 16:55:14 | 1907-09-15 14:54:07 |
    | B6145   | Brave New World        |             1844 |   872 | 1844-07-17 20:51:49 | 1866-08-06 14:17:23 |
    | B5163   | The Catcher in the Rye |             1798 |   952 | 1798-05-16 02:48:41 | 1813-09-12 15:07:39 |
    | B1310   | 1984                   |             1975 |   588 | 1975-02-27 07:03:15 | 1985-03-09 23:27:13 |
    | B1857   | Crime and Punishment   |             1990 |    59 | 1990-02-28 07:02:26 | 2015-10-19 08:21:48 |
    | B3155   | To Kill a Mockingbird  |             1928 |   236 | 1928-05-17 13:04:56 | 1958-11-01 16:19:38 |
    +---------+------------------------+------------------+-------+---------------------+---------------------+
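
    You can also run the same check from Python. The following is a minimal sketch based on pandas.read_sql; the connection string uses the same placeholder values as the import script and must be replaced with your actual configuration.

    import pandas as pd
    from sqlalchemy import create_engine

    # Reuse the same connection information as in the import script (placeholder values)
    engine = create_engine(
        "mysql+pymysql://user:password****@amv-t4ny9j5****.ads.aliyuncs.com:3306/demo_db"
    )

    # Read back the first 20 rows to confirm that the data was written
    result = pd.read_sql("SELECT * FROM test_dataframe_insert_table LIMIT 20", con=engine)
    print(result)
    engine.dispose()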