风险提示:"用于生产环境之前请务必先做好测试。"
Python中的多线程是一种并发编程模型,它允许多个线程(即轻量级进程)在一个程序中同时执行。多线程可以提高程序的响应性和效率,特别是在处理I/O操作、网络请求或者其他耗时操作时。
Python中的多线程库
Python提供了几种多线程的支持库:
1. threading模块:这是Python中最常用的多线程库,提供了创建和管理线程的基本功能。
2. concurrent.futures模块:提供了更高层次的抽象,使得编写并发代码更简单。
使用threading模块创建线程
下面是一个简单的示例,展示如何使用threading模块创建和管理线程:
import threading
import time
def worker(num):
"""线程函数"""
print(f'Thread {num}: starting')
time.sleep(2)
print(f'Thread {num}: finishing')
def main():
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
if __name__ == '__main__':
main()
使用concurrent.futures简化多线程编程
concurrent.futures模块提供了一个更高级别的接口来执行异步任务:
from concurrent.futures import ThreadPoolExecutor
import time
def worker(num):
print(f'Thread {num}: starting')
time.sleep(2)
print(f'Thread {num}: finishing')
return num
def main():
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(worker, i) for i in range(5)]
for future in futures:
print(f'Result: {future.result()}')
if __name__ == '__main__':
main()
发送邮件
单线程发送示例
多线程发送示例
下述代码在Python 3.11.9进行的测试。
服务器地址请根据控制台配置的对应区域选择,请参考SMTP 服务地址。
# -*- coding: utf-8 -*-
import queue
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
from email.utils import make_msgid, formatdate
class SMTPConnectionPool:
def __init__(self, max_connections, smtp_server, smtp_port, username, password):
self.pool = queue.Queue(max_connections)
self.smtp_server = smtp_server
self.smtp_port = smtp_port
self.username = username
self.password = password
# 初始化连接池
for _ in range(max_connections):
connection = self.create_connection()
self.pool.put(connection)
def create_connection(self):
conn = smtplib.SMTP_SSL(smtp_server, smtp_port)
conn.login(username, password)
return conn
def get_connection(self):
return self.pool.get()
def return_connection(self, conn):
self.pool.put(conn)
def close_all_connections(self):
while not self.pool.empty():
conn = self.pool.get()
conn.quit()
def send_email(pool, subject, body, to_email, retry_enabled, retry_count=0):
"""
发送邮件函数,支持重试机制。
参数:
subject (str): 邮件主题。
body (str): 邮件正文。
to_email (list): 收件人邮箱列表。
retry_enabled (bool): 是否启用重试机制。
retry_count (int): 当前重试次数,默认为0。
返回:
bool:邮件发送成功或不再重试时返回True,需要重试时返回False。
"""
conn = pool.get_connection()
# 获取当前线程名称
thread_name = threading.current_thread().name
# 记录开始时间
threading_start_time = time.time()
v_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 根据重试次数打印相应信息
if retry_count == 0:
print(f'{thread_name}================================={v_time} 首次发送!\n')
else:
print(f"{thread_name}================================={v_time} 开始第{retry_count}次重试!")
# 发送邮件逻辑
try:
# 构建邮件内容和发送邮件的逻辑
msg = MIMEMultipart('alternative')
msg['Subject'] = Header(subject)
msg['From'] = username
msg['To'] = ",".join(to_email)
msg['Message-id'] = make_msgid()
msg['Date'] = formatdate()
text_html = MIMEText(body, _subtype='html', _charset='UTF-8')
msg.attach(text_html)
# 开启DEBUG模式
conn.set_debuglevel(1)
conn.sendmail(username, to_email, msg.as_string())
# client.quit()
print(f'{thread_name}--发送成功!\n')
except Exception as e:
# 处理发送邮件时的异常
print(f'{thread_name}--------------------------------Exception\n')
print(f"{thread_name}--Exception SMTP server reply: {e}\n")
# 判断是否需要重试的逻辑
if retry_enabled is False:
print(f"{thread_name}--不符合重试条件(重试开关关闭)!\n")
return True
if retry_count > retry_limit:
print(f"{thread_name}--不符合重试条件(重试次数超过上限)!\n")
return True
for v_err in ruler_not_retry:
if v_err in str(e):
print(f"{thread_name}--不符合重试条件(包含不可重试类型)!\n")
return True
print(f"{thread_name}--加入重试队列,等待下次重试!\n")
return False
finally:
# 记录结束时间并计算耗时
threading_end_time = time.time()
threading_elapsed_time = round(threading_end_time - threading_start_time, 2)
print(f"Thread {thread_name} has finished sending email to {to_email},{threading_elapsed_time} seconds\n")
print(f'{thread_name}--------------------------------end \n')
# 将连接放回连接池
pool.return_connection(conn)
return True
def send_mail_with_fixed_rate(total_emails, to_email_list, subject, body, qps, max_workers, retry_enabled):
"""
均匀发送邮件函数。
根据总邮件数、收件人列表、主题、正文、每秒查询率(QPS)、最大工作线程数和是否启用重试来发送邮件。
该函数通过控制邮件发送的间隔和并行处理来实现邮件的均匀发送。
参数:
total_emails (int): 总邮件数。
to_email_list (list): 收件人邮箱列表。
subject (str): 邮件主题。
body (str): 邮件正文。
qps (float): 每秒查询率,用于控制发送频率。
max_workers (int): 最大工作线程数。
retry_enabled (bool): 是否启用重试机制。
"""
# 计算邮件发送间隔,使发信更均匀
if total_emails == 0:
print("No emails to send.")
return
# 记录开始时间
start_time = time.time()
interval = round(1 / qps, 2)
retry_queue = [] # 重试队列
print(f'max_workers:{max_workers}')
print(f'retry_enabled:{retry_enabled}')
print(f'total_emails:{total_emails}')
print(f'qps:{qps}')
pool = SMTPConnectionPool(max_connections=5, smtp_server=smtp_server, smtp_port=465,
username=username, password=password)
# 使用线程池来并行处理邮件发送任务
with ThreadPoolExecutor(max_workers=max_workers) as executor: # 可以根据需要调整最大线程数
futures = []
batch_map = {} # 用于存储 Future 和对应的批次
# 计算总批次数
num_batches = (total_emails + email_batch_size - 1) // email_batch_size
print(f'num_batches:{num_batches}')
print(f'email_batch_size:{email_batch_size}')
for batch_idx in range(num_batches):
start_index = batch_idx * email_batch_size
end_index = min(start_index + email_batch_size, total_emails)
current_batch = to_email_list[start_index:end_index]
if current_batch:
future = executor.submit(send_email, pool, subject, body, current_batch, retry_enabled, 0)
futures.append(future) # 储存未来对象
batch_map[future] = current_batch # 储存未来对象和对应的批次
# 控制发送频率
time.sleep(interval)
# print(f'等待 {interval} 秒')
# 等待所有任务完成
for future in as_completed(futures):
batch = batch_map[future]
if not future.result(): # 如果发送时返回False
retry_queue.append((batch, 1)) # 添加到重试队列,并记录重试次数
# 重试逻辑
while retry_queue and retry_enabled:
new_retry_queue = []
for batch, retry_count in retry_queue:
future = executor.submit(send_email, pool, subject, body, batch, retry_enabled, retry_count)
if not future.result():
new_retry_count = retry_count + 1
if new_retry_count < (retry_limit + 1):
new_retry_queue.append((batch, new_retry_count))
else:
print(f"Maximum retry attempts have been reached, give up retry: {batch}")
print(f"Retry queue after this round: {new_retry_queue}")
retry_queue = new_retry_queue
# 记录结束时间
end_time = time.time()
# 计算耗时
elapsed_time = round(end_time - start_time, 2)
print(f"Total time taken: {elapsed_time} seconds")
retry_enabled = False # True/False, True开启重试, 大多数情况不建议重试,重试是整组重试(如共120人,分2次请求,单次请求60人,重试会按60人重试)。
retry_limit = 2 # 最多重试次数,开启重试后生效
# 截取一些关键特征,含有该特征的不进行重试
ruler_not_retry = ["too frequency", 'getaddrinfo failed', 'Authentication failure', 'Invalid rcptto']
# 示例参数
qps = 3 # 自定义的QPS,1表示每秒请求1次,不要超过27,约5000次/180秒,预热发信不宜太高,避免收信方拒绝
max_workers = 200 # 最大线程数,用于保护服务器,避免过度占用系统资源
email_batch_size = 1 # 每次最多处理的收件人数量60,设置多人会都显示在邮件上
smtp_server = 'smtpdm.aliyun.com'
username = 'test@t1.example.com' # username,通过控制台创建的发信地址
password = 'xxxxxxxxx' # password,通过控制台创建的SMTP密码
smtp_port = 465
subject = '自定义主题'
body = '<h1>自定义内容</h1>'
pool = SMTPConnectionPool(max_connections=5, smtp_server=smtp_server, smtp_port=smtp_port,
username=username, password=password)
# 模拟数据
total_emails = 10 # 需要发送的总邮件数,真实发信需要替换为真实发件人数量,如len(to_email_list)
to_email_list = [f'recipient{i}@example.com' for i in range(0, total_emails)] # 生成1000个示例收件人,真实发信需要传入真实收件人列表
# 真实发信
# to_email_list = ['a@example.com', 'b@example.com', 'c@example.com'] # 传入真实收件人列表
# total_emails = len(to_email_list) # 需要发送的总邮件数,真实发信需要替换为真实发件人数量,如len(to_email_list)
# 调用发送
send_mail_with_fixed_rate(total_emails, to_email_list, subject, body, qps, max_workers, retry_enabled)
# 关闭所有连接
pool.close_all_connections()
send_email 函数
功能:发送邮件,并支持重试机制。
参数:
pool:SMTP连接池。
subject:邮件主题。
body:邮件正文。
to_email:收件人邮箱列表。
retry_enabled:是否启用重试机制。
retry_count:当前重试次数,默认为0。
返回值:邮件发送成功或不再重试时返回True,需要重试时返回False。
send_mail_with_fixed_rate 函数
功能:均匀发送邮件,根据总邮件数、收件人列表、主题、正文、每秒查询率(QPS)、最大工作线程数和是否启用重试来发送邮件。
参数:
total_emails:总邮件数。
to_email_list:收件人邮箱列表。
subject:邮件主题。
body:邮件正文。
qps:每秒查询率,用于控制发送频率。
max_workers:最大工作线程数。
retry_enabled:是否启用重试机制。
最佳实践
retry_enabled重试机制默认为False关闭,按需设置QPS默认为 3,建议不要超过27,约5000次/180秒,预热发信QPS不宜太高,避免收信方拒绝,email_batch_size单次请求人数默认为1,设置多人会都显示在同一封邮件上。
代码综述
该段代码主要用于批量发送电子邮件,并实现了邮件发送的重试机制。主要功能包括:
邮件发送:通过 send_email 函数实现邮件的构建与发送,支持重试机制。
邮件均匀发送:通过 send_mail_with_fixed_rate 函数实现邮件的均匀发送,控制发送频率和并发度,避免对邮件服务器造成过大压力。
重试机制:当邮件发送失败时,可以根据配置决定是否进行重试,以及重试的最大次数。
三种不重试的情况:
不符合重试条件(重试开关关闭)!
不符合重试条件(重试次数超过上限)!
不符合重试条件(包含不可重试类型)!
日志定位:线程名称{thread_name}收件人地址{to_email}用于匹配单次请求的日志。
注意事项
SMTP服务器配置:确保服务器地址,账号,密码正确无误。
收件人列表格式:to_email_list 应为一个列表,每个元素为一个收件人的邮箱地址。如果直接使用逗号分隔的字符串,可能会导致邮件发送失败。
并发控制:max_workers 参数控制了并发线程的数量,应根据服务器性能和SMTP服务器限制合理设置。
发送频率控制:QPS参数用于控制每秒发送的邮件数量,过高的值可能导致SMTP服务器拒绝服务。
重试机制:retry_enabled 和 retry_limit 参数决定了是否启用重试及最大重试次数,需谨慎设置,避免不必要的重复发送。
常见问题
邮件发送失败:
检查SMTP服务器配置是否正确。
确认收件人列表格式是否正确。
检查网络连接是否正常。
查看SMTP服务器的日志,确认是否有具体的错误信息。
邮件发送速度过慢:
调整QPS参数,适当增加每秒发送的邮件数量。
检查 max_workers 参数,确保并发度足够高。
重试机制失效:
确认 retry_enabled 参数是否为 True。
检查 retry_limit 参数是否设置合理。
确认 ruler_not_retry 列表中的错误信息是否匹配实际的异常情况。
日志输出过多:
调整日志输出级别,减少不必要的调试信息。
可以将日志输出到文件,而不是控制台,以便于查看和管理。
代码优化建议:
日志记录:可以使用 Python 的 logging 模块替代 print 语句,以便更好地管理和控制日志输出。
配置管理:将配置参数(如 smtp_server, username, password 等)提取到配置文件中,便于管理和维护。
异常处理:增加更多的异常捕获和处理逻辑,提高程序的健壮性和容错能力。
性能优化:考虑使用异步编程模型(如 asyncio)进一步提升邮件发送的效率。