OSS数据源为您提供读取和写入OSS的双向通道,本文为您介绍DataWorks的OSS数据同步的能力支持情况。
支持的字段类型与使用限制
离线读
OSS Reader实现了从OSS读取数据并转为数据集成协议的功能,OSS本身是无结构化数据存储。对于数据集成而言,OSS Reader支持的功能如下。
支持 | 不支持 |
|
|
准备OSS数据时,如果数据为CSV文件,则必须为标准格式的CSV文件。例如,如果列内容在半角引号(")内,需要替换成两个半角引号(""),否则会造成文件被错误分割。
离线写
OSS Writer实现了从数据同步协议转为OSS中的文本文件功能,OSS本身是无结构化数据存储,目前OSS Writer支持的功能如下。
支持 | 不支持 |
|
|
类型分类 | 数据集成column配置类型 |
整数类 | LONG |
字符串类 | STRING |
浮点类 | DOUBLE |
布尔类 | BOOLEAN |
日期时间类 | DATE |
实时写
支持实时写入的能力。
支持实时写入Hudi格式版本:0.12.x。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源,详细的配置参数解释可在配置界面查看对应参数的文案提示。
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
单表离线同步任务配置指导
操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
单表实时同步任务配置指导
操作流程请参见配置单表增量数据实时同步、DataStudio侧实时同步任务配置。
整库(实时)全增量同步配置指导
操作流程请参见数据集成侧同步任务配置。
常见问题
附录:脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo:通用示例
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"oss",//插件名。
"parameter":{
"nullFormat":"",//定义可以表示为null的字符串。
"compress":"",//文本压缩类型。
"datasource":"",//数据源。
"column":[//字段。
{
"index":0,//列序号。
"type":"string"//数据类型。
},
{
"index":1,
"type":"long"
},
{
"index":2,
"type":"double"
},
{
"index":3,
"type":"boolean"
},
{
"format":"yyyy-MM-dd HH:mm:ss", //时间格式。
"index":4,
"type":"date"
}
],
"skipHeader":"",//类CSV格式文件可能存在表头为标题情况,需要跳过。
"encoding":"",//编码格式。
"fieldDelimiter":",",//字段分隔符。
"fileFormat": "",//文本类型。
"object":[]//object前缀。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":""//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1 //作业并发数。
"mbps":"12",//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader脚本Demo:ORC或Parquet文件读取OSS
目前通过复用HDFS Reader的方式完成OSS读取ORC或Parquet格式的文件,在OSS Reader已有参数的基础上,增加了Path、FileFormat等扩展配置参数,参数含义请参见HDFS Reader。
以ORC文件格式读取OSS,示例如下。
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61/orc__691b6815_9260_4037_9899_****", "column": [ { "index": 0, "type": "long" }, { "index": "1", "type": "string" }, { "index": "2", "type": "string" } ] } }
以Parquet文件格式读取OSS,示例如下。
{ "type":"job", "version":"2.0", "steps":[ { "stepType":"oss", "parameter":{ "nullFormat":"", "compress":"", "fileFormat":"parquet", "path":"/*", "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }", "column":[ { "index":"0", "type":"string" }, { "index":"1", "type":"long" }, { "index":"2", "type":"string" }, { "index":"3", "type":"string" }, { "index":"4", "type":"string" }, { "index":"5", "type":"string" }, { "index":"6", "type":"string" }, { "index":"7", "type":"string" }, { "index":"8", "type":"string" }, { "index":"9", "type":"string" }, { "index":"10", "type":"double" }, { "index":"11", "type":"string" }, { "index":"12", "type":"string" } ], "skipHeader":"false", "encoding":"UTF-8", "fieldDelimiter":",", "fieldDelimiterOrigin":",", "datasource":"wpw_demotest_oss", "envType":0, "object":[ "wpw_demo/userdata1.parquet" ] }, "name":"Reader", "category":"reader" }, { "stepType":"odps", "parameter":{ "partition":"dt=${bizdate}", "truncate":true, "datasource":"0_odps_wpw_demotest", "envType":0, "column":[ "id" ], "emptyAsNull":false, "table":"wpw_0827" }, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"" }, "locale":"zh_CN", "speed":{ "throttle":false, "concurrent":2 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Reader脚本参数
参数 | 描述 | 是否必选 | 默认值 |
datasource | 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。 | 是 | 无 |
Object | OSS的Object信息,此处可以支持填写多个Object。例如xxx的bucket中有yunshi文件夹,文件夹中有ll.txt文件,则Object直接填yunshi/ll.txt。 支持使用调度参数并配合调度,灵活生成Object文件名称与路径。
说明
| 是 | 无 |
parquetSchema | 以Parquet文件格式读取OSS时配置,当且仅当fileFormat为parquet时生效,具体表示parquet存储的类型说明。您需要确保填写parquetSchema后,整体配置符合JSON语法。
parquetSchema的配置格式说明如下:
配置示例如下所示。
| 否 | 无 |
column | 读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不是从源头文件读取数据,而是根据value值自动生成对应的列。 默认情况下,您可以全部按照String类型读取数据,配置如下。
您可以指定column字段信息,配置如下。
说明 对于您指定的column信息,type必须填写,index/value必须选择其一。 | 是 | 全部按照STRING类型读取。 |
fileFormat | 文本类型。源头OSS的文件类型。例如csv、text,两种格式均支持自定义分隔符。 | 是 | csv |
fieldDelimiter | 读取的字段分隔符。 说明 OSS Reader在读取数据时,需要指定字段分割符,如果不指定默认为(,),界面配置中也会默认填写为(,)。 如果分隔符不可见,请填写Unicode编码。例如,\u001b、\u007c。 | 是 | , |
lineDelimiter | 读取的行分隔符。 说明 当fileFormat取值为text时,本参数有效。 | 否 | 无 |
compress | 文本压缩类型,默认不填写(即不压缩)。支持压缩类型为gzip、bzip2和zip。 | 否 | 不压缩 |
encoding | 读取文件的编码配置。 | 否 | utf-8 |
nullFormat | 文本文件中无法使用标准字符串定义null(空指针),数据同步提供nullFormat定义哪些字符串可以表示为null。 例如:
| 否 | 无 |
skipHeader | 类CSV格式文件可能存在表头为标题情况,需要跳过。默认不跳过,压缩文件模式下不支持skipHeader。 | 否 | false |
csvReaderConfig | 读取CSV类型文件参数配置,Map类型。读取CSV类型文件使用的CsvReader进行读取,会有很多配置,不配置则使用默认值。 | 否 | 无 |
Writer脚本Demo:通用示例
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"oss",//插件名。
"parameter":{
"nullFormat":"",//数据同步系统提供nullFormat,定义哪些字符串可以表示为null。
"dateFormat":"",//日期格式。
"datasource":"",//数据源。
"writeMode":"",//写入模式。
"writeSingleObject":"false", //表示是否将同步数据写入单个oss文件。
"encoding":"",//编码格式。
"fieldDelimiter":","//字段分隔符。
"fileFormat":"",//文本类型。
"object":""//Object前缀。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Writer脚本Demo:ORC或Parquet文件写入OSS脚本配置demo
目前通过复用HDFS Writer的方式完成OSS写ORC或Parquet格式的文件。在OSS Writer已有参数的基础上,增加了Path、FileFormat等扩展配置参数,参数含义请参见HDFS Writer。
ORC或Parquet文件写入OSS的示例如下:
以下仅为示例,请根据您自己具体的列名称和类型修改对应的参数,请勿直接复制使用。
以ORC文件格式写入OSS
写ORC文件,当前仅支持脚本模式,您需要转脚本模式配置,其中fileFormat需要配置为
orc
,path需要配置为写入文件的路径,column配置格式为{"name":"your column name","type": "your column type"}
。当前支持写入的ORC类型如下:
字段类型
离线写OSS(ORC格式)
TINYINT
支持
SMALLINT
支持
INT
支持
BIGINT
支持
FLOAT
支持
DOUBLE
支持
TIMESTAMP
支持
DATE
支持
VARCHAR
支持
STRING
支持
CHAR
支持
BOOLEAN
支持
DECIMAL
支持
BINARY
支持
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61", "fileName": "orc", "writeMode": "append", "column": [ { "name": "col1", "type": "BIGINT" }, { "name": "col2", "type": "DOUBLE" }, { "name": "col3", "type": "STRING" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress": "NONE", "encoding": "UTF-8" } }
以Parquet文件格式写入OSS
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "parquet", "path": "/tests/case61", "fileName": "test", "writeMode": "append", "fieldDelimiter": "\t", "compress": "SNAPPY", "encoding": "UTF-8", "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}", "dataxParquetMode": "fields" } }
Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
object | OSS Writer写入的文件名,OSS使用文件名模拟目录的实现。OSS对于Object的名称有以下限制:
如果您不需要后缀随机UUID,建议您配置 | 是 | 无 |
writeMode | OSS Writer写入前,数据的处理:
| 是 | 无 |
writeSingleObject | OSS写数据时,是否写单个文件:
说明 当写入ORC、parquet类型数据时,writeSingleObject参数不生效,即使用该参数无法在多并发场景下,写入单个ORC或parquet文件。若要写入单个文件,您可以将并发设置为1,但文件名会添加随机后缀,并且设置并发为1时,将影响同步任务的速度。 | 否 | false |
fileFormat | 文件写出的格式,支持以下几种格式:
| 否 | text |
compress | 写入OSS的数据文件的压缩格式(需使用脚本模式任务配置)。 说明 csv、text文本类型不支持压缩,parquet/orc文件支持gzip、snappy等压缩。 | 否 | 无 |
fieldDelimiter | 写入的字段分隔符。 | 否 | , |
encoding | 写出文件的编码配置。 | 否 | utf-8 |
parquetSchema | 以Parquet文件格式写入OSS的必填项,用来描述目标文件的结构,所以此项当且仅当fileFormat为parquet时生效,格式如下。
配置项说明如下:
说明 每行列设置必须以分号结尾,最后一行也要写上分号。 示例如下。
| 否 | 无 |
nullFormat | 文本文件中无法使用标准字符串定义null(空指针),数据同步系统提供nullFormat定义可以表示为null的字符串。例如,您配置 | 否 | 无 |
header | OSS写出时的表头,例如, | 否 | 无 |
maxFileSize(高级配置,向导模式不支持) | OSS写出时单个Object文件的最大值,默认为10,000*10MB,类似于在打印log4j日志时,控制日志文件的大小。OSS分块上传时,每个分块大小为10MB(也是日志轮转文件最小粒度,即小于10MB的maxFileSize会被作为10MB),每个OSS InitiateMultipartUploadRequest支持的分块最大数量为10,000。 轮转发生时,Object名字规则是在原有Object前缀加UUID随机数的基础上,拼接_1,_2,_3等后缀。 说明 默认单位为MB。 配置示例:"maxFileSize":300, 表示设置单个文件大小为300M。 | 否 | 100,000 |
suffix(高级配置,向导模式不支持) | 数据同步写出时,生成的文件名后缀。例如,配置suffix为.csv,则最终写出的文件名为fileName****.csv。 | 否 | 无 |
附录:parquet类型数据的转化策略
如果您没有配置parquetSchema,那么DataWorks侧会根据源端字段类型,按照一定的策略进行相应数据类型转换,转换策略如下。
转换后的数据类型 | Parquet type | Parquet logical type |
CHAR / VARCHAR / STRING | BINARY | UTF8 |
BOOLEAN | BOOLEAN | 不涉及 |
BINARY / VARBINARY | BINARY | 不涉及 |
DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
TINYINT | INT32 | INT_8 |
SMALLINT | INT32 | INT_16 |
INT/INTEGER | INT32 | 不涉及 |
BIGINT | INT64 | 不涉及 |
FLOAT | FLOAT | 不涉及 |
DOUBLE | DOUBLE | 不涉及 |
DATE | INT32 | DATE |
TIME | INT32 | TIME_MILLIS |
TIMESTAMP/DATETIME | INT96 | 不涉及 |