SMTP多线程发信示例之Python

重要

风险提示:"用于生产环境之前请务必先做好测试。"

Python中的多线程是一种并发编程模型,它允许多个线程(即轻量级进程)在一个程序中同时执行。多线程可以提高程序的响应性和效率,特别是在处理I/O操作、网络请求或者其他耗时操作时。

Python中的多线程库

Python提供了几种多线程的支持库:

1. threading模块:这是Python中最常用的多线程库,提供了创建和管理线程的基本功能。

2. concurrent.futures模块:提供了更高层次的抽象,使得编写并发代码更简单。

使用threading模块创建线程

下面是一个简单的示例,展示如何使用threading模块创建和管理线程:

import threading
import time


def worker(num):
    """线程函数"""
    print(f'Thread {num}: starting')
    time.sleep(2)
    print(f'Thread {num}: finishing')


def main():
    threads = []
    for i in range(5):
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)
        t.start()

    # 等待所有线程完成
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()

使用concurrent.futures简化多线程编程

concurrent.futures模块提供了一个更高级别的接口来执行异步任务:

from concurrent.futures import ThreadPoolExecutor
import time


def worker(num):
    print(f'Thread {num}: starting')
    time.sleep(2)
    print(f'Thread {num}: finishing')
    return num


def main():
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(worker, i) for i in range(5)]
        for future in futures:
            print(f'Result: {future.result()}')


if __name__ == '__main__':
    main()

发送邮件

单线程发送示例

SMTP 之 Python3.6 及以上调用示例

多线程发送示例

下述代码在Python 3.11.9进行的测试。

服务器地址请根据控制台配置的对应区域选择,请参考SMTP 服务地址

# -*- coding: utf-8 -*-
import queue
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
from email.utils import make_msgid, formatdate


class SMTPConnectionPool:
    def __init__(self, max_connections, smtp_server, smtp_port, username, password):
        self.pool = queue.Queue(max_connections)
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password

        # 初始化连接池
        for _ in range(max_connections):
            connection = self.create_connection()
            self.pool.put(connection)

    def create_connection(self):
        conn = smtplib.SMTP_SSL(smtp_server, smtp_port)
        conn.login(username, password)
        return conn

    def get_connection(self):
        return self.pool.get()

    def return_connection(self, conn):
        self.pool.put(conn)

    def close_all_connections(self):
        while not self.pool.empty():
            conn = self.pool.get()
            conn.quit()


def send_email(pool, subject, body, to_email, retry_enabled, retry_count=0):
    """
    发送邮件函数,支持重试机制。

    参数:
    subject (str): 邮件主题。
    body (str): 邮件正文。
    to_email (list): 收件人邮箱列表。
    retry_enabled (bool): 是否启用重试机制。
    retry_count (int): 当前重试次数,默认为0。

    返回:
    bool:邮件发送成功或不再重试时返回True,需要重试时返回False。
    """
    conn = pool.get_connection()
    # 获取当前线程名称
    thread_name = threading.current_thread().name
    # 记录开始时间
    threading_start_time = time.time()
    v_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # 根据重试次数打印相应信息
    if retry_count == 0:
        print(f'{thread_name}================================={v_time} 首次发送!\n')
    else:
        print(f"{thread_name}================================={v_time} 开始第{retry_count}次重试!")

    # 发送邮件逻辑
    try:
        # 构建邮件内容和发送邮件的逻辑
        msg = MIMEMultipart('alternative')
        msg['Subject'] = Header(subject)
        msg['From'] = username
        msg['To'] = ",".join(to_email)
        msg['Message-id'] = make_msgid()
        msg['Date'] = formatdate()

        text_html = MIMEText(body, _subtype='html', _charset='UTF-8')
        msg.attach(text_html)

        # 开启DEBUG模式
        conn.set_debuglevel(1)
        conn.sendmail(username, to_email, msg.as_string())
        # client.quit()
        print(f'{thread_name}--发送成功!\n')
    except Exception as e:
        # 处理发送邮件时的异常
        print(f'{thread_name}--------------------------------Exception\n')
        print(f"{thread_name}--Exception SMTP server reply: {e}\n")

        # 判断是否需要重试的逻辑
        if retry_enabled is False:
            print(f"{thread_name}--不符合重试条件(重试开关关闭)!\n")
            return True
        if retry_count > retry_limit:
            print(f"{thread_name}--不符合重试条件(重试次数超过上限)!\n")
            return True

        for v_err in ruler_not_retry:
            if v_err in str(e):
                print(f"{thread_name}--不符合重试条件(包含不可重试类型)!\n")
                return True
        print(f"{thread_name}--加入重试队列,等待下次重试!\n")
        return False

    finally:
        # 记录结束时间并计算耗时
        threading_end_time = time.time()
        threading_elapsed_time = round(threading_end_time - threading_start_time, 2)
        print(f"Thread {thread_name} has finished sending email to {to_email},{threading_elapsed_time} seconds\n")
        print(f'{thread_name}--------------------------------end \n')
        # 将连接放回连接池
        pool.return_connection(conn)

    return True


def send_mail_with_fixed_rate(total_emails, to_email_list, subject, body, qps, max_workers, retry_enabled):
    """
    均匀发送邮件函数。

    根据总邮件数、收件人列表、主题、正文、每秒查询率(QPS)、最大工作线程数和是否启用重试来发送邮件。
    该函数通过控制邮件发送的间隔和并行处理来实现邮件的均匀发送。

    参数:
    total_emails (int): 总邮件数。
    to_email_list (list): 收件人邮箱列表。
    subject (str): 邮件主题。
    body (str): 邮件正文。
    qps (float): 每秒查询率,用于控制发送频率。
    max_workers (int): 最大工作线程数。
    retry_enabled (bool): 是否启用重试机制。
    """
    # 计算邮件发送间隔,使发信更均匀
    if total_emails == 0:
        print("No emails to send.")
        return
    # 记录开始时间
    start_time = time.time()
    interval = round(1 / qps, 2)
    retry_queue = []  # 重试队列

    print(f'max_workers:{max_workers}')
    print(f'retry_enabled:{retry_enabled}')
    print(f'total_emails:{total_emails}')
    print(f'qps:{qps}')
    pool = SMTPConnectionPool(max_connections=5, smtp_server=smtp_server, smtp_port=465,
                              username=username, password=password)
    # 使用线程池来并行处理邮件发送任务
    with ThreadPoolExecutor(max_workers=max_workers) as executor:  # 可以根据需要调整最大线程数
        futures = []
        batch_map = {}  # 用于存储 Future 和对应的批次

        # 计算总批次数
        num_batches = (total_emails + email_batch_size - 1) // email_batch_size
        print(f'num_batches:{num_batches}')
        print(f'email_batch_size:{email_batch_size}')

        for batch_idx in range(num_batches):
            start_index = batch_idx * email_batch_size
            end_index = min(start_index + email_batch_size, total_emails)
            current_batch = to_email_list[start_index:end_index]

            if current_batch:
                future = executor.submit(send_email, pool, subject, body, current_batch, retry_enabled, 0)
                futures.append(future)  # 储存未来对象
                batch_map[future] = current_batch  # 储存未来对象和对应的批次

                # 控制发送频率
                time.sleep(interval)
                # print(f'等待 {interval} 秒')

        # 等待所有任务完成
        for future in as_completed(futures):
            batch = batch_map[future]
            if not future.result():  # 如果发送时返回False
                retry_queue.append((batch, 1))  # 添加到重试队列,并记录重试次数

        # 重试逻辑
        while retry_queue and retry_enabled:
            new_retry_queue = []
            for batch, retry_count in retry_queue:
                future = executor.submit(send_email, pool, subject, body, batch, retry_enabled, retry_count)
                if not future.result():
                    new_retry_count = retry_count + 1
                    if new_retry_count < (retry_limit + 1):
                        new_retry_queue.append((batch, new_retry_count))
                    else:
                        print(f"Maximum retry attempts have been reached, give up retry: {batch}")
            print(f"Retry queue after this round: {new_retry_queue}")
            retry_queue = new_retry_queue

    # 记录结束时间
    end_time = time.time()
    # 计算耗时
    elapsed_time = round(end_time - start_time, 2)
    print(f"Total time taken: {elapsed_time} seconds")


retry_enabled = False  # True/False, True开启重试, 大多数情况不建议重试,重试是整组重试(如共120人,分2次请求,单次请求60人,重试会按60人重试)。
retry_limit = 2  # 最多重试次数,开启重试后生效
# 截取一些关键特征,含有该特征的不进行重试
ruler_not_retry = ["too frequency", 'getaddrinfo failed', 'Authentication failure', 'Invalid rcptto']

# 示例参数
qps = 3  # 自定义的QPS,1表示每秒请求1次,不要超过27,约5000次/180秒,预热发信不宜太高,避免收信方拒绝
max_workers = 200  # 最大线程数,用于保护服务器,避免过度占用系统资源
email_batch_size = 1  # 每次最多处理的收件人数量60,设置多人会都显示在邮件上

smtp_server = 'smtpdm.aliyun.com'
username = 'test@t1.example.com'  # username,通过控制台创建的发信地址
password = 'xxxxxxxxx'  # password,通过控制台创建的SMTP密码
smtp_port = 465
subject = '自定义主题'
body = '<h1>自定义内容</h1>'
pool = SMTPConnectionPool(max_connections=5, smtp_server=smtp_server, smtp_port=smtp_port,
                          username=username, password=password)
# 模拟数据
total_emails = 10  # 需要发送的总邮件数,真实发信需要替换为真实发件人数量,如len(to_email_list)
to_email_list = [f'recipient{i}@example.com' for i in range(0, total_emails)]  # 生成1000个示例收件人,真实发信需要传入真实收件人列表

# 真实发信
# to_email_list = ['a@example.com', 'b@example.com', 'c@example.com']  # 传入真实收件人列表
# total_emails = len(to_email_list)  # 需要发送的总邮件数,真实发信需要替换为真实发件人数量,如len(to_email_list)

# 调用发送
send_mail_with_fixed_rate(total_emails, to_email_list, subject, body, qps, max_workers, retry_enabled)

# 关闭所有连接
pool.close_all_connections()

send_email 函数

功能:发送邮件,并支持重试机制。

参数

pool:SMTP连接池。

subject:邮件主题。

body:邮件正文。

to_email:收件人邮箱列表。

retry_enabled:是否启用重试机制。

retry_count:当前重试次数,默认为0。

返回值:邮件发送成功或不再重试时返回True,需要重试时返回False。

send_mail_with_fixed_rate 函数

功能:均匀发送邮件,根据总邮件数、收件人列表、主题、正文、每秒查询率(QPS)、最大工作线程数和是否启用重试来发送邮件。

参数

total_emails:总邮件数。

to_email_list:收件人邮箱列表。

subject:邮件主题。

body:邮件正文。

qps:每秒查询率,用于控制发送频率。

max_workers:最大工作线程数。

retry_enabled:是否启用重试机制。

image

最佳实践

retry_enabled重试机制默认为False关闭,按需设置QPS默认为 3,建议不要超过27,约5000次/180秒,预热发信QPS不宜太高,避免收信方拒绝,email_batch_size单次请求人数默认为1,设置多人会都显示在同一封邮件上。

代码综述

该段代码主要用于批量发送电子邮件,并实现了邮件发送的重试机制。主要功能包括:

邮件发送:通过 send_email 函数实现邮件的构建与发送,支持重试机制。

邮件均匀发送:通过 send_mail_with_fixed_rate 函数实现邮件的均匀发送,控制发送频率和并发度,避免对邮件服务器造成过大压力。

重试机制:当邮件发送失败时,可以根据配置决定是否进行重试,以及重试的最大次数。

三种不重试的情况:

  • 不符合重试条件(重试开关关闭)!

  • 不符合重试条件(重试次数超过上限)!

  • 不符合重试条件(包含不可重试类型)!

日志定位:线程名称{thread_name}收件人地址{to_email}用于匹配单次请求的日志。

注意事项

SMTP服务器配置:确保服务器地址,账号,密码正确无误。

收件人列表格式:to_email_list 应为一个列表,每个元素为一个收件人的邮箱地址。如果直接使用逗号分隔的字符串,可能会导致邮件发送失败。

并发控制:max_workers 参数控制了并发线程的数量,应根据服务器性能和SMTP服务器限制合理设置。

发送频率控制:QPS参数用于控制每秒发送的邮件数量,过高的值可能导致SMTP服务器拒绝服务。

重试机制:retry_enabled 和 retry_limit 参数决定了是否启用重试及最大重试次数,需谨慎设置,避免不必要的重复发送。

常见问题

邮件发送失败:

检查SMTP服务器配置是否正确。

确认收件人列表格式是否正确。

检查网络连接是否正常。

查看SMTP服务器的日志,确认是否有具体的错误信息。

邮件发送速度过慢:

调整QPS参数,适当增加每秒发送的邮件数量。

检查 max_workers 参数,确保并发度足够高。

重试机制失效:

确认 retry_enabled 参数是否为 True。

检查 retry_limit 参数是否设置合理。

确认 ruler_not_retry 列表中的错误信息是否匹配实际的异常情况。

日志输出过多:

调整日志输出级别,减少不必要的调试信息。

可以将日志输出到文件,而不是控制台,以便于查看和管理。

代码优化建议:

日志记录:可以使用 Python 的 logging 模块替代 print 语句,以便更好地管理和控制日志输出。

配置管理:将配置参数(如 smtp_server, username, password 等)提取到配置文件中,便于管理和维护。

异常处理:增加更多的异常捕获和处理逻辑,提高程序的健壮性和容错能力。

性能优化:考虑使用异步编程模型(如 asyncio)进一步提升邮件发送的效率。