OSS数据源为您提供读取和写入OSS的双向通道,本文为您介绍DataWorks的OSS数据同步的能力支持情况。
支持的字段类型与使用限制
离线读
OSS Reader实现了从OSS读取数据并转为数据集成协议的功能,OSS本身是无结构化数据存储。对于数据集成而言,OSS Reader支持的功能如下。
支持 | 不支持 |
|
|
准备OSS数据时,如果数据为CSV文件,则必须为标准格式的CSV文件。例如,如果列内容在半角引号(")内,需要替换成两个半角引号(""),否则会造成文件被错误分割。
OSS属于非结构化数据源,里面存放的都是文件类型数据,因此在使用同步时,需要先自行确认同步的字段结构是否符合预期。同理,非结构化数据源中数据结构发生变化时必须要在任务配置中重新确认字段结构,否则可能会造成同步数据错乱。
离线写
OSS Writer实现了从数据同步协议转为OSS中的文本文件功能,OSS本身是无结构化数据存储,目前OSS Writer支持的功能如下。
支持 | 不支持 |
|
|
类型分类 | 数据集成column配置类型 |
整数类 | LONG |
字符串类 | STRING |
浮点类 | DOUBLE |
布尔类 | BOOLEAN |
日期时间类 | DATE |
实时写
支持实时写入的能力。
支持实时写入Hudi格式版本:0.12.x。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源,详细的配置参数解释可在配置界面查看对应参数的文案提示。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源,详细的配置参数解释可在配置界面查看对应参数的文案提示。
跨账号创建OSS数据源时需对相应的账号进行授权,详情可参见:基于Bucket Policy实现跨账号访问OSS。
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
单表离线同步任务配置指导
操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
单表实时同步任务配置指导
操作流程请参见配置单表增量数据实时同步、DataStudio侧实时同步任务配置。
整库(实时)全增量同步配置指导
操作流程请参见数据集成侧同步任务配置。
常见问题
附录:脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo:通用示例
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"oss",//插件名。
"parameter":{
"nullFormat":"",//定义可以表示为null的字符串。
"compress":"",//文本压缩类型。
"datasource":"",//数据源。
"column":[//字段。
{
"index":0,//列序号。
"type":"string"//数据类型。
},
{
"index":1,
"type":"long"
},
{
"index":2,
"type":"double"
},
{
"index":3,
"type":"boolean"
},
{
"format":"yyyy-MM-dd HH:mm:ss", //时间格式。
"index":4,
"type":"date"
}
],
"skipHeader":"",//类CSV格式文件可能存在表头为标题情况,需要跳过。
"encoding":"",//编码格式。
"fieldDelimiter":",",//字段分隔符。
"fileFormat": "",//文本类型。
"object":[]//object前缀。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":""//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1 //作业并发数。
"mbps":"12",//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader脚本Demo:ORC或Parquet文件读取OSS
目前通过复用HDFS Reader的方式完成OSS读取ORC或Parquet格式的文件,在OSS Reader已有参数的基础上,增加了Path、FileFormat等扩展配置参数,参数含义请参见HDFS Reader。
以ORC文件格式读取OSS,示例如下。
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61/orc__691b6815_9260_4037_9899_****", "column": [ { "index": 0, "type": "long" }, { "index": "1", "type": "string" }, { "index": "2", "type": "string" } ] } }
以Parquet文件格式读取OSS,示例如下。
{ "type":"job", "version":"2.0", "steps":[ { "stepType":"oss", "parameter":{ "nullFormat":"", "compress":"", "fileFormat":"parquet", "path":"/*", "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }", "column":[ { "index":"0", "type":"string" }, { "index":"1", "type":"long" }, { "index":"2", "type":"string" }, { "index":"3", "type":"string" }, { "index":"4", "type":"string" }, { "index":"5", "type":"string" }, { "index":"6", "type":"string" }, { "index":"7", "type":"string" }, { "index":"8", "type":"string" }, { "index":"9", "type":"string" }, { "index":"10", "type":"double" }, { "index":"11", "type":"string" }, { "index":"12", "type":"string" } ], "skipHeader":"false", "encoding":"UTF-8", "fieldDelimiter":",", "fieldDelimiterOrigin":",", "datasource":"wpw_demotest_oss", "envType":0, "object":[ "wpw_demo/userdata1.parquet" ] }, "name":"Reader", "category":"reader" }, { "stepType":"odps", "parameter":{ "partition":"dt=${bizdate}", "truncate":true, "datasource":"0_odps_wpw_demotest", "envType":0, "column":[ "id" ], "emptyAsNull":false, "table":"wpw_0827" }, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"" }, "locale":"zh_CN", "speed":{ "throttle":false, "concurrent":2 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Reader脚本参数
参数 | 描述 | 是否必选 | 默认值 |
datasource | 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。 | 是 | 无 |
Object | OSS的Object信息,此处可以支持填写多个Object。例如xxx的bucket中有yunshi文件夹,文件夹中有ll.txt文件,则Object直接填yunshi/ll.txt。 支持使用调度参数并配合调度,灵活生成Object文件名称与路径。
说明
| 是 | 无 |
parquetSchema | 以Parquet文件格式读取OSS时配置,当且仅当fileFormat为parquet时生效,具体表示parquet存储的类型说明。您需要确保填写parquetSchema后,整体配置符合JSON语法。
parquetSchema的配置格式说明如下:
配置示例如下所示。
| 否 | 无 |
column | 读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不是从源头文件读取数据,而是根据value值自动生成对应的列。 默认情况下,您可以全部按照String类型读取数据,配置如下。
您可以指定column字段信息,配置如下。
说明 对于您指定的column信息,type必须填写,index/value必须选择其一。 | 是 | 全部按照STRING类型读取。 |
fileFormat | 文本类型。源头OSS的文件类型。例如csv、text,两种格式均支持自定义分隔符。 | 是 | csv |
fieldDelimiter | 读取的字段分隔符。 说明 OSS Reader在读取数据时,需要指定字段分割符,如果不指定默认为(,),界面配置中也会默认填写为(,)。 如果分隔符不可见,请填写Unicode编码。例如,\u001b、\u007c。 | 是 | , |
lineDelimiter | 读取的行分隔符。 说明 当fileFormat取值为text时,本参数有效。 | 否 | 无 |
compress | 文本压缩类型,默认不填写(即不压缩)。支持压缩类型为gzip、bzip2和zip。 | 否 | 不压缩 |
encoding | 读取文件的编码配置。 | 否 | utf-8 |
nullFormat | 文本文件中无法使用标准字符串定义null(空指针),数据同步提供nullFormat定义哪些字符串可以表示为null。 例如:
| 否 | 无 |
skipHeader | 类CSV格式文件可能存在表头为标题情况,需要跳过。默认不跳过,压缩文件模式下不支持skipHeader。 | 否 | false |
csvReaderConfig | 读取CSV类型文件参数配置,Map类型。读取CSV类型文件使用的CsvReader进行读取,会有很多配置,不配置则使用默认值。 | 否 | 无 |
Writer脚本Demo:通用示例
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"oss",//插件名。
"parameter":{
"nullFormat":"",//数据同步系统提供nullFormat,定义哪些字符串可以表示为null。
"dateFormat":"",//日期格式。
"datasource":"",//数据源。
"writeMode":"",//写入模式。
"writeSingleObject":"false", //表示是否将同步数据写入单个oss文件。
"encoding":"",//编码格式。
"fieldDelimiter":","//字段分隔符。
"fileFormat":"",//文本类型。
"object":""//Object前缀。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Writer脚本Demo:ORC或Parquet文件写入OSS脚本配置demo
目前通过复用HDFS Writer的方式完成OSS写ORC或Parquet格式的文件。在OSS Writer已有参数的基础上,增加了Path、FileFormat等扩展配置参数,参数含义请参见HDFS Writer。
ORC或Parquet文件写入OSS的示例如下:
以下仅为示例,请根据您自己具体的列名称和类型修改对应的参数,请勿直接复制使用。
以ORC文件格式写入OSS
写ORC文件,当前仅支持脚本模式,您需要转脚本模式配置,其中fileFormat需要配置为
orc
,path需要配置为写入文件的路径,column配置格式为{"name":"your column name","type": "your column type"}
。当前支持写入的ORC类型如下:
字段类型
离线写OSS(ORC格式)
TINYINT
支持
SMALLINT
支持
INT
支持
BIGINT
支持
FLOAT
支持
DOUBLE
支持
TIMESTAMP
支持
DATE
支持
VARCHAR
支持
STRING
支持
CHAR
支持
BOOLEAN
支持
DECIMAL
支持
BINARY
支持
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61", "fileName": "orc", "writeMode": "append", "column": [ { "name": "col1", "type": "BIGINT" }, { "name": "col2", "type": "DOUBLE" }, { "name": "col3", "type": "STRING" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress": "NONE", "encoding": "UTF-8" } }
以Parquet文件格式写入OSS
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "parquet", "path": "/tests/case61", "fileName": "test", "writeMode": "append", "fieldDelimiter": "\t", "compress": "SNAPPY", "encoding": "UTF-8", "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}", "dataxParquetMode": "fields" } }
Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
object | OSS Writer写入的文件名,OSS使用文件名模拟目录的实现。OSS对于Object的名称有以下限制:
如果您不需要后缀随机UUID,建议您配置 | 是 | 无 |
writeMode | OSS Writer写入前,数据的处理:
| 是 | 无 |
writeSingleObject | OSS写数据时,是否写单个文件:
说明 当写入ORC、parquet类型数据时,writeSingleObject参数不生效,即使用该参数无法在多并发场景下,写入单个ORC或parquet文件。若要写入单个文件,您可以将并发设置为1,但文件名会添加随机后缀,并且设置并发为1时,将影响同步任务的速度。 | 否 | false |
fileFormat | 文件写出的格式,支持以下几种格式:
| 否 | text |
compress | 写入OSS的数据文件的压缩格式(需使用脚本模式任务配置)。 说明 csv、text文本类型不支持压缩,parquet/orc文件支持gzip、snappy等压缩。 | 否 | 无 |
fieldDelimiter | 写入的字段分隔符。 | 否 | , |
encoding | 写出文件的编码配置。 | 否 | utf-8 |
parquetSchema | 以Parquet文件格式写入OSS的必填项,用来描述目标文件的结构,所以此项当且仅当fileFormat为parquet时生效,格式如下。
配置项说明如下:
说明 每行列设置必须以分号结尾,最后一行也要写上分号。 示例如下。
| 否 | 无 |
nullFormat | 文本文件中无法使用标准字符串定义null(空指针),数据同步系统提供nullFormat定义可以表示为null的字符串。例如,您配置 | 否 | 无 |
header | OSS写出时的表头,例如, | 否 | 无 |
maxFileSize(高级配置,向导模式不支持) | OSS写出时单个Object文件的最大值,默认为10,000*10MB,类似于在打印log4j日志时,控制日志文件的大小。OSS分块上传时,每个分块大小为10MB(也是日志轮转文件最小粒度,即小于10MB的maxFileSize会被作为10MB),每个OSS InitiateMultipartUploadRequest支持的分块最大数量为10,000。 轮转发生时,Object名字规则是在原有Object前缀加UUID随机数的基础上,拼接_1,_2,_3等后缀。 说明 默认单位为MB。 配置示例:"maxFileSize":300, 表示设置单个文件大小为300M。 | 否 | 100,000 |
suffix(高级配置,向导模式不支持) | 数据同步写出时,生成的文件名后缀。例如,配置suffix为.csv,则最终写出的文件名为fileName****.csv。 | 否 | 无 |
附录:parquet类型数据的转化策略
如果您没有配置parquetSchema,那么DataWorks侧会根据源端字段类型,按照一定的策略进行相应数据类型转换,转换策略如下。
转换后的数据类型 | Parquet type | Parquet logical type |
CHAR / VARCHAR / STRING | BINARY | UTF8 |
BOOLEAN | BOOLEAN | 不涉及 |
BINARY / VARBINARY | BINARY | 不涉及 |
DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
TINYINT | INT32 | INT_8 |
SMALLINT | INT32 | INT_16 |
INT/INTEGER | INT32 | 不涉及 |
BIGINT | INT64 | 不涉及 |
FLOAT | FLOAT | 不涉及 |
DOUBLE | DOUBLE | 不涉及 |
DATE | INT32 | DATE |
TIME | INT32 | TIME_MILLIS |
TIMESTAMP/DATETIME | INT96 | 不涉及 |