OSS


The OSS data source provides read and write access to OSS. This topic describes how DataWorks supports OSS data synchronization.

Supported field types and limitations

Batch data read

The OSS Reader reads data from OSS and converts it into a format that Data Integration can process. As OSS is an unstructured data storage service, the reader supports the following features.

Supported

  • TXT files, which must have a two-dimensional table schema.

  • CSV-like files with a custom delimiter.

    Note

    Text formats (TXT and CSV) support gzip, bzip2, and zip compression.

    You cannot package multiple files into a single compressed archive.

  • Files in ORC and Parquet formats.

  • Reading various data types (represented as STRING), column pruning, and column constants.

  • Recursive reads and object name filtering.

  • Concurrent reads from multiple objects.

Not supported

  • Concurrent multi-threaded reads of a single object (file).

  • Concurrent multi-threaded reads of a single compressed object.

Important
  • When you prepare data in OSS, CSV files must conform to the standard CSV format. For example, you must escape any double quotation mark (") in a column by replacing it with two double quotation marks (""); otherwise, parsing errors may occur. If a file contains multiple delimiters, we recommend using the TXT file type.

  • OSS is an unstructured data source that stores data as files. Before running a synchronization task, verify that the field structure is configured correctly. If the source data structure changes, you must update the task configuration to prevent data scrambling.
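
The escaping rule above can be checked locally before files are uploaded to OSS. The following is a minimal sketch that uses Python's standard csv module to serialize a row whose fields contain a double quotation mark and the delimiter. The helper name to_csv_line and the sample values are assumptions made for illustration; any tool that emits standard CSV works equally well.

```python
import csv
import io


def to_csv_line(fields, delimiter=","):
    """Serialize one row as a standard CSV line.

    Fields that contain the delimiter, a double quotation mark, or a line
    break are enclosed in double quotation marks, and each embedded double
    quotation mark is doubled.
    """
    buffer = io.StringIO()
    writer = csv.writer(
        buffer,
        delimiter=delimiter,
        quoting=csv.QUOTE_MINIMAL,
        lineterminator="\n",
    )
    writer.writerow(fields)
    return buffer.getvalue()


line = to_csv_line(["1", 'say "hi"', "a,b"])
print(line, end="")  # 1,"say ""hi""","a,b"

# Reading the line back returns the original field values.
print(next(csv.reader(io.StringIO(line))))  # ['1', 'say "hi"', 'a,b']
```

Running a sample of the real data through a round trip like this is a quick way to spot unescaped quotation marks before a synchronization task reports parsing errors.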

Batch data write

The OSS Writer converts Data Integration-compliant data and writes it to OSS as text files. As OSS is an unstructured data storage service, the writer supports the following features.

Supported

  • Writing data to text files that have a two-dimensional table schema.

  • CSV-like files with a custom delimiter.

  • Files in ORC and Parquet formats.

    Note

    In the code editor, SNAPPY compression is supported.

  • Multi-threaded writes, where each thread writes to a separate part-file.

  • File rotation, where a new file is created for subsequent writes after the current file exceeds a specified size.

Not supported

  • Writing files that contain binary large objects (BLOBs), such as videos and images.

  • Concurrent writes to a single file.

  • Native data types. Because OSS does not provide native data types, the OSS Writer writes all data as the STRING type to OSS objects.

  • Writing data to an OSS bucket whose storage class is Cold Archive.

  • A single object (file) larger than 100 GB.

| Category | Column type |
| --- | --- |
| Integer | LONG |
| String | STRING |
| Floating-point | DOUBLE |
| Boolean | BOOLEAN |
| Date and time | DATE |

Real-time data write

  • Supports real-time data writes.

  • You can perform real-time writes from a single table to the following data lake formats: Hudi (0.12.x), Paimon, and Iceberg.

Add a data source

Before you develop a synchronization task in DataWorks, add the required data source by following the instructions in Data source management. When you add a data source, the DataWorks console shows a description of each parameter.

Configure data synchronization tasks

For the entry point and the procedure for configuring a synchronization task, see the following configuration guides.

Single-table batch synchronization

Single-table real-time synchronization

See Configure a single-table real-time synchronization task.

Full-database synchronization

See Configure a full-database batch synchronization task and Configure a full-database real-time synchronization task.

FAQ

Is there a file limit for reading from OSS?

How do I handle dirty data when reading multi-delimiter CSV files?

Appendix: Sample script and parameters

Configure a batch synchronization task by using the code editor

To configure a batch synchronization task in the code editor, set the parameters in the script according to the unified script format. For more information, see Use the Code Editor. The following sections describe the data source parameters that you configure in the script.

General reader script example

{
    "type":"job",
    "version":"2.0",// The version number.
    "steps":[
        {
            "stepType":"oss",// The plugin name.
            "parameter":{
                "nullFormat":"",// The string that represents a null value.
                "compress":"",// The compression type.
                "datasource":"",// The data source.
                "column":[// The column definitions.
                    {
                        "index":0,// The column index. Starts from 0.
                        "type":"string"// The data type.
                    },
                    {
                        "index":1,
                        "type":"long"
                    },
                    {
                        "index":2,
                        "type":"double"
                    },
                    {
                        "index":3,
                        "type":"boolean"
                    },
                    {
                        "format":"yyyy-MM-dd HH:mm:ss", // The time format. Required when 'type' is 'date'.
                        "index":4,
                        "type":"date"
                    }
                ],
                "skipHeader":"",// Whether to skip the header in CSV-like files.
                "encoding":"",// The encoding.
                "fieldDelimiter":",",// The field delimiter.
                "fileFormat": "",// The file format.
                "object":[]// Object prefixes for the files to read.
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":""// The maximum number of allowed error records.
        },
        "speed":{
            "throttle":true,// If true, enables throttling. If false, the 'mbps' parameter is ignored.
            "concurrent":1, // The job concurrency.
            "mbps":"12"// The throttling speed limit in MB/s (1 mbps = 1 MB/s).
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Read ORC or Parquet files from OSS

You can read ORC or Parquet files from OSS by reusing the HDFS reader. To do so, specify the standard OSS reader parameters along with the path (for ORC) and fileFormat (for ORC and Parquet) parameters. For parameter descriptions, see HDFS reader.

  • This example shows how to read data in ORC format from OSS.

    {
      "stepType": "oss",
      "parameter": {
        "datasource": "",
        "fileFormat": "orc",
        "path": "/tests/case61/orc__691b6815_9260_4037_9899_****",
        "column": [
          {
            "index": 0,
            "type": "long"
          },
          {
            "index": "1",
            "type": "string"
          },
          {
            "index": "2",
            "type": "string"
          }
        ]
      }
    }

  • This example shows how to read data in Parquet format from OSS.

    {
      "type":"job",
      "version":"2.0",
      "steps":[
        {
          "stepType":"oss",
          "parameter":{
            "nullFormat":"",
            "compress":"",
            "fileFormat":"parquet",
            "path":"/*",
            "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }",
            "column":[
              {
                "index":"0",
                "type":"string"
              },
              {
                "index":"1",
                "type":"long"
              },
              {
                "index":"2",
                "type":"string"
              },
              {
                "index":"3",
                "type":"string"
              },
              {
                "index":"4",
                "type":"string"
              },
              {
                "index":"5",
                "type":"string"
              },
              {
                "index":"6",
                "type":"string"
              },
              {
                "index":"7",
                "type":"string"
              },
              {
                "index":"8",
                "type":"string"
              },
              {
                "index":"9",
                "type":"string"
              },
              {
                "index":"10",
                "type":"double"
              },
              {
                "index":"11",
                "type":"string"
              },
              {
                "index":"12",
                "type":"string"
              }
            ],
            "skipHeader":"false",
            "encoding":"UTF-8",
            "fieldDelimiter":",",
            "fieldDelimiterOrigin":",",
            "datasource":"wpw_demotest_oss",
            "envType":0,
            "object":[
              "wpw_demo/userdata1.parquet"
            ]
          },
          "name":"Reader",
          "category":"reader"
        },
        {
          "stepType":"odps",
          "parameter":{
            "partition":"dt=${bizdate}",
            "truncate":true,
            "datasource":"0_odps_wpw_demotest",
            "envType":0,
            "column":[
              "id"
            ],
            "emptyAsNull":false,
            "table":"wpw_0827"
          },
          "name":"Writer",
          "category":"writer"
        }
      ],
      "setting":{
        "errorLimit":{
          "record":""
        },
        "locale":"zh_CN",
        "speed":{
          "throttle":false,
          "concurrent":2
        }
      },
      "order":{
        "hops":[
          {
            "from":"Reader",
            "to":"Writer"
          }
        ]
      }
    }

Reader script parameters

Each parameter below is listed with its description, whether it is required, and its default value.

datasource

Specifies the name of the data source. In the code editor, this value must match the name of a configured data source.

Required: Yes

Default: None

object

Specifies one or more objects to synchronize from OSS. You can specify objects by using explicit paths, wildcards, or dynamic parameters.

1. Configuration methods

  • Explicit path

    • Basic rule: Paths must start from the root directory of the bucket and must not include the bucket name.

    • Specify a single file: Enter the full path of the file. For example: my_folder/my_file.txt.

    • Specify multiple objects: Separate the paths of multiple files or folders with a comma (,). For example: folder_a/file1.txt, folder_a/file2.txt.

  • Wildcard path

    • You can use wildcards to match multiple files with a specific pattern.

    • *: Matches zero or more characters.

    • ?: Matches exactly one character.

    • Examples:

      • abc*[0-9].txt matches files such as abc0.txt, abc10.txt, and abc_test_9.txt.

      • abc?.txt matches files such as abc1.txt and abcX.txt.

  • Dynamic parameter path

    • Embed scheduling parameters in the path to automate data synchronization. When the job runs, the system replaces these parameters with their actual values.

    • For example, if you configure the path as raw_data/${bizdate}/abc.txt, the job can dynamically synchronize the folder for the corresponding business date each day.

    • For more information about how to use scheduling parameters, see Scheduling Parameter Sources and Their Expressions.

Important
  • Use wildcards with caution. Using wildcards, especially *, triggers a recursive scan of the OSS path. For a large number of files, this can consume significant memory and time and may cause an out of memory error. Avoid using broad wildcards in a production environment. If you encounter this issue, split the files into different directories before synchronizing.

  • The data synchronization system treats all objects synchronized in a single job as a single data table. You must ensure that all objects conform to the same schema.
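
The wildcard and dynamic-parameter rules above can be tried out locally. The following is a rough sketch, not the reader's actual matching code: it assumes that Python's fnmatch module (shell-style wildcards, including [0-9] character classes) behaves closely enough for illustration, and it uses string.Template to stand in for scheduling-parameter substitution. The object names and the bizdate value 20240101 are hypothetical.

```python
from fnmatch import fnmatchcase
from string import Template

# Hypothetical object names in a bucket, used only for illustration.
object_names = [
    "abc0.txt",
    "abc10.txt",
    "abc_test_9.txt",
    "abc1.txt",
    "abcX.txt",
    "abc.txt",
]


def match_objects(pattern, names):
    """Return the names that match a shell-style wildcard pattern."""
    return [name for name in names if fnmatchcase(name, pattern)]


def resolve_path(path_template, **params):
    """Replace ${name} placeholders with concrete values."""
    return Template(path_template).substitute(**params)


# '*' matches zero or more characters; '[0-9]' matches one digit.
print(match_objects("abc*[0-9].txt", object_names))
# ['abc0.txt', 'abc10.txt', 'abc_test_9.txt', 'abc1.txt']

# '?' matches exactly one character.
print(match_objects("abc?.txt", object_names))
# ['abc0.txt', 'abc1.txt', 'abcX.txt']

print(resolve_path("raw_data/${bizdate}/abc.txt", bizdate="20240101"))
# raw_data/20240101/abc.txt
```

Listing the matches for a pattern against a sample of real object names is a cheap way to confirm that a wildcard is no broader than intended before it is used in production.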

2. Concurrency and performance

The configuration method directly determines data extraction concurrency:

  • Single-threaded mode: When you specify a single, uncompressed file with an explicit path, the job extracts data in single-threaded mode.

  • Multi-threaded mode: When you specify multiple explicit files or match multiple files with a wildcard, the job uses concurrent reads to significantly improve extraction throughput. You can configure the concurrency in the Channel settings.

Required: Yes

Default: None

parquetSchema

Defines the schema of files in the Parquet format. This parameter applies only when fileFormat is set to parquet. After you specify parquetSchema, ensure that the overall configuration is still valid JSON.

The format for parquetSchema is as follows:

message MessageTypeName {
required/optional data_type column_name;
......................;
}

  • MessageTypeName: The name of the message type.

  • required/optional: 'required' means the field cannot be null, while 'optional' means it can be null. We recommend setting all fields to 'optional'.

  • data_type: Parquet files support the following data types: BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE, BINARY (use BINARY for string types), and FIXED_LEN_BYTE_ARRAY.

  • Each column definition must end with a semicolon, including the last one.

The following is a configuration example:

"parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"

Required: No

Default: None
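
Because parquetSchema is a string embedded in a JSON script, it can be convenient to generate it rather than type it by hand. The following is a minimal sketch under that assumption; the helper build_parquet_schema and the three sample columns are hypothetical and are not part of DataWorks.

```python
import json


def build_parquet_schema(message_name, columns):
    """Build a parquetSchema string.

    columns is a list of (repetition, data_type, column_name) tuples.
    Every column definition ends with a semicolon, including the last one.
    """
    fields = " ".join(f"{rep} {dtype} {name};" for rep, dtype, name in columns)
    return f"message {message_name} {{ {fields} }}"


schema = build_parquet_schema(
    "m",
    [
        ("optional", "int32", "minute_id"),
        ("optional", "int64", "req"),
        ("optional", "double", "revenue"),
    ],
)
print(schema)
# message m { optional int32 minute_id; optional int64 req; optional double revenue; }

# json.dumps escapes the value, so the surrounding configuration stays valid JSON.
fragment = json.dumps({"fileFormat": "parquet", "parquetSchema": schema}, indent=4)
print(fragment)
```

Serializing the fragment with a JSON library, instead of concatenating strings, is what keeps the overall configuration valid when the schema contains quotation marks or line breaks.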

column

Specifies the list of columns to read. 'type' specifies the data type in the source. 'index' specifies the zero-based index of the column in the text file. 'value' specifies a constant value for the column, which instructs the system to generate the value instead of reading it from the source file.

By default, you can read all columns as the STRING data type, as shown in the following example.

"column": ["*"]

You can also specify details for each column, as shown in the following example.

"column": [
    {
       "type": "long",
       "index": 0    // Reads an integer from the first column of the OSS text file.
    },
    {
       "type": "string",
       "value": "alibaba"  // Generates a constant string 'alibaba' for the current column.
    }
]

Note

For each column you specify, 'type' is required, and you must specify either 'index' or 'value'.

Required: Yes

Default: All columns are read as the STRING data type.
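
The column rules above ('type' is required, plus either 'index' or 'value') can be expressed as a small check. The following sketch is illustrative only and is not the reader's implementation: the function names are hypothetical, values are kept as strings without type conversion, and rejecting an entry that sets both 'index' and 'value' is an assumption about how "either" is meant.

```python
def validate_column(spec):
    """Check one column entry: 'type' is required, plus either 'index' or 'value'."""
    if "type" not in spec:
        raise ValueError("'type' is required")
    # Assumption: exactly one of 'index' and 'value' may be present.
    if ("index" in spec) == ("value" in spec):
        raise ValueError("specify either 'index' or 'value'")


def project_row(fields, column_specs):
    """Build an output row from the split fields of one text line.

    Values are returned as strings; type conversion is intentionally left out.
    """
    row = []
    for spec in column_specs:
        validate_column(spec)
        if "index" in spec:
            row.append(fields[spec["index"]])  # Read from the source column.
        else:
            row.append(spec["value"])  # Generate a constant.
    return row


specs = [
    {"type": "long", "index": 0},
    {"type": "string", "value": "alibaba"},
]
print(project_row("42,foo,bar".split(","), specs))  # ['42', 'alibaba']
```

The example also shows column pruning: the source line has three fields, but only the first one is read, and the second output column is a constant.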

fileFormat

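The file format of the source objects in OSS. Valid values are 'csv' and 'text'. Both formats support custom delimiters. To read ORC or Parquet files, set this parameter to 'orc' or 'parquet' as described in Read ORC or Parquet files from OSS.

Required: Yes

Default: csv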

fieldDelimiter

The field delimiter.

Note

A field delimiter is required. If unspecified, the parameter defaults to a comma (,), which is also the UI default.

Specify an invisible character by its Unicode representation (e.g., \u001b). You can also use this format for visible characters (e.g., \u007c for the pipe symbol).

Required: Yes

Default: , (comma)
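
To see how a Unicode escape in the script maps to the actual delimiter character, the following sketch parses a configuration fragment with Python's json module. It assumes that the script is parsed with standard JSON string-escape rules; the fragment itself is hypothetical.

```python
import json

# A configuration fragment as it would appear in the script (raw JSON text).
raw_config = r'{"fieldDelimiter": "\u007c"}'

delimiter = json.loads(raw_config)["fieldDelimiter"]
print(repr(delimiter))  # '|'
print("a|b|c".split(delimiter))  # ['a', 'b', 'c']

# json.dumps writes a non-printable character back in the \uXXXX form.
print(json.dumps({"fieldDelimiter": "\u001b"}))  # {"fieldDelimiter": "\u001b"}
```

Generating the script with a JSON library avoids having to paste invisible characters into the editor by hand.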

lineDelimiter

The line delimiter.

Note

This parameter applies only when 'fileFormat' is set to 'text'.

Required: No

Default: None

compress

The compression type of the source files. If this parameter is not set, files are assumed to be uncompressed. Supported types: gzip, bzip2, and zip.

Required: No

Default: No compression

encoding

The character encoding of the source files.

Required: No

Default: utf-8

nullFormat

Specifies a string in the source data that represents a null value. This is necessary because text files lack a standard representation for null values. For example:

  • If you set nullFormat:"null", the literal string "null" in the source data is treated as a null value.

  • If you set nullFormat:"\u0001", the string "\u0001" (an invisible character) in the source data is treated as a null value.

  • If you omit the "nullFormat" parameter, the system writes the source data to the destination as-is, without any value conversion.

Required: No

Default: None
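
The three nullFormat cases above can be summarized in a few lines. This is a simplified sketch of the described behavior, not the reader's code; the function name apply_null_format is hypothetical, and Python's None stands in for a null field.

```python
def apply_null_format(fields, null_format=None):
    """Map fields equal to null_format to None; pass everything else through.

    When null_format is None (the parameter is omitted), no conversion happens.
    """
    if null_format is None:
        return list(fields)
    return [None if field == null_format else field for field in fields]


print(apply_null_format(["1", "null", ""], null_format="null"))  # ['1', None, '']
print(apply_null_format(["1", "\u0001"], null_format="\u0001"))  # ['1', None]
print(apply_null_format(["1", "null"]))  # ['1', 'null']
```

Note that in this sketch an empty string is not treated as null unless nullFormat is set to the empty string.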

skipHeader

Specifies whether to skip the header row of a CSV-like file. CSV-like files may start with a header row that contains column titles. By default, this row is not skipped. The skipHeader parameter is not supported for compressed files.

Required: No

Default: false

csvReaderConfig

A map of additional settings for the CsvReader used to parse CSV files. If this parameter is omitted, the CsvReader uses its default settings.

Required: No

Default: None

Writer script example

{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"oss",// The plugin name.
            "parameter":{
                "nullFormat":"",// Defines which strings are interpreted as null values.
                "dateFormat":"",// The date format.
                "datasource":"",// The datasource.
                "writeMode":"",// The write mode.
                "writeSingleObject":"false", // If true, writes all data to a single OSS file.
                "encoding":"",// The file encoding.
                "fieldDelimiter":",",// The field delimiter.
                "fileFormat":"",// The file format.
                "object":""// The object prefix.
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The error limit.
        },
        "speed":{
            "throttle":true,// Enables throttling. If false, the 'mbps' parameter is ignored.
            "concurrent":1, // The job concurrency.
            "mbps":"12"// The throttling limit in MB/s (1 mbps = 1 MB/s).
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Sample script: Write ORC or Parquet files to OSS

You can write ORC or Parquet files to OSS using the HDFS Writer. In addition to the existing OSS Writer parameters, this method adds extended configuration parameters, such as path and fileFormat. For descriptions of these parameters, see HDFS Writer.

The following examples show how to write ORC or Parquet files to OSS:

Important

The following code is for demonstration purposes only. You must modify the parameters to match your specific column names and column types. Do not copy the code for use in a production environment.

  • Write ORC files to OSS

    ORC files can currently be written only in the code editor. Switch to the code editor, set the fileFormat parameter to orc, specify the file path, and define each column using the format {"name":"your column name","type": "your column type"}.

    The following ORC column types are supported for batch writes:

    | Type | Status |
    | --- | --- |
    | TINYINT | Supported |
    | SMALLINT | Supported |
    | INT | Supported |
    | BIGINT | Supported |
    | FLOAT | Supported |
    | DOUBLE | Supported |
    | TIMESTAMP | Supported |
    | DATE | Supported |
    | VARCHAR | Supported |
    | STRING | Supported |
    | CHAR | Supported |
    | BOOLEAN | Supported |
    | DECIMAL | Supported |
    | BINARY | Supported |

    {
      "stepType": "oss",
      "parameter": {
        "datasource": "",
        "fileFormat": "orc",
        "path": "/tests/case61",
        "fileName": "orc",
        "writeMode": "append",
        "column": [
          {
            "name": "col1",
            "type": "BIGINT"
          },
          {
            "name": "col2",
            "type": "DOUBLE"
          },
          {
            "name": "col3",
            "type": "STRING"
          }
        ],
        "fieldDelimiter": "\t",
        "compress": "NONE",
        "encoding": "UTF-8"
      }
    }

  • Write Parquet files to OSS

    {
      "stepType": "oss",
      "parameter": {
        "datasource": "",
        "fileFormat": "parquet",
        "path": "/tests/case61",
        "fileName": "test",
        "writeMode": "append",
        "fieldDelimiter": "\t",
        "compress": "SNAPPY",
        "encoding": "UTF-8",
        "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}",
        "dataxParquetMode": "fields"
      }
    }

Writer script parameters

Each parameter below is listed with its description, whether it is required, and its default value.

datasource

The name of the data source. In the code editor, this value must match the name of the configured data source.

Required: Yes

Default: None

object

The name of the output object in OSS. OSS uses object names to simulate a directory structure. The object name has the following conventions:

  • If you set "object": "datax", the written object name starts with datax, and a random string is appended as a suffix.

  • If you set "object": "cdo/datax", the written object name starts with cdo/datax, and a random string is appended as a suffix. OSS uses the forward slash (/) as a delimiter to simulate directories.

If you do not want a random UUID suffix, configure "writeSingleObject": "true". For more information, see the description of the writeSingleObject parameter.

Required: Yes

Default: None

ossBlockSize

The size of an OSS part. The default value is 16 MB. When the file output format is parquet or ORC, you can configure this parameter at the same level as the object parameter.

Because an OSS multipart upload is limited to 10,000 parts, the default part size restricts the maximum file size to 160 GB. To upload larger files, you must increase the part size.

Required: No

Default: 16 (MB)
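
The relationship between part size and maximum file size is simple arithmetic, sketched below. The function names are hypothetical, and sizes are in MB with 1 GB counted as 1,000 MB to match the figures above.

```python
import math

MAX_PARTS = 10_000  # Part limit of one OSS multipart upload, as stated above.


def max_object_size_mb(part_size_mb):
    """Largest object, in MB, that fits in one multipart upload."""
    return part_size_mb * MAX_PARTS


def min_part_size_mb(target_size_mb):
    """Smallest whole-MB part size that can hold target_size_mb."""
    return math.ceil(target_size_mb / MAX_PARTS)


print(max_object_size_mb(16))  # 160000 (MB), that is, 160 GB
print(min_part_size_mb(500_000))  # 50
```

For example, under these assumptions a 500 GB output file would need ossBlockSize set to at least 50.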

writeMode

Specifies how OSS Writer handles existing data before writing:

  • truncate: Deletes all objects that match the prefix specified in the object parameter before writing new data. For example, if you set "object":"abc", all objects with the abc prefix are deleted.

  • append: Does not perform any cleanup. OSS Writer writes new data, using the specified object name as a prefix and appending a random UUID to ensure a unique name. For example, if you specify the object name as DataIntegration, the actual object name is similar to DataIntegration_****_****_****.

  • nonConflict: If an object whose name matches the specified object prefix already exists, the synchronization job fails. For example, if you set "object":"abc" and an object named abc123 exists, an error is reported.

Required: Yes

Default: None
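
The three write modes differ only in what happens to existing objects that share the prefix. The following sketch models that decision on a plain list of object names. It does not call OSS, the function plan_write is hypothetical, and the generated object name (prefix, underscore, UUID hex) is an assumed naming scheme used only for illustration.

```python
import uuid


def plan_write(write_mode, object_prefix, existing_objects):
    """Return (objects_to_delete, new_object_name) for one write.

    Raises ValueError for nonConflict when a matching object already exists.
    """
    matches = [name for name in existing_objects if name.startswith(object_prefix)]
    if write_mode == "truncate":
        to_delete = matches  # Clean up everything under the prefix first.
    elif write_mode == "append":
        to_delete = []  # No cleanup; the unique suffix avoids collisions.
    elif write_mode == "nonConflict":
        if matches:
            raise ValueError(f"objects with prefix {object_prefix!r} already exist")
        to_delete = []
    else:
        raise ValueError(f"unknown writeMode: {write_mode}")
    # Hypothetical naming scheme: prefix plus a random UUID.
    new_name = f"{object_prefix}_{uuid.uuid4().hex}"
    return to_delete, new_name


existing = ["abc123", "abcdef", "xyz"]
print(plan_write("truncate", "abc", existing)[0])  # ['abc123', 'abcdef']
print(plan_write("append", "abc", existing)[0])  # []
try:
    plan_write("nonConflict", "abc", existing)
except ValueError as error:
    print("job fails:", error)
```

The truncate case is the one to handle with care: a short prefix such as abc also matches unrelated objects like abcdef.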

writeSingleObject

Determines whether to write all data into a single object.

  • true: Writes all data to a single object. If no data is read from the source, an empty file is not created.

  • false: Writes data to multiple objects. If no data is read from the source and a header is configured, an empty file that contains only the header is created. Otherwise, a zero-byte empty file is created.

Note
  • This parameter does not apply to the ORC and Parquet formats. You cannot use it to write data to a single file for these formats when concurrency is greater than 1. To write to a single file, you must set concurrency to 1. However, this still appends a random suffix to the object name and can reduce the performance of the synchronization job.

  • In some cases, such as with a Hologres source, data is read by shard. This can produce multiple output files even when concurrency is set to 1.

Required: No

Default: false

fileFormat

The file format of the output objects. The following formats are supported:

  • csv: Writes data in strict CSV format. If data includes the field delimiter, it is escaped by being enclosed in double quotes (") according to standard CSV escaping rules.

  • text: Writes data as plain text, using the specified field delimiter. Unlike csv, this format does not escape delimiters that appear within the data.

  • parquet: If you use this file format, you must specify the parquetSchema parameter to define the data schema.

  • orc: To use this format, you must switch to the code editor.

Required: No

Default: text

compress

Specifies the compression format for the data files written to OSS. This parameter must be configured in the code editor.

Important

This parameter applies only to the Parquet and ORC file formats, for which only SNAPPY compression is supported. The csv and text formats do not support compression.

Required: No

Default: None

fieldDelimiter

The field delimiter for the output data.

Required: No

Default: , (comma)

encoding

Specifies the character encoding for the output files.

Required: No

Default: utf-8

parquetSchema

Required when fileFormat is set to parquet. This parameter defines the schema of the output Parquet file. Use the following format:

message MessageTypeName {
modifier data_type column_name;
......................;
}

The configuration items are described as follows:

  • MessageTypeName: The name of the message type.

  • modifier: required indicates that the field cannot contain a null value. optional indicates that the field can contain a null value. We recommend setting all fields to optional.

  • data_type: Supported Parquet data types include BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE, BINARY (for string types), and FIXED_LEN_BYTE_ARRAY.

Note

Each column definition must end with a semicolon, including the last one.

Example:

message m {
optional int64 id;
optional int64 date_id;
optional binary datetimestring;
optional int32 dspId;
optional int32 advertiserId;
optional int32 status;
optional int64 bidding_req_num;
optional int64 imp;
optional int64 click_num;
}

Required: No, unless fileFormat is set to parquet

Default: None

nullFormat

Text files have no standard representation for null values, so the nullFormat parameter specifies the string that represents a null value. For example, if you configure nullFormat="null" and the source data is null, the data synchronization system treats it as a null field.

Required: No

Default: None

header

Specifies the header to write to the output files. The value must be a JSON array of strings, such as ["id", "name", "age"].

Required: No

Default: None

maxFileSize (Advanced configuration, not supported in the codeless UI)

Controls the maximum size of a single output object before the file is rotated, similar to log file rotation. For multipart uploads, the part size is 10 MB, which also serves as the minimum rotation granularity. A maxFileSize value smaller than 10 MB is treated as 10 MB. An OSS InitiateMultipartUploadRequest supports a maximum of 10,000 parts.

When a file is rotated, the new object name is created by appending a sequence number (such as _1, _2, or _3) to the base object name, which already includes a prefix and a random UUID.

Note
  • The unit is megabytes (MB).

  • Example: "maxFileSize": 300 sets the maximum file size to 300 MB.

  • The maxFileSize parameter applies only to the csv and text formats. Because the size is calculated based on in-memory data, the actual size of the object in OSS may be inexact.

Required: No

Default: 100,000 (MB)
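
The rotation rules above can be approximated as follows. This is a simplified model, not the writer's implementation: it assumes that the first object keeps the base name and that each rotation appends the next sequence number, and it ignores the size inexactness noted above. The base name datax_abcd is a hypothetical stand-in for the prefix plus random UUID.

```python
MIN_ROTATION_MB = 10  # Part size, which is also the minimum rotation granularity.


def effective_max_file_size_mb(max_file_size_mb):
    """Values below 10 MB are treated as 10 MB."""
    return max(max_file_size_mb, MIN_ROTATION_MB)


def rotated_object_names(base_name, total_mb, max_file_size_mb):
    """List the object names produced when total_mb of data is written."""
    limit = effective_max_file_size_mb(max_file_size_mb)
    file_count = max(1, -(-total_mb // limit))  # Ceiling division.
    return [base_name] + [f"{base_name}_{i}" for i in range(1, file_count)]


print(rotated_object_names("datax_abcd", total_mb=700, max_file_size_mb=300))
# ['datax_abcd', 'datax_abcd_1', 'datax_abcd_2']
print(effective_max_file_size_mb(5))  # 10
```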

suffix (Advanced configuration, not supported in the codeless UI)

Specifies a suffix to append to the output object names. For example, if you set suffix to .csv, the final object names are similar to object-prefix_random-uuid.csv.

Required: No

Default: None

Appendix: Parquet data type conversion

If you do not configure parquetSchema, DataWorks automatically converts source data types as shown below.

| Source type | Parquet type | Parquet logical type |
| --- | --- | --- |
| CHAR / VARCHAR / STRING | BINARY | UTF8 |
| BOOLEAN | BOOLEAN | N/A |
| BINARY / VARBINARY | BINARY | N/A |
| DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
| TINYINT | INT32 | INT_8 |
| SMALLINT | INT32 | INT_16 |
| INT / INTEGER | INT32 | N/A |
| BIGINT | INT64 | N/A |
| FLOAT | FLOAT | N/A |
| DOUBLE | DOUBLE | N/A |
| DATE | INT32 | DATE |
| TIME | INT32 | TIME_MILLIS |
| TIMESTAMP / DATETIME | INT96 | N/A |