本文为您介绍OSS Writer支持的数据类型、写入方式、字段映射和数据源等参数及配置示例。
背景信息
OSS Writer插件为您提供向OSS写入类CSV格式的一个或多个表文件的功能,写入的文件个数和您的任务并发及同步的文件数有关。
说明 开始配置OSS Writer插件前,请首先配置好数据源,详情请参见配置OSS数据源。
写入OSS中的是一张逻辑意义上的二维表,例如CSV格式的文本信息。如果您想对OSS产品有更深入的了解,请参见OSS产品概述。
OSS Java SDK的详细介绍,请参见阿里云OSS Java SDK。
OSS Writer实现了从数据同步协议转为OSS中的文本文件功能,OSS本身是无结构化数据存储,目前OSS Writer支持的功能如下:
- 支持且仅支持写入文本文件,并要求文本文件中的Schema为一张二维表。
- 支持类CSV格式文件,自定义分隔符。
- 支持多线程写入,每个线程写入不同的子文件。
- 文件支持滚动,当文件大于某个size值时,支持文件切换。
OSS Writer暂时不能实现以下功能:
- 单个文件不能支持并发写入。
- OSS本身不提供数据类型,OSS Writer均以STRING类型写入OSS对象。
参数说明
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
object | OSS Writer写入的文件名,OSS使用文件名模拟目录的实现。OSS对于Object的名称有以下限制:
如果您不需要后缀随机UUID,建议您配置 |
是 | 无 |
writeMode | OSS Writer写入前,数据的处理:
|
是 | 无 |
writeSingleObject | OSS写数据时,是否写单个文件:
|
否 | false |
fileFormat | 文件写出的格式,包括csv和text:
说明 支持写入parquet文件类型,若使用此文件类型,必须增加parquetschema参数定义数据类型。
|
否 | text |
compress | 写入OSS的数据文件的压缩格式(需使用脚本模式任务配置)。
说明 csv、text文本类型不支持压缩,parquet/orc文件支持zip、snappy等压缩。
|
否 | 无 |
fieldDelimiter | 写入的字段分隔符。 | 否 | , |
encoding | 写出文件的编码配置。 | 否 | utf-8 |
nullFormat | 文本文件中无法使用标准字符串定义null(空指针),数据同步系统提供nullFormat定义可以表示为null的字符串。例如,您配置nullFormat="null" ,如果源头数据是null ,数据同步系统会视作null字段。
|
否 | 无 |
header | OSS写出时的表头,例如,["id", "name", "age"] 。
|
否 | 无 |
maxFileSize(高级配置,向导模式不支持) | OSS写出时单个Object文件的最大值,默认为10,000*10MB,类似于在打印log4j日志时,控制日志文件的大小。OSS分块上传时,每个分块大小为10MB(也是日志轮转文件最小粒度,即小于10MB的maxFileSize会被作为10MB),每个OSS
InitiateMultipartUploadRequest支持的分块最大数量为10,000。
轮转发生时,Object名字规则是在原有Object前缀加UUID随机数的基础上,拼接_1,_2,_3等后缀。 |
否 | 100,000 说明 默认单位为MB。
配置示例:"maxFileSize":300, 表示设置单个文件大小为300M。 |
suffix(高级配置,向导模式不支持) | 数据同步写出时,生成的文件名后缀。例如,配置suffix为.csv,则最终写出的文件名为fileName****.csv。 | 否 | 无 |
向导开发介绍
- 选择数据源。
配置同步任务的数据来源和数据去向。
参数 描述 数据源 即上述参数说明中的datasource,通常填写您配置的数据源名称。 文件名(含路径) 即上述参数说明中的Object,填写OSS文件夹的路径,其中不要填写bucket的名称。 文本类型 包括csv、text和parquet。 列分隔符 即上述参数说明中的fieldDelimiter,默认值为(,)。 编码格式 即上述参数说明中的encoding,默认值为utf-8。 null值 即上述参数说明中的nullFormat,将要表示为空的字段填入文本框,如果源端存在则将对应的部分转换为空。 时间格式 日期类型的数据序列化到Object时的格式,例如 "dateFormat": "yyyy-MM-dd"
。前缀冲突 有同样的文件时,可以选择替换、保留或报错。 - 字段映射。左侧的源头表字段和右侧的目标表字段为一一对应的关系。
参数 描述 同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。 同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。 取消映射 单击取消映射,可以取消建立的映射关系。 - 通道控制。
参数 描述 任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。 同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。 错误记录数 错误记录数,表示脏数据的最大容忍条数。 分布式处理能力 数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组概述和新增和使用独享数据集成资源组。
脚本开发介绍
脚本配置示例如下所示,使用脚本模式开发的详情请参见通过脚本模式配置离线同步任务。
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"oss",//插件名。
"parameter":{
"nullFormat":"",//数据同步系统提供nullFormat,定义哪些字符串可以表示为null。
"dateFormat":"",//日期格式。
"datasource":"",//数据源。
"writeMode":"",//写入模式。
"writeSingleObject":"false", //表示是否将同步数据写入单个oss文件。
"encoding":"",//编码格式。
"fieldDelimiter":","//字段分隔符。
"fileFormat":"",//文本类型。
"object":""//Object前缀。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
ORC或Parquet文件写入OSS
目前通过复用HDFS Writer的方式完成OSS写ORC或Parquet格式的文件。在OSS Writer已有参数的基础上,增加了Path、FileFormat等扩展配置参数,参数含义请参见HDFS Writer。
ORC或Parquet文件写入OSS的示例如下:
注意 以下仅为示例,请根据您自己具体的列名称和类型修改对应的参数,请勿直接复制使用。
- 以ORC文件格式写入OSS。
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61", "fileName": "orc", "writeMode": "append", "column": [ { "name": "col1", "type": "BIGINT" }, { "name": "col2", "type": "DOUBLE" }, { "name": "col3", "type": "STRING" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress": "NONE", "encoding": "UTF-8" } }
- 以Parquet文件格式写入OSS,示例如下。
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "parquet", "path": "/tests/case61", "fileName": "test", "writeMode": "append", "fieldDelimiter": "\t", "compress": "SNAPPY", "encoding": "UTF-8", "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\n repeated group list {\n required binary element (UTF8);\n }\n}\nrequired group params_struct {\n required int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\n repeated group list {\n required group element {\n required int64 id;\n required binary name (UTF8);\n}\n }\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\n required int64 id;\n required binary name (UTF8);\n }\n}\n}\nrequired group params_struct_complex {\n required int64 id;\n required group detail {\n required int64 id;\n required binary name (UTF8);\n }\n }\n}", "dataxParquetMode": "fields" } }