利用传统手段将文档内容转换为视频,比如根据文档内容录制一个视频,不仅需要投入大量的时间和精力,而且往往需要具备专业的视频编辑技能。使用大模型技术可以更加有效且智能化地解决上述问题。本实践方案旨在依托大语言模型(Large Language Models, LLMs)和多模态应用技术,向您展示如何将文档自动转换为视频,并提供配套的完整代码包,帮助您快速入门上手本实践教程。
效果演示 通过本实践教程及完整代码 ,您可以借助大模型将文档转为视频。所生成的视频包含完整的图文、语音、字幕等内容。下面是一个示例,输入文档为阿里云大模型工程师 ACA 认证 课程第一章第一课时认识大模型 ,输出的视频效果演示如下:
阿里云大模型工程 ACA 认证课程第一章第一课时:认识大模型。 方案概览 文档切片: 首先,我们运用大模型来总结文档标题,将文档划分为不同段落。并为每个段落生成一个概括性的段落标题。
生成演示文稿: 紧接着,我们整合各部分内容,包括标题、正文以及图片等,利用这些素材生成演示文稿图片。
生成讲解语音与字幕: 接下来,我们采用多模态大模型技术,将文字材料转换成音频文件,并依据音频的播放时长自动生成配套的文字字幕。
生成视频: 最后我们将所有演示文稿图片剪辑为视频,并将音频与字幕文件嵌入视频。
准备工作 获取 API Key ,用于调用阿里云百炼提供的大模型。
百炼为新用户提供了免费额度,额度消耗完后按 token 计费。您可以查看计费说明 获取计费详情。 配置环境变量,以降低 API-KEY 的泄露风险。您可以参考配置 API Key 到环境变量 ,根据您的操作系统选择适合的环境变量配置方法。
本实践教程依赖音视频处理工具 FFmpeg 和演示文稿制作工具 Marp,请您使用如下示例代码安装这两个工具:
brew install ffmpeg
brew install node
npm install -g cnpm --registry=https://registry.npmmirror.com
cnpm install -g @marp-team/marp-cli
Set-ExecutionPolicy RemoteSigned -Scope CurrentUser
iex "& {$ (irm scoop.201704.xyz)} -RunAsAdmin"
scoop install git
scoop bucket add scoop-cn https://mirror.ghproxy.com/https://github.com/duzyn/scoop-cn
scoop install scoop-cn /marp
scoop install scoop-cn /ffmpeg
本实践教程中的图片生成依赖于浏览器引擎渲染,请您确保您的工作环境中安装了浏览器应用,如 Chromium,Google Chrome,Microsoft Edge 等。
本实践教程基于 Python 代码,请您确认您的工作环境中已经安装 Python ,并安装本实践教程所需的 Python 库,代码示例如下:
pip install --upgrade pip
pip install pyppeteer
pip install dashscope
pip install --upgrade dashscope
pip install pydub
pip install natsort
pip install moviepy
pip install ffmpeg-python
pip install --upgrade urllib3 requests
快速体验 如果您希望快速尝试,可以直接下载本教程中提供的完整代码 到您的本地工作环境中,并在本地执行如下命令:
unzip doc2video.zip
cd doc2video
chmod +x run.sh
为了帮助您理解方案流程,并能够根据实际需要进行个性化定制,下面我们将为您介绍如何从 0 开始,逐步构建一个文档生成视频的工程。
步骤一:准备素材 请您将文档中的文字、Markdown 格式的图片链接等内容写到 Markdown 文件中,并以section_1.md 的格式命名,保存到input 文件夹中。我们将下面提供的section_1.md 文件作为示例输入文档。



您可以直接使用完整代码 中我们提供的图片、风格文件等素材,并将其全部保存到style 文件夹中。
您需要在您的工作环境目录下创建一个 Python 文件main.py ,以便于设置参数以及函数调用,示例代码如下:
import argparse
import datetime
import os
import shutil
def main (args ):
start_time = datetime.datetime.now()
input_base_name = os.path.splitext(os.path.basename(args.input_txt_path))[0 ]
if not os.path.exists(args.markdown_path):
for filename in os.listdir(args.input_style_path):
full_path = os.path.join(args.input_style_path, filename)
if os.path.isfile(full_path):
shutil.copy2(full_path, args.markdown_path)
end_time = datetime.datetime.now()
elapsed_time = end_time - start_time
elapsed_hours, remainder = divmod (elapsed_time.total_seconds(), 3600 )
elapsed_minutes, elapsed_seconds = divmod (remainder, 60 )
start_time_str = start_time.strftime("%Y年%m月%d日 %H时%M分%S秒" )
end_time_str = end_time.strftime("%Y年%m月%d日 %H时%M分%S秒" )
elapsed_time_str = f"{int (elapsed_hours)} 时{int (elapsed_minutes)} 分{int (elapsed_seconds)} 秒"
print (f"开始时间: {start_time_str} " )
print (f"结束时间: {end_time_str} " )
print (f"总时间: {elapsed_time_str} " )
if __name__ == "__main__" :
parser = argparse.ArgumentParser(description="文档生成视频" )
parser.add_argument('--input_txt_path' , type =str , default='./input/section_1.md' , help ='输入文本的路径' )
parser.add_argument('--input_style_path' , type =str , default='./style' , help ='输入样式文件夹的路径' )
parser.add_argument('--markdown_style_path' , type =str , default='./style/style.md' , help ='Markdown 样式的路径' )
parser.add_argument('--logo_path' , type =str , default='./style/logo.png' , help ='Logo 图片的路径' )
parser.add_argument('--theme_path' , type =str , default='./style/theme.png' , help ='主题图片的路径' )
parser.add_argument('--title_path' , type =str , default='./style/title.png' , help ='标题图片的路径' )
parser.add_argument('--json_path' , type =str , default='./material/json' , help ='JSON 文件的路径' )
parser.add_argument('--image_path' , type =str , default='./material/image' , help ='图像文件夹的路径' )
parser.add_argument('--audio_path' , type =str , default='./material/audio' , help ='音频文件夹的路径' )
parser.add_argument('--markdown_path' , type =str , default='./material/markdown' , help ='Markdown 文件夹的路径' )
parser.add_argument('--srt_and_video_path' , type =str , default='./material/video' , help ='字幕和视频文件夹的路径' )
parser.add_argument('--fps' , type =int , default=30 , help ='帧率' )
parser.add_argument('--title' , type =str , default="认识大模型" , help ='文档主题' )
args = parser.parse_args()
您需要将main.py 保存到与input 、style 文件夹相同的路径目录下。在后续流程中,您需要在main.py 中导入各模块函数并且调用它们,我们已经在上述main.py 文件中标注了引用或调用函数的位置。在调用各模块函数后,如果您想测试输出,您可以直接在终端运行以下代码:
步骤二:文档切片 在这一部分中,我们运用大模型为输入文档生成文档标题并划分段落,然后借助大模型对每个段落的内容进行归纳总结,同时为每个段落自动生成相应的段落标题。
划分文本段落 我们借助 API 调用阿里云百炼提供的通义千问系列大模型通义千问-Plus ,为输入文档生成一个文档标题并划分段落。
通义千问-Plus 是通义千问超大规模语言模型的增强版,支持中文英文等不同语言输入。能力均衡,推理效果和速度介于通义千问-Max 和通义千问-Turbo 之间,适合中等复杂任务。您也可以根据实际应用需求来选择合适的大模型,详见模型列表 。 通义千问-Plus 的输入和输出成本分别为 0.004 元/千 Token 和 0.012 元/千 Token,新用户在开通百炼服务后的 30 天内拥有 100 万 Token 的免费额度。 新建一个名为theme_generate.py 的 Python 文件,代码示例如下:
from http import HTTPStatus
import dashscope
import re
def theme_generate_with_qwen_plus (input_filepath, title ):
- input_filepath: 输入文件的路径。该文件的内容将用于生成摘要标题。
- title: 生成摘要标题需围绕的主题。确保生成的标题与该主题紧密相关。
- response_content: 生成的摘要标题。
- 该函数以流式传输的方式请求生成标题,仅当响应状态码为HTTPStatus.OK时,累加响应内容。
- 如果发生错误,函数会打印请求的相关错误信息。
with open (input_filepath, 'r' , encoding='utf-8' ) as file:
content = file.read()
prompt = f"""
需要紧紧地围绕主题“{title} ”。
messages = [{
'role' : 'user' ,
'content' : f"""
"{prompt} "
"{content} "
response_content = ''
responses = dashscope.Generation.call("qwen-plus" ,
result_format='message' ,
stream=True ,
incremental_output=True )
for response in responses:
if response.status_code == HTTPStatus.OK:
response_content += response.output.choices[0 ]['message' ]['content' ]
else :
print ('Request id: %s, Status code: %s, error code: %s, error message: %s' % (
response.request_id, response.status_code,
response.code, response.message
response_content = re.sub(r'^"|"$' , '' , response_content)
return response_content
在theme_generate.py 中,我们定义了一个函数theme_generate_with_qwen_plus ,通过 API 调用通义千问-Plus 为文档生成一个文档标题。在main.py 中导入并调用该函数,代码示例如下:
导入并调用 theme_generate_with_qwen_plus
from theme_generate import theme_generate_with_qwen_plus
theme = theme_generate_with_qwen_plus(args.input_txt_path, args.title)
print (theme)
我们可以调用该函数来获取示例文档section_1.md 的文档标题:
示例文档的标题:大模型:影响与应用。 接下来新建一个名为doc_split.py 的 Python 文件,代码示例如下:
from http import HTTPStatus
import dashscope
import json
import os
def doc_split_with_qwen_plus (input_filepath, output_filepath ):
if not os.path.exists(output_filepath):
with open (input_filepath, 'r' , encoding='utf-8' ) as file:
content = file.read()
prompt = """
1. **分段逻辑**:仔细分析文档内容,根据其内在语义逻辑合理划分段落。
2. **标题创作**:为每一独立段落设计一个精炼标题,确保该标题简洁明了(不超过10个字),并能有效准确地概括该段落核心信息。
3. **输出规格**:完成处理后,生成的文档结构需符合JSON格式标准,每段落及对应的标题组成一个条目,具体格式示例如下:
{"title": " ", "content": " "},
{"title": " ", "content": " "},
4. **原文忠实性**:在输出的JSON数据中,各段落的“content”字段必须精确匹配原始文档的文字内容,不得有增删改动。必须完整地处理原始文档的全部内容,不能有遗漏。请严格保证文字和链接在原文档中的相对位置保持不变。
5. **格式化链接**:对于文档中的markdown格式的图片链接,将他们单独保存到JSON条目中。其"title"为"链接{index}","content"为链接地址,其中index为索引顺序。
6. **内容限制**:输出内容中不得包含任何多余的空格、换行符、制表符等空白字符,也不得包含任何HTML、XML、Markdown等格式的符号。始终保持中文。
messages = [{
'role' : 'user' ,
'content' : f"""
"{prompt} "
"{content} "
response_content = ''
responses = dashscope.Generation.call("qwen-plus" ,
result_format='message' ,
stream=True ,
incremental_output=True )
for response in responses:
if response.status_code == HTTPStatus.OK:
response_content += response.output.choices[0 ]['message' ]['content' ]
else :
print ('Request id: %s, Status code: %s, error code: %s, error message: %s' % (
response.request_id, response.status_code,
response.code, response.message
if response_content.startswith("```" ) and response_content.endswith("```" ):
response_content = response_content[8 :-3 ].strip()
input_base_name = os.path.splitext(os.path.basename(input_filepath))[0 ]
output_file_path = os.path.join(output_filepath, f'{input_base_name} .json' )
with open (output_file_path, 'w' , encoding='utf-8' ) as json_file:
json.dump(json.loads(response_content), json_file, ensure_ascii=False , indent=4 )
return response_content
在doc_split.py 中,我们定义了一个函数doc_split_with_qwen_plus ,通过 API 调用通义千问-Plus 将输入文档划分为不同段落并为每个段落总结一个段落标题。在main.py 中导入并调用该函数,代码示例如下:
from doc_split import doc_split_with_qwen_plus
doc_split_with_qwen_plus(args.input_txt_path, os.path.join(args.json_path))
调用该函数来为示例文档section_1.md 划分段落并生成段落标题,输出的 JSON 文件section_1.json 会被保存到./material/json 中:
输出 section_1.json。“title”字段为段落标题,“content”字段为段落内容,图片链接单独保存在字段中。 提炼内容 接着我们通过 API 调用百炼平台大模型通义千问-Plus ,总结提炼各个段落的内容。
新建一个名为qwen_plus_marp.py 的 Python 文件,代码示例如下:
import asyncio
import os
from http import HTTPStatus
import dashscope
def call_with_stream (content ):
prompt2 = """
“- 内容”之后不要再分段落描述。
示例输出:- 使用搜索引擎遇到的问题:难以找到有效信息
messages = [{
'role' : 'user' ,
'content' : f"""
"{prompt2} "
"{content} "
response_content = ''
responses = dashscope.Generation.call("qwen-plus" ,
result_format='message' ,
stream=True ,
incremental_output=True )
for response in responses:
if response.status_code == HTTPStatus.OK:
response_content += response.output.choices[0 ]['message' ]['content' ]
else :
print ('Request id: %s, Status code: %s, error code: %s, error message: %s' % (
response.request_id, response.status_code,
response.code, response.message
return response_content
在qwen_plus_marp.py 中,我们定义了一个函数call_with_stream ,通过 API 调用通义千问-Plus 来处理各段落中的内容,具体如下:
我们将在整合图文素材时导入并调用call_with_stream 函数。
步骤三:生成演示文稿 在这一部分中,我们将图文素材整合到 Markdown 文件中,并生成演示文稿图片。
在介绍详细流程和代码之前,我们首先简单介绍一下这部分用到的工具:Marp。Marp 是一款基于 Markdown 语法的开源演示文稿制作工具。您只需要通过编辑 Markdown 文本,即可生成精美的演示文稿。如果您是 VS Code 使用者,您还可以下载安装 Marp for VS Code 插件来实时预览。您也可以参考Marp 官方文档 ,打造出独具个人风格特色的演示文稿。
在 VS Code 的扩展中搜索并安装 Marp for VS Code。 由于 Marp 在将 Markdown 转换为演示文稿时采用了特定的格式规范和扩展语法,我们准备了一个 Python 脚本——markdown_gather.py ,用于汇总一些优化和调整 Markdown 格式内容的函数,代码示例如下:
import os
import re
from pathlib import Path
import os
def merge_style_with_md_files (md_file_path, style_file_path ):
if not os.path.isfile(style_file_path):
raise FileNotFoundError(f"样式文件 {style_file_path} 不存在。" )
with open (style_file_path, 'r' , encoding='utf-8' ) as f:
style_content = f.read()
for filename in os.listdir(md_file_path):
if filename.startswith('section' ) and filename.endswith('.md' ):
file_path = os.path.join(md_file_path, filename)
if os.path.exists(file_path):
with open (file_path, 'r+' , encoding='utf-8' ) as f:
original_content = f.read()
f.seek(0 )
f.write(style_content + '\n\n' + original_content)
def remove_trailing_dashes (directory ):
从 Markdown 文件中移除位于文件末尾且后面没有其他内容(除了可能的换行符)的连续破折号(---)。
for filename in os.listdir(directory):
if filename.startswith('section' ) and filename.endswith('.md' ):
filepath = os.path.join(directory, filename)
with open (filepath, 'r' , encoding='utf-8' ) as file:
content = file.read()
if content.rstrip().endswith('---' ) and content.rstrip('---' ).endswith('\n' ):
content = content.rstrip('---\n' )
content = content.replace("------" , "" )
content = re.sub(r'\n{3,}' , '\n\n' , content)
with open (filepath, 'w' , encoding='utf-8' ) as file:
def remove_empty_lines (filename ):
with open (filename, 'r' , encoding='utf-8' ) as file:
content = file.read()
content = content.replace("------" , "" )
with open (filename, 'w' , encoding='utf-8' ) as file:
def append_string_to_file (file_path ):
with open (file_path, 'r' , encoding='utf-8' ) as file:
content = file.read()
new_content = content + '---'
with open (file_path, 'w' , encoding='utf-8' ) as file:
def insert_logo (file_path, logo_path ):
with open (file_path, 'r' , encoding='utf-8' ) as file:
content = file.read()
logo_filename = Path(logo_path).name
insert_str = f"""<!--\nbackgroundImage: url("./{logo_filename} ");\nbackgroundSize: 10% ;\nbackgroundPosition: 98% 3% ;\n-->
new_content = re.sub(r'(?<!-)---(?!-)' , f'\n{insert_str} \n---' , content, flags=re.DOTALL)
with open (file_path, 'w' , encoding='utf-8' ) as file:
def insert_bg_if_no_link (filename, theme_url ):
result = []
with open (filename, 'r' , encoding='utf-8' ) as file:
content = file.read()
theme_filename = Path(theme_url).name
sections = re.split(r'---+' , content)
for i, section in enumerate (sections):
header_match = re.search(r'# \{(.+?)\}' , section)
if header_match:
header_content = section[header_match.end():].strip()
if not re.search(r'!\[[^\]]*\]\([^\)]*\)' , header_content):
section = f'{section[:header_match.end()]} \n{section[header_match.end():]} \n---'
if i < len (sections) - 1 :
result.append('---' )
with open (filename, 'w' , encoding='utf-8' ) as file:
file.write('' .join(result))
def title_to_md (file_path, content, title_url ):
title_filename = Path(title_url).name
with open (file_path, 'r' , encoding='utf-8' ) as file:
original_content = file.read()
content = f"\n# {content} \n---"
new_content = content + '\n' + original_content
with open (file_path, 'w' , encoding='utf-8' ) as file:
您需要将其和main.py 、各函数文件放在同一路径目录下,并在后续调用该文件中的函数。
整合图文素材 我们将调用前文步骤二中“提炼内容”部分介绍的函数call_with_stream 得到文档各段落关键要点、标题、图片链接等内容,并将它们整合为 Markdown 格式,输出 Markdown 文件。新建一个名为json2md.py 的函数文件,代码示例如下:
import json
import os
import re
from qwen_plus_marp import call_with_stream
from pathlib import Path
def is_link (text ):
return text.startswith("http://" ) or text.startswith("https://" )
def parse_json_list_to_markdown (json_list, theme_url ):
将 JSON 对象列表转换为 Markdown 格式,并通过 call_with_stream 处理 'content'。
- json_list (list): 字典列表,每个字典都包含 'title' 和 'content' 键。
- str: Markdown 格式的字符串。
theme = "\n\n"
theme_filename = Path(theme_url).name
markdown_content = ""
for item in json_list:
title = item.get('title' , '未命名' )
processed_content = call_with_stream(item.get('content' , '' ))
if is_link(processed_content):
markdown_content += f"---\n\n\n\n---"
else :
markdown_content += f"\n\n# {title} \n\n{processed_content} \n\n---"
return markdown_content
def parse_json_list_to_markdown_new (json_list, theme_url ):
将 JSON 对象列表转换为 Markdown 格式,并通过 call_with_stream 处理 'content'。
- json_list (list): 字典列表,每个字典都包含 'title' 和 'content' 键。
- str: Markdown 格式的字符串。
theme_filename = Path(theme_url).name
theme = f"\n\n"
markdown_content = ""
for i, item in enumerate (json_list):
title = item.get('title' , '未命名' )
processed_content = call_with_stream(item.get('content' , '' ))
if processed_content.startswith("```" ) and processed_content.endswith("```" ):
processed_content = processed_content[11 :-3 ].strip()
if not is_link(json_list[i].get('content' )):
if i == len (json_list) - 1 :
markdown_content += f"\n\n## {title} \n\n{processed_content} \n\n{theme} \n\n---"
else :
if not is_link(json_list[i + 1 ].get('content' )):
markdown_content += f"\n\n## {title} \n\n{processed_content} \n\n{theme} \n\n---"
else :
markdown_content += f"\n\n## {title} \n\n{processed_content} \n\n---"
else :
markdown_content += f"---\n\n} )\n\n---"
return markdown_content
def convert_json_file_to_md (json_file_path, output_dir, theme_url ):
读取 JSON 文件,通过 call_with_stream 转换其内容,然后保存为 Markdown 文件。
- json_file_path (str): JSON 文件的路径。
- output_dir (str): Markdown 文件将被保存的目录。
if not os.path.exists(output_dir):
with open (json_file_path, 'r' , encoding='utf-8' ) as file:
json_data = json.load(file)
markdown_content = parse_json_list_to_markdown_new(json_data, theme_url)
base_name = os.path.splitext(os.path.basename(json_file_path))[0 ]
md_file_name = f"{base_name} .md"
output_path = os.path.join(output_dir, md_file_name)
with open (output_path, 'w' , encoding='utf-8' ) as file:
def save_markdown_to_file (content, filename ):
""" 保存Markdown内容到文件 """
with open (filename, 'w' , encoding='utf-8' ) as file:
def process_markdown (input_file ):
""" 处理Markdown文本,按要求分割并保存 """
with open (input_file, 'r' , encoding='utf-8' ) as file:
input_text = file.read()
parts = re.split(r'(?<=---\n)' , input_text)
parts = [part.strip() for part in parts if part.strip()]
filenames = []
base_path = os.path.dirname(input_file)
for i, part in enumerate (parts):
filename = f'{os.path.splitext(os.path.basename(input_file))[0 ]} _{i} .md'
full_filename = os.path.join(base_path, filename)
save_markdown_to_file(f'{part} ' , full_filename)
在json2md.py 中,我们定义了多个函数,主要作用如下:
您需要在main.py 中导入并调用json2md.py 中的函数,代码如下:
from json2md import convert_json_file_to_md
for filename in os.listdir(args.json_path):
if filename.endswith('.json' ):
json_file_path = os.path.join(args.json_path, filename)
convert_json_file_to_md(json_file_path, args.markdown_path, args.theme_path)
为了美化演示文稿,我们进一步添加阿里云 Logo、标题页,并调整 Markdown 格式以适配 Marp 语法。我们通过导入并调用前文提供的markdown_gather.py 中的函数实现,代码如下:
导入并调用 markdown_gather.py 中的函数
from markdown_gather import insert_logo, remove_empty_lines, title_to_md
title_to_md(os.path.join(args.markdown_path,f'{input_base_name} .md' ), theme, args.title_path)
remove_empty_lines(os.path.join(args.markdown_path,f'{input_base_name} .md' ))
insert_logo(os.path.join(args.markdown_path,f'{input_base_name} .md' ), os.path.join(args.logo_path))
将步骤二中输出的section_1.json 作为输入,输出的 Markdown 文件section_1.md 会被保存在./material/markdown 中,效果演示如下:
素材整合后的示例输出 section_1.md。 生成演示文稿 接下来我们基于 Marp 生成演示文稿。在使用 Marp 生成演示文稿前,我们需要定义 Marp 的整体风格及全局样式。我们可以在 Markdown 文件的顶部设置,例如:
marp: true
theme: gaia
您可以在本实践教程的完整代码 中style 文件夹里获取我们为您准备的 Markdown 风格文件style.md ,并将其置于 Markdown 文件的开头。您可以通过在main.py 中导入并调用markdown_gather.py 中的函数来实现,具体代码如下:
from markdown_gather import merge_style_with_md_files, remove_trailing_dashes
process_markdown(os.path.join(args.markdown_path,f'{input_base_name} .md' ))
merge_style_with_md_files(args.markdown_path, args.markdown_style_path)
加载风格文件 style.md 之后的 Markdown 示例输出。 得到上述的输出后,如果您是 VS Code 用户且已经安装了 Marp for VS Code 插件,那么您可以实时预览 Marp 演示文稿的输出效果。点击界面右上角的预览图标:
在 VS Code 界面左上角点击预览图标。 实时预览效果如下:
Marp 实时预览效果演示。 通过预览确认了 Marp 演示文稿的输出内容无误后,我们将其导出为图片。新建一个名为marp2image.py 的 Python 文件,代码示例如下:
import os
import re
from pathlib import Path
import os
在marp2image.py 中,我们定义了一个函数convert_md_files_to_png ,将 Markdown 文件导出为 png 格式的 Marp 演示文稿。在main.py 中导入并调用函数convert_md_files_to_png ,代码示例如下:
导入并调用函数 convert_md_files_to_png
from marp2image import convert_md_files_to_png
convert_md_files_to_png(os.path.join(args.markdown_path,f'{input_base_name} .md' ), args.image_path)
调用函数得到的所有的输出图片均会被保存到./material/image 中,示例如下:
示例图片。 步骤四:生成讲解语音与字幕 在这一部分中,我们利用多模态语音大模型将文字内容合成为音频,并且精确计算音频的时长,随后自动生成对应的字幕时间戳。
文字合成音频 我们将文档内容划分为若干个句子,然后通过 API 调用阿里云百炼提供的语音合成大模型CosyVoice ,将所有句子全部合成为 mp3 格式的音频文件。新建一个名为audio_generate_each_sentence.py 的 Python 文件,代码示例如下:
import json
import os
import re
import time
import dashscope
from dashscope.audio.tts_v2 import SpeechSynthesizer
import traceback
def read_json_file (file_path ):
with open (file_path, 'r' , encoding='utf-8' ) as file:
data = json.load(file)
return data
def split_into_sentences (text ):
punctuation = [',' , '。' , ';' , '?' , '!' ]
brackets = {'(' : ')' , '[' : ']' , '{' : '}' , '(' : ')' , '【' : '】' , '《' : '》' }
sentences = []
temp_sentence = ''
bracket_stack = []
for char in text:
if char in brackets:
elif char in brackets.values() and bracket_stack and brackets[bracket_stack[-1 ]] == char:
if char in punctuation and not bracket_stack:
temp_sentence = ''
else :
temp_sentence += char
if temp_sentence:
return sentences
def save_sentences_to_markdown (sentences, base_dir, index1 ):
for index2, sentence in enumerate (sentences, start=1 ):
dir_name = f'audio_for_paragraph_{index1} '
dir_path = os.path.join(base_dir, dir_name)
os.makedirs(dir_path, exist_ok=True )
file_name = f'paragraph_{index1} _sentence_{index2} .md'
file_path = os.path.join(dir_path, file_name)
with open (file_path, 'w' , encoding='utf-8' ) as file:
file.write(sentence + '\n' )
def process_json_file (json_file_path, base_dir ):
if not os.path.exists(base_dir):
file_prefix = os.path.splitext(os.path.basename(json_file_path))[0 ]
base_dir = os.path.join(base_dir, file_prefix)
json_data = read_json_file(json_file_path)
for index1, item in enumerate (json_data):
if 'content' in item:
content = item['content' ]
if not is_url(content):
sentences = split_into_sentences(content)
save_sentences_to_markdown(sentences, base_dir, index1+1 )
def is_url (s ):
url_pattern = re.compile (r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+' )
return bool (url_pattern.match (s))
def synthesize_md_to_speech (base_directory ):
识别指定目录下的所有.md文件,读取其内容并使用DashScope API将其转换为语音,
base_directory (str): 包含.md文件的顶层目录路径。
if 'DASHSCOPE_API_KEY' not in os.environ:
raise ValueError("DashScope API key must be set in the environment variables." )
for root, dirs, files in os.walk(base_directory):
for file in files:
if file.endswith('.md' ):
md_file_path = os.path.join(root, file)
with open (md_file_path, 'r' , encoding='utf-8' ) as f:
text = f.read()
speech_synthesizer = SpeechSynthesizer(model='cosyvoice-v1' , voice='longxiaochun' )
audio_data = speech_synthesizer.call(text)
mp3_file_path = os.path.splitext(md_file_path)[0 ] + '.mp3'
with open (mp3_file_path, 'wb' ) as f:
print (f'Synthesized text from file "{md_file_path} " to file: {mp3_file_path} ' )
我们在audio_generate_each_sentence.py 定义了两个函数,其作用分别为:
在main.py 中导入并引用这两个函数,代码如下:
导入并引用 process_json_file 和 synthesize_md_to_speech
from audio_generate_each_sentence import process_json_file, synthesize_md_to_speech
process_json_file(os.path.join(args.json_path,f'{input_base_name} .json' ), args.audio_path)
synthesize_md_to_speech(os.path.join(args.audio_path, input_base_name))
调用函数后,所有的音频文件会被保存到./material/audio 中的相应文件夹下,示例展示如下:
文字内容为:你有过使用搜索引擎搜索问题却怎么也找不到有效信息的时候吗? 生成字幕 接下来,我们将通过读取音频的时长以及其对应的文字内容,来生成 SRT 格式的字幕文件。新建一个 Pyhton 文件,命名为srt_generate_for_each_sentence.py ,代码示例如下:
import os
import re
from moviepy.editor import AudioFileClip
from typing import List
def format_time (seconds ):
hours, remainder = divmod (seconds, 3600 )
minutes, seconds = divmod (remainder, 60 )
milliseconds = int ((seconds - int (seconds)) * 1000 )
seconds = int (seconds)
return f"{int (hours):02d} :{int (minutes):02d} :{int (seconds):02d} ,{milliseconds:03d} "
def get_audio_duration (file_path ):
audio = AudioFileClip(file_path)
duration = audio.duration
return duration
def create_srt_line (index, start_time, end_time, text ):
return f"{index} \n{start_time} --> {end_time} \n{text} \n\n"
def generate_srt_from_audio (base_dir: str , output_dir: str , output_srt_file: str ) -> None :
:param base_dir: 包含音频文件夹的根目录。
:param output_dir: 输出SRT文件的目录。
:param output_srt_file: 输出SRT文件的完整路径。
if not os.path.exists(output_dir):
if not output_srt_file.endswith('.srt' ):
output_srt_file += '.srt'
current_time = 2.000
with open (output_srt_file, 'w' , encoding='utf-8' ) as srt_file:
srt_index = 1
sub_dirs = [d for d in os.listdir(base_dir) if d.startswith('audio_for_paragraph_' )]
sub_dirs.sort(key=lambda x: int (re.search(r'\d+' , x).group()))
for sub_dir in sub_dirs:
sub_dir_path = os.path.join(base_dir, sub_dir)
files = [f for f in os.listdir(sub_dir_path) if f.endswith('.md' ) or f.endswith('.mp3' )]
md_files = [f for f in files if f.endswith('.md' )]
md_files.sort(key=lambda x: (int (x.split('_' )[1 ]), int (x.split('_' )[3 ].split('.' )[0 ])))
for md_file in md_files:
md_file_path = os.path.join(sub_dir_path, md_file)
mp3_file_path = os.path.splitext(md_file_path)[0 ] + '.mp3'
if os.path.exists(mp3_file_path):
with open (md_file_path, 'r' , encoding='utf-8' ) as f:
text = f.read().strip()
duration = get_audio_duration(mp3_file_path)
start_time_str = format_time(current_time)
end_time_str = format_time(current_time + duration)
srt_line = create_srt_line(srt_index, start_time_str, end_time_str, text)
current_time += duration + 0.3
srt_index += 1
else :
print (f"No corresponding MP3 file found for {md_file} " )
print ("SRT file generated successfully." )
在srt_generate_for_each_sentence.py 中我们定义了一个函数generate_srt_from_audio ,该函数通过读取输入音频的时长以及其对应的文字内容,来生成 SRT 格式的字幕文件。
在main.py 中导入并调用函数generate_srt_from_audio ,代码示例如下:
导入并调用函数 generate_srt_from_audio
from srt_generate_for_each_sentence import generate_srt_from_audio
generate_srt_from_audio(os.path.join(args.audio_path, input_base_name), args.srt_and_video_path, os.path.join(args.srt_and_video_path, input_base_name))
调用函数会自动生成 srt 文件并保存在./material/video 中,示例输出如下:
步骤五:生成视频 在这一部分中,我们将生成的演示文稿剪辑为视频,并将音频文件和字幕文件嵌入到视频中。
剪辑视频 首先我们计算每一张演示文稿在视频中的持续时间。新建一个 Python 文件,命名为calculate_durations_for_each_image.py ,代码示例如下:
import os
from pydub import AudioSegment
def calculate_audio_durations (directory ):
计算指定目录下所有以 audio_for_paragraph_{index} 命名的文件夹中 mp3 文件的总持续时间(以秒为单位)。
directory (str): 需要扫描的根目录路径。
list: 每个 audio_for_paragraph_{index} 文件夹中 mp3 文件总持续时间(秒)的列表。
durations = []
for entry in os.scandir(directory):
if entry.is_dir() and entry.name.startswith("audio_for_paragraph_" ):
index = int (entry.name.split("_" )[-1 ])
total_duration_ms = 0
for file_entry in os.scandir(entry.path):
if file_entry.name.endswith(".mp3" ):
audio = AudioSegment.from_mp3(file_entry.path)
delay = 300
total_duration_ms += len (audio) + delay
total_duration_seconds = total_duration_ms / 1000.0
durations.append((index, total_duration_seconds))
durations.sort(key=lambda x: x[0 ])
durations = [duration for _, duration in durations]
durations.insert(0 , 2 )
return durations
在calculate_durations_for_each_image.py 中,我们定义了函数calculate_audio_durations ,其功能为计算每一张演示文稿在视频中的持续时间。
在main.py 中导入并调用函数calculate_audio_durations ,代码示例如下:
导入并调用函数 calculate_audio_durations
from calculate_durations_for_each_image import calculate_audio_durations
durations = calculate_audio_durations(os.path.join(args.audio_path, input_base_name))
接着我们将所有演示文稿剪辑为视频。新建一个函数文件,命名为movie_editor.py ,代码示例如下:
from moviepy.editor import *
import os
import re
from PIL import Image
import natsort
import math
import numpy as np
def images_to_video_with_durations (input_image_path, output_video_path, durations, fps, base_name ):
pattern = r'^' + re.escape(base_name) + r'_(\d+)\.png$'
image_files = [
f"{input_image_path} /{file} "
for file in os.listdir(input_image_path)
if re.match (pattern, file)
image_files = natsort.natsorted(image_files, key=lambda x: int (re.match (pattern, os.path.basename(x)).group(1 )))
target_width, target_height = 1280 , 720
background_size = (target_width, target_height)
clips = []
for i, file in enumerate (image_files):
img = Image.open (file)
width, height = img.size
ratio = width / height
if width > target_width or height > target_height:
if ratio > target_width / target_height:
new_width = target_width
new_height = math.floor(new_width / ratio)
else :
new_height = target_height
new_width = math.floor(new_height * ratio)
else :
new_width, new_height = width, height
img = img.resize((new_width, new_height), resample=Image.Resampling.LANCZOS)
img_clip = ImageClip(np.array(img)).set_duration(durations[i])
img_clip = img_clip.set_position('center' )
bg_clip = ColorClip(size=background_size, color=(255 ,255 ,255 ), duration=durations[i])
composite_clip = CompositeVideoClip([bg_clip, img_clip])
final_clip = concatenate_videoclips(clips, method="compose" )
output_filename = f"{base_name} .mp4"
final_clip.write_videofile(os.path.join(output_video_path, output_filename), fps=fps)
在movie_editor.py 中,我们定义了函数images_to_video_with_durations ,其功能是将所有输入演示文稿按顺序剪辑为视频。在main.py 中导入并调用函数images_to_video_with_durations ,代码示例如下:
导入并调用函数 images_to_video_with_durations
from movie_editor import images_to_video_with_durations
images_to_video_with_durations(os.path.join(args.image_path,f'{input_base_name} ' ), args.srt_and_video_path, durations, args.fps, input_base_name)
调用代码后,生成的视频section_1.mp4 会被保存在./material/video 中,效果演示如下:
以上视频不包含音频与字幕。 嵌入音频与字幕 接下来我们将音频文件嵌入到视频中。新建一个函数文件,命名为audio2video.py ,代码示例如下:
import os
from moviepy.editor import *
from glob import glob
import re
def merge_audio_and_add_to_video (video_path, audio_base_dir, output_path ):
:param video_path: 视频文件的路径。
:param audio_base_dir: 包含音频文件夹的基目录。
:param output_path: 输出视频的路径。
video_clip = VideoFileClip(video_path)
audio_clips = []
silent_audio_start = AudioClip(lambda t: [0 ,0 ], duration=2 )
audio_dirs = glob(os.path.join(audio_base_dir, "audio_for_paragraph_*" ))
audio_dirs.sort(key=lambda x: int (re.search(r'\d+' , os.path.basename(x)).group()))
for audio_dir in audio_dirs:
index = int (os.path.basename(audio_dir).split("_" )[-1 ])
mp3_files = glob(os.path.join(audio_dir, f"paragraph_{index} _sentence_*.mp3" ))
mp3_files.sort(key=lambda x: int (re.search(r'_sentence_(\d+)' , os.path.basename(x)).group(1 )))
for mp3_file in mp3_files:
audio_clip = AudioFileClip(mp3_file)
if audio_clips:
silent_audio = AudioClip(lambda t: [0 ,0 ], duration=0.3 )
final_audio = concatenate_audioclips(audio_clips)
video_with_audio = video_clip.set_audio(final_audio)
video_with_audio.write_videofile(output_path, codec='libx264' , audio_codec='aac' )
我们在函数文件audio2video.py 中定义了函数merge_audio_and_add_to_video ,其功能为将音频文件嵌入到视频中。
在main.py 中导入并调用函数merge_audio_and_add_to_video ,代码如下:
导入并调用函数 merge_audio_and_add_to_video
from audio2video import merge_audio_and_add_to_video
merge_audio_and_add_to_video(os.path.join(args.srt_and_video_path,f'{input_base_name} .mp4' ), os.path.join(args.audio_path,f'{input_base_name} ' ), os.path.join(args.srt_and_video_path,f'{input_base_name} _with_audio.mp4' ))
调用代码后,生成的视频会保存在./material/video 中。
最后我们将字幕文件嵌入到视频中。新建一个函数文件,命名为srt2video.py ,代码示例如下:
import subprocess
import os
def merge_video_and_subtitle (video_and_srt_path, base_name ):
video_ext = ".mp4"
srt_ext = ".srt"
video_path = os.path.join(video_and_srt_path, f"{base_name} _with_audio" + video_ext).replace("\\" , "/" )
srt_path = os.path.join(video_and_srt_path, base_name + srt_ext).replace("\\" , "/" )
output_path = os.path.join(video_and_srt_path, f"{base_name} _with_audio_with_subs" + video_ext).replace("\\" , "/" )
command = [
'ffmpeg' ,
'-i' , video_path,
'-vf' , f'subtitles={srt_path} ' ,
'-c:a' , 'copy' ,
try :
subprocess.run(command, check=True )
except subprocess.CalledProcessError as e:
print (f"An error occurred while merging video and subtitles: {e} " )
我们在函数文件srt2video.py 中定义了函数merge_video_and_subtitle ,其功能为将 srt 字幕文件嵌入到视频中。在main.py 中导入并调用函数merge_video_and_subtitle ,代码示例如下:
导入并调用函数 merge_video_and_subtitle
from srt2video import merge_video_and_subtitle
merge_video_and_subtitle(args.srt_and_video_path, input_base_name)
调用代码后,生成的视频section_1_with_audio_with_subs.mp4 会被保存在./material/video 中,视频效果展示如下:
以上视频为最终效果输出,包含音频与字幕。 步骤六(可选):生成长文档视频 通过以上步骤,相信您已经成功地构建了完整的文档生成视频项目工程,并且能够成功地将我们提供的示例文档section_1.md 转化为视频。在实际应用中,您可能会有将更长篇幅的文档转化为视频的需求,例如阿里云大模型工程师 ACA 认证 课程第一章第一课时认识大模型 。我们建议您将长篇幅文档划分为若干短文档,并生成所有短文档对应的视频,最终将所有视频整合为一个完整的视频。
划分文档 您需要将长篇幅文档按顺序划分为若干短文档,并按特定的命名格式保存到input 文件夹中。其命名要求为section_index.md ,index 为短文档索引。示例如下:
生成长文档视频 新建一个 Python 文件,命名为merge_all_videos.py ,代码示例如下:
import os
import re
from moviepy.editor import VideoFileClip, concatenate_videoclips
def merge_videos (input_directory ):
video_pattern = r"section_(\d+)_with_audio_with_subs\.mp4"
files = sorted (
(fn for fn in os.listdir(input_directory) if re.match (video_pattern, fn)),
key=lambda x: int (re.match (video_pattern, x).group(1 ))
clips = [VideoFileClip(os.path.join(input_directory, file)) for file in files]
final_clip = concatenate_videoclips(clips)
output_path = os.path.join(input_directory, 'output_merge_all_video.mp4' )
final_clip.write_videofile(output_path, audio_codec='aac' )
merge_videos("./material/video" )
在merge_all_videos.py 中,我们定义了函数merge_videos 来将合并所有视频。
为了更便捷地实现所有短文档视频生成及合并所有视频的全过程,我们新建一个名为run.sh 的 Shell 脚本,代码示例如下:
# !/bin/bash
# 日志文件路径
# 清空日志文件
> "$log_file "
# 记录脚本开始时间
start_time=$(date +%s)
# 定义Python脚本路径
# 定义视频输出路径
# 获取所有符合条件的文件名,并按数字排序
files=$(find ./input -maxdepth 1 -type f -name "section_*.md" | sort -V)
# 遍历文件列表
for file in $files; do
index_part=$(basename "$file" .md | cut -d '_' -f 2)
echo "正在处理第${index_part}部分" >> "$log_file"
python "$python_main_script_path" --input_txt_path "$file" >> "$log_file" 2>&1
if [ $? -ne 0 ]; then
echo "Error processing $file" >> "$log_file"
exit 1
# 合并视频
python "$python_merge_script_path" --input_directory video_path >> "$log_file" 2>&1
# 记录脚本结束时间
end_time=$(date +%s)
# 计算脚本运行时间
runtime=$((end_time - start_time))
# 格式化运行时间为 xx时xx分xx秒
hours=$((runtime / 3600))
minutes=$(((runtime % 3600) / 60))
seconds=$((runtime % 60))
# 输出提示信息
echo "视频已生成完毕" >> "$log_file"
echo "总计时间: $hours 时 $minutes 分 $seconds 秒" >> "$log_file"
# 输出日志信息
cat "$log_file"
在run.sh 中,顺序读取所有文档并将其依次转化为视频,最终将所有视频合并输出。您可以运行run.sh 脚本来实现上述过程,代码示例如下:
运行脚本后,您可以在./result.log 中查看代码运行日志。最终合并完成的视频output_merge_all_video.mp4 会被保存在./material/video 中,其内容可以参考效果演示 。
总结 通过本实践教程,您将能够:
了解如何综合运用大语言模型、多模态应用、Marp 等工具将一篇图文并茂的文档转化为更生动的讲解演示视频;
自行调整输入文档、Marp 风格文件、渲染素材等内容,个性化地打造具有您专属风格的视频。