OSS数据源

OSS数据源为您提供读取和写入OSS的双向通道,本文为您介绍DataWorks的OSS数据同步的能力支持情况。

支持的字段类型与使用限制

离线读

OSS Reader实现了从OSS读取数据并转为数据集成协议的功能,OSS本身是无结构化数据存储。对于数据集成而言,OSS Reader支持的功能如下。

支持

不支持

  • 支持且仅支持读取TXT格式的文件,且要求TXT中schema为一张二维表。

  • 支持类CSV格式文件,自定义分隔符。

  • 支持ORC、PARQUET格式.

  • 支持多种类型数据读取(使用String表示),支持列裁剪、列常量。

  • 支持递归读取、支持文件名过滤。

  • 支持文本压缩,现有压缩格式为gzipbzip2zip

    说明

    一个压缩包不允许多文件打包压缩。

  • 多个Object可以支持并发读取。

  • 单个Object(File)不支持多线程并发读取。

  • 单个Object在压缩情况下,从技术上无法支持多线程并发读取。

重要

准备OSS数据时,如果数据为CSV文件,则必须为标准格式的CSV文件。例如,如果列内容在半角引号(")内,需要替换成两个半角引号(""),否则会造成文件被错误分割。

离线写

OSS Writer实现了从数据同步协议转为OSS中的文本文件功能,OSS本身是无结构化数据存储,目前OSS Writer支持的功能如下。

支持

不支持

  • 支持且仅支持写入文本类型(不支持BLOB,如视频和图片)的文件,并要求文本文件中的Schema为一张二维表。

  • 支持类CSV格式文件,自定义分隔符。

  • 支持ORC、PARQUET格式。

  • 支持多线程写入,每个线程写入不同的子文件。

  • 文件支持滚动,当文件大于某个size值时,支持文件切换。

  • 单个文件不能支持并发写入。

  • OSS本身不提供数据类型,OSS Writer均以STRING类型写入OSS对象。

  • 如果OSS的Bucket存储类型为冷归档存储,则不支持写入。

  • 单个Object(File)不超过100 GB。

类型分类

数据集成column配置类型

整数类

LONG

字符串类

STRING

浮点类

DOUBLE

布尔类

BOOLEAN

日期时间类

DATE

实时写

  • 支持实时写入的能力。

  • 支持实时写入Hudi格式版本:0.12.x。

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

单表实时同步任务配置指导

操作流程请参见配置单表增量数据实时同步DataStudio侧实时同步任务配置

整库(实时)全增量同步配置指导

操作流程请参见数据集成侧同步任务配置

常见问题

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo:通用示例

{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "stepType":"oss",//插件名。
            "parameter":{
                "nullFormat":"",//定义可以表示为null的字符串。
                "compress":"",//文本压缩类型。
                "datasource":"",//数据源。
                "column":[//字段。
                    {
                        "index":0,//列序号。
                        "type":"string"//数据类型。
                    },
                    {
                        "index":1,
                        "type":"long"
                    },
                    {
                        "index":2,
                        "type":"double"
                    },
                    {
                        "index":3,
                        "type":"boolean"
                    },
                    {
                        "format":"yyyy-MM-dd HH:mm:ss", //时间格式。
                        "index":4,
                        "type":"date"
                    }
                ],
                "skipHeader":"",//类CSV格式文件可能存在表头为标题情况,需要跳过。
                "encoding":"",//编码格式。
                "fieldDelimiter":",",//字段分隔符。
                "fileFormat": "",//文本类型。
                "object":[]//object前缀。
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":""//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1 //作业并发数。
            "mbps":"12",//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Reader脚本Demo:ORC或Parquet文件读取OSS

目前通过复用HDFS Reader的方式完成OSS读取ORC或Parquet格式的文件,在OSS Reader已有参数的基础上,增加了PathFileFormat等扩展配置参数,参数含义请参见HDFS Reader

  • 以ORC文件格式读取OSS,示例如下。

    {
    "stepType": "oss",
    "parameter": {
    "datasource": "",
    "fileFormat": "orc",
    "path": "/tests/case61/orc__691b6815_9260_4037_9899_****",
    "column": [
    {
    "index": 0,
    "type": "long"
    },
    {
    "index": "1",
    "type": "string"
    },
    {
    "index": "2",
    "type": "string"
    }
    ]
    }
    }
  • 以Parquet文件格式读取OSS,示例如下。

    {
      "type":"job",
        "version":"2.0",
        "steps":[
        {
          "stepType":"oss",
          "parameter":{
            "nullFormat":"",
            "compress":"",
            "fileFormat":"parquet",
            "path":"/*",
            "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }",
            "column":[
              {
                "index":"0",
                "type":"string"
              },
              {
                "index":"1",
                "type":"long"
              },
              {
                "index":"2",
                "type":"string"
              },
              {
                "index":"3",
                "type":"string"
              },
              {
                "index":"4",
                "type":"string"
              },
              {
                "index":"5",
                "type":"string"
              },
              {
                "index":"6",
                "type":"string"
              },
              {
                "index":"7",
                "type":"string"
              },
              {
                "index":"8",
                "type":"string"
              },
              {
                "index":"9",
                "type":"string"
              },
              {
                "index":"10",
                "type":"double"
              },
              {
                "index":"11",
                "type":"string"
              },
              {
                "index":"12",
                "type":"string"
              }
            ],
            "skipHeader":"false",
            "encoding":"UTF-8",
            "fieldDelimiter":",",
            "fieldDelimiterOrigin":",",
            "datasource":"wpw_demotest_oss",
            "envType":0,
            "object":[
              "wpw_demo/userdata1.parquet"
            ]
          },
          "name":"Reader",
          "category":"reader"
        },
        {
          "stepType":"odps",
          "parameter":{
            "partition":"dt=${bizdate}",
            "truncate":true,
            "datasource":"0_odps_wpw_demotest",
            "envType":0,
            "column":[
              "id"
            ],
            "emptyAsNull":false,
            "table":"wpw_0827"
          },
          "name":"Writer",
          "category":"writer"
        }
      ],
        "setting":{
        "errorLimit":{
          "record":""
        },
        "locale":"zh_CN",
          "speed":{
          "throttle":false,
            "concurrent":2
        }
      },
      "order":{
        "hops":[
          {
            "from":"Reader",
            "to":"Writer"
          }
        ]
      }
    }

Reader脚本参数

参数

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。

Object

OSS的Object信息,此处可以支持填写多个Object。例如xxx的bucket中有yunshi文件夹,文件夹中有ll.txt文件,则Object直接填yunshi/ll.txt。 支持使用调度参数并配合调度,灵活生成Object文件名称与路径。

  • 当指定单个OSS Object时,OSS Reader暂时只能使用单线程进行数据抽取。后期将考虑在非压缩文件情况下针对单个Object可以进行多线程并发读取。

  • 当指定多个OSS Object时,OSS Reader支持使用多线程进行数据抽取。可以根据具体要求配置线程并发数。

  • 当指定通配符时,OSS Reader尝试遍历出多个Object信息。例如配置为abc*[0-9]时,可以匹配到abc0abc1abc2abc3等;配置为abc?.txt时,可以匹配到以abc开头、 .txt结尾、中间有1个任意字符的文件。

    配置通配符会导致内存溢出,通常不建议您进行配置。详情请参见OSS产品概述

说明
  • 数据同步系统会将一个作业下同步的所有Object视作同一张数据表。您必须保证所有的Object能够适配同一套Schema信息。

  • 请注意控制单个目录下的文件个数,否则可能会触发系统OutOfMemoryError报错。如果遇到此情况,请将文件拆分到不同目录后再尝试进行同步。

parquetSchema

以Parquet文件格式读取OSS时配置,当且仅当fileFormatparquet时生效,具体表示parquet存储的类型说明。您需要确保填写parquetSchema后,整体配置符合JSON语法。

message MessageType名 {
是否必填, 数据类型, 列名;
......................;
}

parquetSchema的配置格式说明如下:

  • MessageType名:填写名称。

  • 是否必填:required表示非空,optional表示可为空。推荐全填optional。

  • 数据类型:Parquet文件支持BOOLEAN、Int32、Int64、Int96、FLOAT、DOUBLE、BINARY(如果是字符串类型,请填BINARY)和fixed_len_byte_array类型。

  • 每行列设置必须以分号结尾,最后一行也要写上分号。

配置示例如下所示。

"parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"

column

读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不是从源头文件读取数据,而是根据value值自动生成对应的列。

默认情况下,您可以全部按照String类型读取数据,配置如下。

"column": ["*"]

您可以指定column字段信息,配置如下。

"column":
    {
       "type": "long",
       "index": 0    //从OSS文本第一列获取int字段。
    },
    {
       "type": "string",
       "value": "alibaba"  //从OSSReader内部生成alibaba的字符串字段作为当前字段。
    }
说明

对于您指定的column信息,type必须填写,index/value必须选择其一。

全部按照STRING类型读取。

fileFormat

文本类型。源头OSS的文件类型。例如csv、text,两种格式均支持自定义分隔符。

csv

fieldDelimiter

读取的字段分隔符。

说明

OSS Reader在读取数据时,需要指定字段分割符,如果不指定默认为(,),界面配置中也会默认填写为(,)。

如果分隔符不可见,请填写Unicode编码。例如,\u001b\u007c

,

lineDelimiter

读取的行分隔符。

说明

当fileFormat取值为text时,本参数有效。

compress

文本压缩类型,默认不填写(即不压缩)。支持压缩类型为gzipbzip2zip

不压缩

encoding

读取文件的编码配置。

utf-8

nullFormat

文本文件中无法使用标准字符串定义null(空指针),数据同步提供nullFormat定义哪些字符串可以表示为null。 例如:

  • 配置nullFormat:"null",等同于“可见字符”,如果源头数据是null,则数据同步视作null字段。

  • 配置nullFormat:"\u0001",等同于“不可见字符”,如果源头数据是字符串"\u0001",则数据同步视作null字段。

  • 不写"nullFormat"这个参数,等同于“未配置”,代表来源是什么数据就直接按照什么数据写入目标端,不做任何转换。

skipHeader

类CSV格式文件可能存在表头为标题情况,需要跳过。默认不跳过,压缩文件模式下不支持skipHeader

false

csvReaderConfig

读取CSV类型文件参数配置,Map类型。读取CSV类型文件使用的CsvReader进行读取,会有很多配置,不配置则使用默认值。

Writer脚本Demo:通用示例

{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"oss",//插件名。
            "parameter":{
                "nullFormat":"",//数据同步系统提供nullFormat,定义哪些字符串可以表示为null。
                "dateFormat":"",//日期格式。
                "datasource":"",//数据源。
                "writeMode":"",//写入模式。
                "writeSingleObject":"false", //表示是否将同步数据写入单个oss文件。
                "encoding":"",//编码格式。
                "fieldDelimiter":","//字段分隔符。
                "fileFormat":"",//文本类型。
                "object":""//Object前缀。
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Writer脚本Demo:ORC或Parquet文件写入OSS脚本配置demo

目前通过复用HDFS Writer的方式完成OSS写ORC或Parquet格式的文件。在OSS Writer已有参数的基础上,增加了PathFileFormat等扩展配置参数,参数含义请参见HDFS Writer

ORC或Parquet文件写入OSS的示例如下:

重要

以下仅为示例,请根据您自己具体的列名称和类型修改对应的参数,请勿直接复制使用。

  • 以ORC文件格式写入OSS

    写ORC文件,当前仅支持脚本模式,您需要转脚本模式配置,其中fileFormat需要配置为orcpath需要配置为写入文件的路径,column配置格式为 {"name":"your column name","type": "your column type"}

    当前支持写入的ORC类型如下:

    字段类型

    离线写OSS(ORC格式)

    TINYINT

    支持

    SMALLINT

    支持

    INT

    支持

    BIGINT

    支持

    FLOAT

    支持

    DOUBLE

    支持

    TIMESTAMP

    支持

    DATE

    支持

    VARCHAR

    支持

    STRING

    支持

    CHAR

    支持

    BOOLEAN

    支持

    DECIMAL

    支持

    BINARY

    支持

    {
    "stepType": "oss",
    "parameter": {
    "datasource": "",
    "fileFormat": "orc",
    "path": "/tests/case61",
    "fileName": "orc",
    "writeMode": "append",
    "column": [
    {
    "name": "col1",
    "type": "BIGINT"
    },
    {
    "name": "col2",
    "type": "DOUBLE"
    },
    {
    "name": "col3",
    "type": "STRING"
    }
    ],
    "writeMode": "append",
    "fieldDelimiter": "\t",
    "compress": "NONE",
    "encoding": "UTF-8"
    }
    }
  • 以Parquet文件格式写入OSS

    {
    "stepType": "oss",
    "parameter": {
    "datasource": "",
    "fileFormat": "parquet",
    "path": "/tests/case61",
    "fileName": "test",
    "writeMode": "append",
    "fieldDelimiter": "\t",
    "compress": "SNAPPY",
    "encoding": "UTF-8",
    "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}",
    "dataxParquetMode": "fields"
    }
    }

Writer脚本参数

参数

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。

object

OSS Writer写入的文件名,OSS使用文件名模拟目录的实现。OSS对于Object的名称有以下限制:

  • 使用"object": "datax",写入的Object以datax开头,后缀添加随机字符串。

  • 使用"object": "cdo/datax",写入的Object以/cdo/datax开头,后缀随机添加字符串,OSS模拟目录的分隔符为(/)。

如果您不需要后缀随机UUID,建议您配置"writeSingleObject" : "true",详情请参见writeSingleObject说明。

writeMode

OSS Writer写入前,数据的处理:

  • truncate:写入前清理Object名称前缀匹配的所有Object。例如"object":"abc",将清理所有abc开头的Object。

  • append:写入前不进行任何处理,数据集成OSS Writer直接使用Object名称写入,并使用随机UUID的后缀名来保证文件名不冲突。例如您指定的Object名为数据集成,实际写入为DI_****_****_****

  • nonConflict:如果指定路径出现前缀匹配的Object,直接报错。例如"object":"abc",如果存在abc123的Object,将直接报错。

writeSingleObject

OSS写数据时,是否写单个文件:

  • true:表示写单个文件,当读不到任何数据时, 不会产生空文件。

  • false:表示写多个文件,当读不到任何数据时,若配置文件头,会输出空文件只包含文件头,否则只输出空文件。

说明

当写入ORC、parquet类型数据时,writeSingleObject参数不生效,即使用该参数无法在多并发场景下,写入单个ORC或parquet文件。若要写入单个文件,您可以将并发设置为1,但文件名会添加随机后缀,并且设置并发为1时,将影响同步任务的速度。

false

fileFormat

文件写出的格式,支持以下几种格式:

  • csv:仅支持严格的csv格式。如果待写数据包括列分隔符,则会根据csv的转义语法转义,转义符号为双引号(")。

  • text:使用列分隔符简单分割待写数据,对于待写数据包括列分隔符情况下不进行转义。

  • parquet:若使用此文件类型,必须增加parquetschema参数定义数据类型。

    重要
  • ORC:若使用此种格式,需要转脚本模式。

text

compress

写入OSS的数据文件的压缩格式(需使用脚本模式任务配置)。

说明

csv、text文本类型不支持压缩,parquet/orc文件支持gzip、snappy等压缩。

fieldDelimiter

写入的字段分隔符。

,

encoding

写出文件的编码配置。

utf-8

parquetSchema

以Parquet文件格式写入OSS的必填项,用来描述目标文件的结构,所以此项当且仅当fileFormatparquet时生效,格式如下。

message MessageType名 {
是否必填, 数据类型, 列名;
......................;
}

配置项说明如下:

  • MessageType名:填写名称。

  • 是否必填:required表示非空,optional表示可为空。推荐全填optional。

  • 数据类型:Parquet文件支持BOOLEAN、INT32、INT64、INT96、FLOAT、DOUBLE、BINARY(如果是字符串类型,请填BINARY)和FIXED_LEN_BYTE_ARRAY等类型。

说明

每行列设置必须以分号结尾,最后一行也要写上分号。

示例如下。

message m {
optional int64 id;
optional int64 date_id;
optional binary datetimestring;
optional int32 dspId;
optional int32 advertiserId;
optional int32 status;
optional int64 bidding_req_num;
optional int64 imp;
optional int64 click_num;
}

nullFormat

文本文件中无法使用标准字符串定义null(空指针),数据同步系统提供nullFormat定义可以表示为null的字符串。例如,您配置nullFormat="null",如果源头数据是null,数据同步系统会视作null字段。

header

OSS写出时的表头,例如,["id", "name", "age"]

maxFileSize(高级配置,向导模式不支持)

OSS写出时单个Object文件的最大值,默认为10,000*10MB,类似于在打印log4j日志时,控制日志文件的大小。OSS分块上传时,每个分块大小为10MB(也是日志轮转文件最小粒度,即小于10MB的maxFileSize会被作为10MB),每个OSS InitiateMultipartUploadRequest支持的分块最大数量为10,000。

轮转发生时,Object名字规则是在原有Object前缀加UUID随机数的基础上,拼接_1,_2,_3等后缀。

说明

默认单位为MB。

配置示例:"maxFileSize":300, 表示设置单个文件大小为300M。

100,000

suffix(高级配置,向导模式不支持)

数据同步写出时,生成的文件名后缀。例如,配置suffix.csv,则最终写出的文件名为fileName****.csv

附录:parquet类型数据的转化策略

如果您没有配置parquetSchema,那么DataWorks侧会根据源端字段类型,按照一定的策略进行相应数据类型转换,转换策略如下。

转换后的数据类型

Parquet type

Parquet logical type

CHAR / VARCHAR / STRING

BINARY

UTF8

BOOLEAN

BOOLEAN

不涉及

BINARY / VARBINARY

BINARY

不涉及

DECIMAL

FIXED_LEN_BYTE_ARRAY

DECIMAL

TINYINT

INT32

INT_8

SMALLINT

INT32

INT_16

INT/INTEGER

INT32

不涉及

BIGINT

INT64

不涉及

FLOAT

FLOAT

不涉及

DOUBLE

DOUBLE

不涉及

DATE

INT32

DATE

TIME

INT32

TIME_MILLIS

TIMESTAMP/DATETIME

INT96

不涉及