This topic describes how to use the open-source tool DataJuicer on DLC for large-scale multimodal data processing.
DataJuicer
Introduction to DataJuicer
DataJuicer is an open-source library for processing large-scale multimodal data (text, images, audio, and video). It provides a rich set of operators, efficient distributed processing, and flexible configuration to simplify data cleaning, transformation, and augmentation, and is designed for preparing training data for large language models (LLMs) and multimodal models.
DataJuicer task configuration
DataJuicer organizes and manages the entire data processing pipeline through a YAML configuration file. In a single configuration file, you define global parameters and a sequence of operators that are executed in order.
The following is an example configuration file, process.yaml:
Run process_data.py with the path of the configuration file as an argument, or use the dj-process command-line tool to process the dataset.
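The example file itself does not appear in this topic; as a minimal, illustrative sketch (the project name, paths, and operator parameters below are placeholders rather than values from an official sample), a process.yaml might look like this:

```yaml
# Minimal illustrative process.yaml (all values are placeholders)
# global parameters
project_name: 'demo-process'
dataset_path: './demos/data/demo-dataset.jsonl'     # input dataset file or directory
export_path: './outputs/demo-process/result.jsonl'  # where processed data is written
np: 4                                               # number of worker processes

# operators are applied to the dataset in order
process:
  - language_id_score_filter:   # keep samples confidently identified as English
      lang: en
      min_score: 0.8
  - text_length_filter:         # drop extremely short or extremely long samples
      min_len: 10
      max_len: 10000
```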
# For installation from source
python tools/process_data.py --config configs/demo/process.yaml
# Using the command-line tool
dj-process --config configs/demo/process.yaml
Key parameters
- dataset_path: the path of the input data. In a DLC job, set this to the path where your data store (such as OSS) is mounted inside the container.
- export_path: the output path for processing results. For distributed jobs, this path must be a directory rather than a specific file.
- executor_type: the executor type. default uses DefaultExecutor to run on a single node; ray uses RayExecutor, which supports distributed processing. For details, see Data-Juicer distributed data processing.
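For instance, switching a job between the two executors is a matter of changing a couple of global fields. A sketch (the address value is illustrative):

```yaml
# Single-node execution with DefaultExecutor
executor_type: 'default'    # or simply omit the field

# Distributed execution with RayExecutor
# (comment out the line above and use these instead)
# executor_type: 'ray'
# ray_address: 'auto'       # or an explicit address such as ray://<hostname>:<port>
```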
Operators
An operator is the basic unit in DataJuicer that performs a concrete data processing task. DataJuicer provides more than 100 operators, organized into types such as aggregator, deduplicator, filter, formatter, grouper, mapper, and selector. For details, see Operator Schemas.
You can refer to config_all.yaml, which contains the configuration of every operator.
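As an illustrative sketch of mixing operator types in one pipeline (the operator names below come from Data-Juicer's operator catalog, but the parameters shown are illustrative; consult Operator Schemas for the authoritative argument lists):

```yaml
process:
  - whitespace_normalization_mapper:   # mapper: rewrites each sample (normalizes unusual whitespace)
  - text_length_filter:                # filter: keeps samples whose computed stat falls inside a range
      min_len: 10
      max_len: 10000
  - document_simhash_deduplicator:     # deduplicator: drops near-duplicate documents
      tokenization: space
```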
References
For how to build a task configuration file, see Build a configuration file.
For the complete configuration file, see config_all.yaml.
For more information, see the official DataJuicer documentation and the DataJuicer GitHub repository.
Run DataJuicer jobs on DLC
DLC natively supports DataJuicer. To create a job of this type, select DataJuicer as the framework when you create the job.
Run modes
When you create a DLC job, select the run mode that matches the executor_type in your configuration file.
Single-node mode:
- DataJuicer configuration file: set executor_type to default, or omit the field.
- DLC configuration:
  - Run mode: select single-node;
  - Number of nodes: set to 1.
Distributed mode:
- DataJuicer configuration file: executor_type must be set to ray.
- DLC configuration:
  - Run mode: select distributed;
  - Number of nodes: exactly 1 Head node, and at least 1 Worker node;
  - Resource specification: the Head node requires more than 8 GB of memory; choose Worker specifications as needed;
  - Fault tolerance and diagnostics (optional): Head-node fault tolerance is supported; you can select a Redis instance in the same VPC.
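To connect the two halves, the configuration file for such a distributed job only needs the matching executor fields. A sketch (the paths are illustrative, modeled on the examples later in this topic):

```yaml
executor_type: 'ray'     # must match the distributed run mode selected in DLC
ray_address: 'auto'      # attach to the Ray cluster automatically; replace with an explicit address if needed
dataset_path: '/mnt/data/process_on_ray/data/demo-dataset2.jsonl'
export_path: '/mnt/data/data-juicer-outputs/process_on_ray/'  # a directory, as required for distributed jobs
```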
Image requirements
The image for a DataJuicer job must have the DataJuicer environment preinstalled and must include the dj-process command. We recommend the official image from the data-juicer repository, or a custom image built on the official data-juicer image.
Startup command
DLC supports startup commands in Shell and YAML formats; Shell is the default. When the command format is Shell, usage is the same as for other DLC jobs. When the format is YAML, you can write the DataJuicer configuration directly in the command.
Startup command examples:
Shell example 1: write the configuration to a temporary file, then launch it with the dj-process command.
set -ex
cat > /tmp/run_config.yaml <<EOL
# Process config example for dataset

# global parameters
project_name: 'ray-demo'
dataset_path: '/mnt/data/process_on_ray/data/demo-dataset2.jsonl'  # path to your dataset directory or file
export_path: '/mnt/data/data-juicer-outputs/20250728/01/process_on_ray/result.jsonl'
executor_type: 'ray'
ray_address: 'auto'  # change to your ray cluster address, e.g., ray://<hostname>:<port>
np: 12

# process schedule
# a list of several process operators with their arguments
process:
  # Filter ops
  - alphanumeric_filter:            # filter text with alphabet/numeric ratio out of specific range
      tokenization: false           # whether to count the ratio of alphanumeric to the total number of tokens
      min_ratio: 0.0                # the min ratio of filter range
      max_ratio: 0.9                # the max ratio of filter range
  - average_line_length_filter:     # filter text with the average length of lines out of specific range
      min_len: 10                   # the min length of filter range
      max_len: 10000                # the max length of filter range
  - character_repetition_filter:    # filter text with the character repetition ratio out of specific range
      rep_len: 10                   # repetition length for char-level n-gram
      min_ratio: 0.0                # the min ratio of filter range
      max_ratio: 0.5                # the max ratio of filter range
  - flagged_words_filter:           # filter text with the flagged-word ratio larger than a specific max value
      lang: en                      # consider flagged words in what language
      tokenization: false           # whether to use model to tokenize documents
      max_ratio: 0.0045             # the max ratio to filter text
      flagged_words_dir: ./assets   # directory to store flagged words dictionaries
      use_words_aug: false          # whether to augment words, especially for Chinese and Vietnamese
      words_aug_group_sizes: [2]    # the group size of words to augment
      words_aug_join_char: ""       # the join char between words to augment
  - language_id_score_filter:       # filter text in specific language with language scores larger than a specific max value
      lang: en                      # keep text in what language
      min_score: 0.8                # the min language scores to filter text
  - maximum_line_length_filter:     # filter text with the maximum length of lines out of specific range
      min_len: 10                   # the min length of filter range
      max_len: 10000                # the max length of filter range
  - perplexity_filter:              # filter text with perplexity score out of specific range
      lang: en                      # compute perplexity in what language
      max_ppl: 1500                 # the max perplexity score to filter text
  - special_characters_filter:      # filter text with special-char ratio out of specific range
      min_ratio: 0.0                # the min ratio of filter range
      max_ratio: 0.25               # the max ratio of filter range
  - stopwords_filter:               # filter text with stopword ratio smaller than a specific min value
      lang: en                      # consider stopwords in what language
      tokenization: false           # whether to use model to tokenize documents
      min_ratio: 0.3                # the min ratio to filter text
      stopwords_dir: ./assets       # directory to store stopwords dictionaries
      use_words_aug: false          # whether to augment words, especially for Chinese and Vietnamese
      words_aug_group_sizes: [2]    # the group size of words to augment
      words_aug_join_char: ""       # the join char between words to augment
  - text_length_filter:             # filter text with length out of specific range
      min_len: 10                   # the min length of filter range
      max_len: 10000                # the max length of filter range
  - words_num_filter:               # filter text with number of words out of specific range
      lang: en                      # sample in which language
      tokenization: false           # whether to use model to tokenize documents
      min_num: 10                   # the min number of filter range
      max_num: 10000                # the max number of filter range
  - word_repetition_filter:         # filter text with the word repetition ratio out of specific range
      lang: en                      # sample in which language
      tokenization: false           # whether to use model to tokenize documents
      rep_len: 10                   # repetition length for word-level n-gram
      min_ratio: 0.0                # the min ratio of filter range
      max_ratio: 0.5                # the max ratio of filter range
EOL
dj-process --config /tmp/run_config.yaml
Shell example 2: store the configuration file in cloud storage (for example, Object Storage Service OSS), mount it into the DLC container, and run dj-process with the path of the mounted configuration file:
dj-process --config /mnt/data/process_on_ray/config/demo.yaml
YAML example: write the DataJuicer configuration directly in the command.
# Process config example for dataset

# global parameters
project_name: 'ray-demo'
dataset_path: '/mnt/data/process_on_ray/data/demo-dataset2.jsonl'  # path to your dataset directory or file
export_path: '/mnt/data/data-juicer-outputs/20250728/01/process_on_ray/result.jsonl'
executor_type: 'ray'
ray_address: 'auto'  # change to your ray cluster address, e.g., ray://<hostname>:<port>
np: 12

# process schedule
# a list of several process operators with their arguments
process:
  # Filter ops
  - alphanumeric_filter:            # filter text with alphabet/numeric ratio out of specific range
      tokenization: false           # whether to count the ratio of alphanumeric to the total number of tokens
      min_ratio: 0.0                # the min ratio of filter range
      max_ratio: 0.9                # the max ratio of filter range
  - average_line_length_filter:     # filter text with the average length of lines out of specific range
      min_len: 10                   # the min length of filter range
      max_len: 10000                # the max length of filter range
  - character_repetition_filter:    # filter text with the character repetition ratio out of specific range
      rep_len: 10                   # repetition length for char-level n-gram
      min_ratio: 0.0                # the min ratio of filter range
      max_ratio: 0.5                # the max ratio of filter range
  - flagged_words_filter:           # filter text with the flagged-word ratio larger than a specific max value
      lang: en                      # consider flagged words in what language
      tokenization: false           # whether to use model to tokenize documents
      max_ratio: 0.0045             # the max ratio to filter text
      flagged_words_dir: ./assets   # directory to store flagged words dictionaries
      use_words_aug: false          # whether to augment words, especially for Chinese and Vietnamese
      words_aug_group_sizes: [2]    # the group size of words to augment
      words_aug_join_char: ""       # the join char between words to augment
  - language_id_score_filter:       # filter text in specific language with language scores larger than a specific max value
      lang: en                      # keep text in what language
      min_score: 0.8                # the min language scores to filter text
  - maximum_line_length_filter:     # filter text with the maximum length of lines out of specific range
      min_len: 10                   # the min length of filter range
      max_len: 10000                # the max length of filter range
  - perplexity_filter:              # filter text with perplexity score out of specific range
      lang: en                      # compute perplexity in what language
      max_ppl: 1500                 # the max perplexity score to filter text
  - special_characters_filter:      # filter text with special-char ratio out of specific range
      min_ratio: 0.0                # the min ratio of filter range
      max_ratio: 0.25               # the max ratio of filter range
  - stopwords_filter:               # filter text with stopword ratio smaller than a specific min value
      lang: en                      # consider stopwords in what language
      tokenization: false           # whether to use model to tokenize documents
      min_ratio: 0.3                # the min ratio to filter text
      stopwords_dir: ./assets       # directory to store stopwords dictionaries
      use_words_aug: false          # whether to augment words, especially for Chinese and Vietnamese
      words_aug_group_sizes: [2]    # the group size of words to augment
      words_aug_join_char: ""       # the join char between words to augment
  - text_length_filter:             # filter text with length out of specific range
      min_len: 10                   # the min length of filter range
      max_len: 10000                # the max length of filter range
  - words_num_filter:               # filter text with number of words out of specific range
      lang: en                      # sample in which language
      tokenization: false           # whether to use model to tokenize documents
      min_num: 10                   # the min number of filter range
      max_num: 10000                # the max number of filter range
  - word_repetition_filter:         # filter text with the word repetition ratio out of specific range
      lang: en                      # sample in which language
      tokenization: false           # whether to use model to tokenize documents
      rep_len: 10                   # repetition length for word-level n-gram
      min_ratio: 0.0                # the min ratio of filter range
      max_ratio: 0.5                # the max ratio of filter range
Models required by operators
Many DataJuicer operators depend on external models. If a model is not present locally, the operator downloads it automatically on first use, so the first run can take considerably longer.
To avoid repeated downloads, DataJuicer jobs handle the default models automatically: the default models are mounted to the /ml/data-juicer/models/ directory, and the DATA_JUICER_EXTERNAL_MODELS_HOME environment variable is set to point there. You therefore do not need to download the default models manually.