本示例代码将调用阿里云百炼中的大模型服务,您需要获取百炼API Key。代码中使用 qwen-plus-latest 生成业务数据,使用qwen3-235b-a22b 模型进行打标。
# -*- coding: utf-8 -*-
import os
import asyncio
import random
import json
import sys
from typing import List, Dict
from openai import AsyncOpenAI
import platform
# 创建异步客户端实例
client = AsyncOpenAI(
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx",
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
# 中国省份列表
provinces = [
"北京市", "天津市", "河北省", "山西省", "内蒙古自治区", "辽宁省", "吉林省", "黑龙江省",
"上海市", "江苏省", "浙江省", "安徽省", "福建省", "江西省", "山东省", "河南省",
"湖北省", "湖南省", "广东省", "广西壮族自治区", "海南省", "重庆市", "四川省", "贵州省",
"云南省", "西藏自治区", "陕西省", "甘肃省", "青海省", "宁夏回族自治区", "新疆维吾尔自治区"
]
# 收件人写法模板
recipient_templates = [
"收件人{name}", "收件人:{name}", "收件人是{name}", "收件:{name}",
"收件人为{name}", "{name}", "姓名:{name}", "姓名{name}",
"联系人{name}", "联系人:{name}", "接收人{name}", "接收人:{name}",
"收货人{name}", "收货人:{name}", "寄给{name}", "给{name}",
"收件者{name}", "收件者:{name}", "接收者{name}", "接收者:{name}"
]
# 电话号码写法模板
phone_templates = [
"tel:{phone}", "tel:{phone}", "mobile:{phone}", "mobile:{phone}",
"手机号码{phone}", "手机号码:{phone}", "手机:{phone}", "手机{phone}",
"电话:{phone}", "电话{phone}", "联系电话{phone}", "联系电话:{phone}",
"号码:{phone}", "号码{phone}", "TEL:{phone}", "MOBILE:{phone}",
"contact:{phone}", "phone:{phone}", "{phone}", "call:{phone}",
"联系方式{phone}", "联系方式:{phone}", "电话号码{phone}", "电话号码:{phone}",
"手机号{phone}", "手机号:{phone}", "电话号码是{phone}", "联系电话是{phone}"
]
# 生成虚拟手机号码(以2开头避免与真实号码重合)
def generate_mobile():
prefixes = ['200', '201', '202', '203', '204', '205', '206', '207', '208', '209',
'210', '211', '212', '213', '214', '215', '216', '217', '218', '219',
'220', '221', '222', '223', '224', '225', '226', '227', '228', '229',
'230', '231', '232', '233', '234', '235', '236', '237', '238', '239']
return random.choice(prefixes) + ''.join([str(random.randint(0, 9)) for _ in range(8)])
# 生成固定电话
def generate_landline():
area_codes = ['010', '021', '022', '023', '024', '025', '027', '028', '029', '0311', '0351', '0431', '0451']
area_code = random.choice(area_codes)
number = ''.join([str(random.randint(0, 9)) for _ in range(random.choice([7, 8]))])
return f"{area_code}-{number}"
# 使用大模型生成收件人信息和地址信息
async def generate_recipient_and_address_by_llm(province: str):
"""使用大模型生成指定省份的收件人姓名和地址信息"""
prompt = f"""请为{province}生成一个收件人的信息,包含:
1. 一个真实的中文姓名(可以是常见姓名,也可以是不那么常见的,要多样化)
2. 该省份下的一个城市名称
3. 该城市下的一个行政区名称(如区、县等)
4. 一个具体的街道地址(如路名+门牌号、小区名+楼栋号、商业大厦+楼层等,要真实)
请直接返回JSON格式:
{{"name": "收件人姓名", "city": "城市名", "district": "行政区名", "specific_location": "具体地址"}}
不要包含任何其他内容,只返回JSON。姓名要多样化,不要总是常见的张三李四。"""
try:
response = await client.chat.completions.create(
messages=[{"role": "user", "content": prompt}],
model="qwen-plus-latest",
temperature=1.7, # 提高温度让姓名更多样化
)
result = response.choices[0].message.content.strip()
# 清理可能的markdown代码块标记
if result.startswith('```'):
result = result.split('\n', 1)[1]
if result.endswith('```'):
result = result.rsplit('\n', 1)[0]
# 尝试解析JSON
info = json.loads(result)
return info
except Exception as e:
print(f"生成收件人和地址失败: {e}, 使用备用方案")
# 备用方案
backup_names = ["王建军", "李春燕", "张志华", "陈美玲", "刘德强", "赵敏慧", "孙文博", "周晓丽"]
return {
"name": random.choice(backup_names),
"city": f"{province.replace('省', '').replace('市', '').replace('自治区', '')}市",
"district": "市辖区",
"specific_location": f"人民路{random.randint(1, 999)}号"
}
# 生成一条记录
async def generate_record():
# 随机选择省份
province = random.choice(provinces)
# 使用大模型生成收件人和地址信息
info = await generate_recipient_and_address_by_llm(province)
# 生成收件人信息格式
recipient = random.choice(recipient_templates).format(name=info['name'])
# 生成电话号码(70%概率手机号,30%概率固话)
if random.random() < 0.7:
phone = generate_mobile()
else:
phone = generate_landline()
phone_info = random.choice(phone_templates).format(phone=phone)
# 组装地址
full_address = f"{info['city']}{info['district']}{info['specific_location']}"
# 组装数据
components = [recipient, phone_info, full_address]
# 随机打乱顺序
random.shuffle(components)
# 随机选择分割符
separators = [' ', ',', ',', ';', ';', ':', ':', '、', '|', '\t', '', ' ', ' | ', ' , ', ' ; ', '/']
separator = random.choice(separators)
# 合并数据
if separator == '':
# 没有分割符的情况
combined_data = ''.join(components)
else:
combined_data = separator.join(components)
return combined_data
# 生成批量数据
async def generate_batch_data(count: int) -> List[str]:
"""生成指定数量的数据"""
print(f"开始生成 {count} 条数据...")
data = []
# 使用信号量控制并发数量,QPM=1500,设置为20个并发
semaphore = asyncio.Semaphore(20)
async def generate_single_record(index):
async with semaphore:
record = await generate_record()
print(f"生成第{index+1}条数据: {record}")
return record
# 并发生成数据
tasks = [generate_single_record(i) for i in range(count)]
data = await asyncio.gather(*tasks)
return data
# 保存数据到文件
def save_data(data: List[str], filename: str = "recipient_data.json"):
"""保存数据到JSON文件"""
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"数据已保存到 {filename}")
# 数据生成阶段
async def produce_data_phase():
print("=== 第一阶段:开始生成收件人数据 ===")
# 生成2000条数据
batch_size = 2000
data = await generate_batch_data(batch_size)
# 保存数据
save_data(data)
print(f"\n总共生成了 {len(data)} 条数据")
print("\n示例数据:")
for i, record in enumerate(data[:3]): # 显示前3条作为示例
print(f"{i+1}. 原始数据: {record}")
print()
print("=== 第一阶段完成 ===\n")
return True
def get_system_prompt():
"""返回系统提示词"""
return """你是一个专业的信息抽取助手,专门负责从中文文本中提取收件人的结构化信息。
## 任务说明
请根据给定的输入文本,准确提取并生成包含以下六个字段的JSON格式输出:
- province: 省份/直辖市/自治区(必须是完整的官方名称,如"河南省"、"上海市"、"新疆维吾尔自治区"等)
- city: 城市名称(包含"市"字,如"郑州市"、"西安市"等)
- district: 区县名称(包含"区"、"县"等,如"金水区"、"雁塔区"等)
- specific_location: 具体地址(街道、门牌号、小区、楼栋等详细信息)
- name: 收件人姓名(完整的中文姓名)
- phone: 联系电话(完整的电话号码,包括区号)
## 抽取规则
1. **地址信息处理**:
- 必须准确识别省、市、区的层级关系
- 省份名称必须使用官方全称(如"河南省"而非"河南")
- 直辖市的province和city字段应该相同(如都填"上海市")
- specific_location应包含详细的街道地址、小区名称、楼栋号等
2. **姓名识别**:
- 准确提取完整的中文姓名,包括复姓
- 包括少数民族姓名
3. **电话号码处理**:
- 提取完整的电话号码,保持原有格式
## 输出格式
请严格按照以下JSON格式输出,不要添加任何解释性文字:
{
"province": "省份名称",
"city": "城市名称",
"district": "区县名称",
"specific_location": "详细地址",
"name": "收件人姓名",
"phone": "联系电话"
}"""
# 使用大模型预测结构化数据
async def predict_structured_data(raw_data: str):
"""使用qwen3-235b-a22b模型预测结构化数据"""
system_prompt = get_system_prompt()
try:
response = await client.chat.completions.create(
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": raw_data}
],
model="qwen3-235b-a22b",
temperature=0.1, # 降低温度以提高预测准确性
response_format={"type": "json_object"},
extra_body={"enable_thinking":False}
)
result = response.choices[0].message.content.strip()
# 清理可能的markdown代码块标记
if result.startswith('```'):
lines = result.split('\n')
for i, line in enumerate(lines):
if line.strip().startswith('{'):
result = '\n'.join(lines[i:])
break
if result.endswith('```'):
result = result.rsplit('\n```', 1)[0]
# 尝试解析JSON
structured_data = json.loads(result)
return structured_data
except Exception as e:
print(f"预测结构化数据失败: {e}, 原始数据: {raw_data}")
# 返回空的结构化数据作为备用
return {
"province": "",
"city": "",
"district": "",
"specific_location": "",
"name": "",
"phone": ""
}
# 数据转换阶段
async def convert_data_phase():
"""转换数据格式并使用大模型预测结构化数据"""
print("=== 第二阶段:开始转换数据格式 ===")
try:
print("开始读取recipient_data.json文件...")
# 读取原始数据
with open('recipient_data.json', 'r', encoding='utf-8') as f:
raw_data_list = json.load(f)
print(f"成功读取数据,共有 {len(raw_data_list)} 条记录")
print("开始使用qwen3-235b-a22b模型预测结构化数据...")
# 使用简单与明确的system message 有助于训练与推理速度的提高
system_prompt = "你是一个专业的信息抽取助手,专门负责从中文文本中提取收件人的JSON信息,包含的Key有province(省份)、city(城市名称)、district(区县名称)、specific_location(街道、门牌号、小区、楼栋等详细信息)、name(收件人姓名)、phone(联系电话) 现在输入如下:"
output_file = 'recipient_sft_data.json'
# 使用信号量控制并发数量
semaphore = asyncio.Semaphore(10)
async def process_single_item(index, raw_data):
async with semaphore:
# 使用大模型预测结构化数据
structured_data = await predict_structured_data(raw_data)
print(f"处理第{index+1}条数据: {raw_data}")
conversation = {
"instruction": system_prompt + raw_data,
"output": json.dumps(structured_data, ensure_ascii=False)
}
return conversation
print(f"开始转换数据到 {output_file}...")
# 并发处理所有数据
tasks = [process_single_item(i, raw_data) for i, raw_data in enumerate(raw_data_list)]
conversations = await asyncio.gather(*tasks)
with open(output_file, 'w', encoding='utf-8') as outfile:
json.dump(conversations, outfile, ensure_ascii=False, indent=4)
print(f"转换完成!共处理 {len(raw_data_list)} 条记录")
print(f"输出文件:{output_file}")
print("=== 第二阶段完成 ===")
except FileNotFoundError:
print("错误:找不到 recipient_data.json 文件")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"JSON解析错误:{e}")
sys.exit(1)
except Exception as e:
print(f"转换过程中发生错误:{e}")
sys.exit(1)
# 主函数
async def main():
print("开始执行合并的数据处理流程...")
print("这个程序将依次执行两个阶段:")
print("1. 生成原始收件人数据")
print("2. 使用qwen3-235b-a22b模型预测结构化数据并转换为SFT训练格式")
print("-" * 50)
# 第一阶段:生成数据
success = await produce_data_phase()
if success:
# 第二阶段:转换数据
await convert_data_phase()
print("\n" + "=" * 50)
print("全部流程执行完成!")
print("生成的文件:")
print("- recipient_data.json: 原始数据列表")
print("- recipient_sft_data.jsonl: SFT训练格式数据")
print("=" * 50)
else:
print("数据生成阶段失败,终止执行")
if __name__ == '__main__':
# 设置事件循环策略
if platform.system() == 'Windows':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# 运行主协程
asyncio.run(main(), debug=False)