本文为您介绍OSS Writer支持的数据类型、写入方式、字段映射和数据源等参数及配置示例。

背景信息

OSS Writer插件为您提供向OSS写入类CSV格式的一个或多个表文件的功能,写入的文件个数和您的任务并发及同步的文件数有关。
说明 开始配置OSS Writer插件前,请首先配置好数据源,详情请参见配置OSS数据源

写入OSS中的是一张逻辑意义上的二维表,例如CSV格式的文本信息。如果您想对OSS产品有更深入的了解,请参见OSS产品概述

OSS Java SDK的详细介绍,请参见阿里云OSS Java SDK

OSS Writer实现了从数据同步协议转为OSS中的文本文件功能,OSS本身是无结构化数据存储,目前OSS Writer支持的功能如下:
  • 支持且仅支持写入文本类型(不支持BLOB,如视频和图片)的文件,并要求文本文件中的Schema为一张二维表。
  • 支持类CSV格式文件,自定义分隔符。
  • 支持多线程写入,每个线程写入不同的子文件。
  • 文件支持滚动,当文件大于某个size值时,支持文件切换。
OSS Writer暂时不能实现以下功能:
  • 单个文件不能支持并发写入。
  • OSS本身不提供数据类型,OSS Writer均以STRING类型写入OSS对象。

参数说明

参数描述是否必选默认值
datasource数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。
objectOSS Writer写入的文件名,OSS使用文件名模拟目录的实现。OSS对于Object的名称有以下限制:
  • 使用"object": "datax",写入的Object以datax开头,后缀添加随机字符串。
  • 使用"object": "cdo/datax",写入的Object以/cdo/datax开头,后缀随机添加字符串,OSS模拟目录的分隔符为(/)。

如果您不需要后缀随机UUID,建议您配置"writeSingleObject" : "true",详情请参见writeSingleObject说明。

writeModeOSS Writer写入前,数据的处理:
  • truncate:写入前清理Object名称前缀匹配的所有Object。例如"object":"abc",将清理所有abc开头的Object。
  • append:写入前不进行任何处理,数据集成OSS Writer直接使用Object名称写入,并使用随机UUID的后缀名来保证文件名不冲突。例如您指定的Object名为数据集成,实际写入为DI_****_****_****
  • nonConflict:如果指定路径出现前缀匹配的Object,直接报错。例如"object":"abc",如果存在abc123的Object,将直接报错。
writeSingleObjectOSS写数据时,是否写单个文件:
  • true:表示写单个文件,当读不到任何数据时, 不会产生空文件。
  • false:表示写多个文件,当读不到任何数据时,若配置文件头,会输出空文件只包含文件头,否则只输出空文件。
说明 当写入ORC、parquet类型数据时,writeSingleObject参数不生效,即使用该参数无法在多并发场景下,写入单个ORC或parquet文件。若要写入单个文件,您可以将并发设置为1,但文件名会添加随机后缀,并且设置并发为1时,将影响同步任务的速度。
false
fileFormat文件写出的格式,支持以下几种格式:
  • csv:仅支持严格的csv格式。如果待写数据包括列分隔符,则会根据csv的转义语法转义,转义符号为双引号(")。
  • text:使用列分隔符简单分割待写数据,对于待写数据包括列分隔符情况下不进行转义。
  • parquet:若使用此文件类型,必须增加parquetschema参数定义数据类型。
    重要
  • ORC:若使用此种格式,需要转脚本模式。
text
compress写入OSS的数据文件的压缩格式(需使用脚本模式任务配置)。
说明 csv、text文本类型不支持压缩,parquet/orc文件支持gzip、snappy等压缩。
fieldDelimiter写入的字段分隔符。 ,
encoding写出文件的编码配置。utf-8
nullFormat文本文件中无法使用标准字符串定义null(空指针),数据同步系统提供nullFormat定义可以表示为null的字符串。例如,您配置nullFormat="null",如果源头数据是null,数据同步系统会视作null字段。
headerOSS写出时的表头,例如,["id", "name", "age"]
maxFileSize(高级配置,向导模式不支持)OSS写出时单个Object文件的最大值,默认为10,000*10MB,类似于在打印log4j日志时,控制日志文件的大小。OSS分块上传时,每个分块大小为10MB(也是日志轮转文件最小粒度,即小于10MB的maxFileSize会被作为10MB),每个OSS InitiateMultipartUploadRequest支持的分块最大数量为10,000。

轮转发生时,Object名字规则是在原有Object前缀加UUID随机数的基础上,拼接_1,_2,_3等后缀。

说明 默认单位为MB。

配置示例:"maxFileSize":300, 表示设置单个文件大小为300M。

100,000
suffix(高级配置,向导模式不支持)数据同步写出时,生成的文件名后缀。例如,配置suffix.csv,则最终写出的文件名为fileName****.csv

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向数据源
    参数描述
    数据源即上述参数说明中的datasource,通常填写您配置的数据源名称。
    文件名(含路径)即上述参数说明中的Object,填写OSS文件夹的路径,其中不要填写bucket的名称。
    文本类型包括csvtextparquet
    重要
    列分隔符即上述参数说明中的fieldDelimiter,默认值为(,)。
    编码格式即上述参数说明中的encoding,默认值为utf-8
    null值即上述参数说明中的nullFormat,将要表示为空的字段填入文本框,如果源端存在则将对应的部分转换为空。
    时间格式日期类型的数据序列化到Object时的格式,例如"dateFormat": "yyyy-MM-dd"
    前缀冲突有同样的文件时,可以选择替换、保留或报错。
  2. 字段映射。左侧的源头表字段和右侧的目标表字段为一一对应的关系。字段映射
    参数描述
    同名映射单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射单击取消映射,可以取消建立的映射关系。
  3. 通道控制。通道配置
    参数描述
    任务期望最大并发数数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

脚本配置示例如下所示,使用脚本模式开发的详情请参见通过脚本模式配置离线同步任务
{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"oss",//插件名。
            "parameter":{
                "nullFormat":"",//数据同步系统提供nullFormat,定义哪些字符串可以表示为null。
                "dateFormat":"",//日期格式。
                "datasource":"",//数据源。
                "writeMode":"",//写入模式。
                "writeSingleObject":"false", //表示是否将同步数据写入单个oss文件。
                "encoding":"",//编码格式。
                "fieldDelimiter":","//字段分隔符。
                "fileFormat":"",//文本类型。
                "object":""//Object前缀。
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

附录1:ORC或Parquet文件写入OSS脚本配置demo

目前通过复用HDFS Writer的方式完成OSS写ORC或Parquet格式的文件。在OSS Writer已有参数的基础上,增加了PathFileFormat等扩展配置参数,参数含义请参见HDFS Writer

ORC或Parquet文件写入OSS的示例如下:
重要 以下仅为示例,请根据您自己具体的列名称和类型修改对应的参数,请勿直接复制使用。

以ORC文件格式写入OSS

写ORC文件,当前仅支持脚本模式,您需要转脚本模式配置,其中fileFormat需要配置为orcpath需要配置为写入文件的路径,column配置格式为 {"name":"your column name","type": "your column type"}

当前支持写入的ORC类型如下:
字段类型离线写OSS(ORC格式)
TINYINT支持
SMALLINT支持
INT支持
BIGINT支持
FLOAT支持
DOUBLE支持
TIMESTAMP支持
DATE支持
VARCHAR支持
STRING支持
CHAR支持
BOOLEAN支持
DECIMAL支持
BINARY支持
{
      "stepType": "oss",
      "parameter": {
        "datasource": "",
        "fileFormat": "orc",
        "path": "/tests/case61",
        "fileName": "orc",
        "writeMode": "append",
        "column": [
          {
            "name": "col1",
            "type": "BIGINT"
          },
          {
            "name": "col2",
            "type": "DOUBLE"
          },
          {
            "name": "col3",
            "type": "STRING"
          }
        ],
        "writeMode": "append",
        "fieldDelimiter": "\t",
        "compress": "NONE",
        "encoding": "UTF-8"
      }
    }

以Parquet文件格式写入OSS

{
      "stepType": "oss",
      "parameter": {
        "datasource": "",
        "fileFormat": "parquet",
        "path": "/tests/case61",
        "fileName": "test",
        "writeMode": "append",
        "fieldDelimiter": "\t",
        "compress": "SNAPPY",
        "encoding": "UTF-8",
        "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\n  repeated group list {\n    required binary element (UTF8);\n  }\n}\nrequired group params_struct {\n  required int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\n  repeated group list {\n    required group element {\n required int64 id;\n required binary name (UTF8);\n}\n  }\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\n  required int64 id;\n required binary name (UTF8);\n  }\n}\n}\nrequired group params_struct_complex {\n  required int64 id;\n required group detail {\n  required int64 id;\n required binary name (UTF8);\n  }\n  }\n}",
        "dataxParquetMode": "fields"
      }
    }

附录2:parquet类型数据的转化策略

如果您没有配置parquetSchema,那么DataWorks侧会根据源端字段类型,按照一定的策略进行相应数据类型转换,转换策略如下。
转换后的数据类型Parquet typeParquet logical type
CHAR / VARCHAR / STRINGBINARYUTF8
BOOLEANBOOLEAN不涉及
BINARY / VARBINARYBINARY不涉及
DECIMALFIXED_LEN_BYTE_ARRAYDECIMAL
TINYINTINT32INT_8
SMALLINTINT32INT_16
INT/INTEGERINT32不涉及
BIGINTINT64不涉及
FLOATFLOAT不涉及
DOUBLEDOUBLE不涉及
DATEINT32DATE
TIMEINT32TIME_MILLIS
TIMESTAMP/DATETIMEINT96不涉及

常见问题

写入OSS出现随机字符串如何去除?