本文为您介绍如何使用DataWorks的PyODPS类型节点,借助开源结巴中文分词包实现对中文字段的分词并写入新的表,以及如何通过闭包函数使用自定义词典进行分词。
前提条件
- 请首先确保您已经完成DataWorks工作空间的创建,本示例使用绑定多个MaxCompute计算引擎的简单模式工作空间,详情请参见创建工作空间。
- 请在GitHub下载开源结巴分词中文包。

背景信息
PyODPS集成了MaxCompute的Python SDK。您可以在DataWorks的PyODPS节点上直接编辑Python代码,并使用MaxCompute的Python
SDK。关于PyODPS节点的详情,请参见PyODPS节点。
本文主要为您介绍如何使用PyODPS节点实现结巴中文分词。
注意 本文的操作仅作为代码示例,不建议用于实际的生产环境。
借助开源包实现结巴中文分词
- 创建业务流程。
- 登录DataWorks控制台。
- 在左侧导航栏,单击工作空间列表。
- 选择工作空间所在地域后,单击相应工作空间后的进入数据开发。
- 鼠标悬停至
图标,单击业务流程。
- 在新建业务流程对话框中,输入业务名称和描述,单击新建。
注意 业务名称必须是大小写字母、中文、数字、下划线(_)以及小数点(.),且不能超过128个字符。
- 上传jieba-master.zip包。
- 展开新建的业务流程下的MaxCompute,右键单击资源,选择。
- 在新建资源对话框中,配置各项参数后,单击新建。

参数 |
描述 |
引擎类型 |
从下拉列表中选择该资源所在的计算引擎。
说明 如果您所在的工作空间仅绑定一个实例,则不会显示该参数。
|
引擎实例 |
任务所绑定的MaxCompute引擎名称。 |
目标文件夹 |
默认当前所在文件夹的路径,您可以进行修改。 |
文件类型 |
此处选择Archive类型。
说明 如果该资源包已经在MaxCompute(ODPS)客户端上传过,请取消勾选上传为ODPS资源,否则上传会报错。
|
上传文件 |
单击点击上传,在本地选择已下载的文件jieba-master.zip后,单击打开。
|
资源名称 |
资源的名称,无需和上传的文件名保持一致,但需要符合以下规范:
- 资源名称仅包含中文、字母、数字、英文句号(.)、下划线(_)和短划线(-)。
- 资源类型为Archive时,资源名称和文件名的后缀必须一致,且后缀名包含.zip、.tgz、.tar.gz或.tar。
|
- 在工具栏中,单击
图标。
- 在提交新版本对话框中,输入变更描述,单击确定。
- 创建测试数据表。
- 展开新建的业务流程下的MaxCompute,右键选择。
- 在新建表对话框中,输入表名,单击新建。
- 单击DDL模式,输入以下建表DDL语句。
CREATE TABLE jieba_test (
`chinese` string,
`content` string
);
说明 本教程准备了两列测试数据,您在后续开发过程中可以选择一列进行分词。
- 在弹出的对话框中,单击确定。
- 在基本属性区域,输入表的中文名,单击提交到生产环境。
- 在提交生产确认对话框中,选中我已悉知风险,确认提交,单击确认。
- 以同样的方式创建存放测试结果的数据表jieba_result,DDL语句如下所示。
CREATE TABLE jieba_result (
`chinese` string
);
说明 本例仅对测试数据的chinese列进行分词处理,因此结果表仅有一列。
- 单击测试数据下载分词测试数据。
- 上传测试数据:
- 在数据开发页面,单击
图标。
- 在数据导入向导对话框中,输入需要导入数据的测试表jieba_test并选中,单击下一步。
- 单击浏览,上传您下载至本地的jieba_test.csv文件,单击下一步。
- 选中按名称匹配,单击导入数据。
- 创建PyODPS 2节点。
- 展开业务流程中的MaxCompute,右键单击数据开发,选择。
- 在新建节点对话框中,输入节点名称,并选择目标文件夹,单击提交。
说明
- 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
- 本示例的节点名称为word_split。
- 在word_split节点,输入下述PyODPS代码。
def test(input_var):
import jieba
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
result=jieba.cut(input_var, cut_all=False)
return "/ ".join(result)
hints = {
'odps.isolation.session.enable': True
}
libraries =['jieba-master.zip'] #引用您的jieba-master.zip压缩包。
iris = o.get_table('jieba_test').to_df() #引用您的jieba_test表中的数据。
example = iris.chinese.map(test).execute(hints=hints, libraries=libraries)
print(example) #查看分词结果,分词结构为MAP类型数据。
abci=list(example) #将分词结果转为list类型的数据。
i = 0
for i in range(i,len(abci)):
pq=str(abci[i])
o.write_table('jieba_result',[pq]) #通过循环,逐条写入数据至结果表jieba_result中。
i+=1
else:
print("done")
- 单击工具栏中的
图标。
- 单击工具栏中的
图标,在参数对话框中,从调度资源组下拉列表选择需要使用的资源组,单击确定。
- 在页面下方的运行日志区域,查看结巴分词程序的运行结果。
- 创建和运行ODPS SQL节点。
- 展开业务流程中的MaxCompute,右键单击数据开发,选择。
- 在新建节点对话框中,输入节点名称,并选择目标文件夹,单击提交。
说明 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
- 在节点的编辑页面,输入以下SQL语句。
select * from jieba_result;
- 在MaxCompute计算成本估计对话框中,确认预估费用后,单击运行。
- 在页面下方的运行日志区域,查看运行结果。
自定义的词典实现结巴分词
如果开源结巴分词的词库无法满足您的需求,您可以选择使用自定义的词典。
PyODPS自定义函数可以读取上传至MaxCompute的资源(表资源或文件资源)。此时,自定义函数需要写为闭包函数或Callable类。如果您需要引用复杂的自定义函数,则可以使用DataWorks的注册MaxCompute函数功能,详情请参见注册MaxCompute函数。
本文以使用闭包函数的方式,引用上传至MaxCompute的资源文件(即自定义词典)
key_words.txt。
说明 本示例的资源文件名为key_words.txt。
- 展开业务流程中的MaxCompute,右键单击资源,选择。
- 在新建资源对话框中,配置各项参数,单击确定。

参数 |
描述 |
引擎类型 |
从下拉列表中选择该资源所在的计算引擎。
说明 如果您所在的工作空间仅绑定一个实例,则不会显示该参数。
|
引擎实例 |
任务所绑定的MaxCompute引擎名称。 |
目标文件夹 |
默认当前所在文件夹的路径,您可以进行修改。 |
文件类型 |
此处选择File类型。
说明 如果您从本地上传词典文件至DataWorks,则文件必须为UTF-8编码格式。
|
上传文件 |
单击点击上传,在本地选择文件key_words.txt后,单击打开。
|
资源名称 |
资源名称仅包含中文、字母、数字、英文句号(.)、下划线(_)和短划线(-)。 |
- 在key_words.txt资源编辑页面,输入自定义词典的内容。格式如下。
- 一个词占一行。
- 每一行包括词、词频(可省略)和词性(可省略),使用空格分隔,且不可以颠倒顺序。
- 单击工具栏中的
图标提交资源。
- 创建一个PyODPS 2节点,并输入如下代码。
def test(resources):
import jieba
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
fileobj = resources[0]
def h(input_var):#在嵌套函数h()中,执行词典加载和分词。
import jieba
jieba.load_userdict(fileobj)
result=jieba.cut(input_var, cut_all=False)
return "/ ".join(result)
return h
hints = {
'odps.isolation.session.enable': True
}
libraries =['jieba-master.zip'] #引用您的jieba-master.zip压缩包。
iris = o.get_table('jieba_test').to_df() #引用您的jieba_test表中的数据。
file_object = o.get_resource('key_words.txt') #get_resource()引用odps资源。
example = iris.chinese.map(test, resources=[file_object]).execute(hints=hints, libraries=libraries) #map调用函数,并传递resources参数。
print(example) #查看分词结果,分词结构为MAP类型数据。
abci=list(example) #将分词结果转为list类型数据。
for i in range(i,len(abci)):
pq=str(abci[i])
o.write_table('jieba_result',[pq]) #通过循环,逐条写入数据至结果表jieba_result中。
i+=1
else:
print("done")
- 运行代码,对比引用自定义词典前后的结果。
引用自定义词典前结果如下。

引用自定义词典后结果如下。
