The OSS data source provides read and write access to OSS. This topic describes how DataWorks supports OSS data synchronization.
Supported field types and limitations
Batch data read
The OSS Reader reads data from OSS and converts it into a format that Data Integration can process. As OSS is an unstructured data storage service, the reader supports the following features.
Supported | Not supported |
|
|
When you prepare data in OSS, CSV files must conform to the standard CSV format. For example, you must escape any double quotation mark (") in a column by replacing it with two double quotation marks (""); otherwise, parsing errors may occur. If a file contains multiple delimiters, we recommend using the TXT file type.
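The escaping rule above can be illustrated with a minimal sketch that uses Python's standard csv module, which doubles embedded quotation marks when it writes a field. The helper name to_csv_line and the sample row are hypothetical and are not part of DataWorks.

```python
import csv
import io


def to_csv_line(fields):
    """Serialize one row as standard CSV, doubling embedded double quotes."""
    buf = io.StringIO()
    writer = csv.writer(buf, quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
    writer.writerow(fields)
    return buf.getvalue()


# Hypothetical row: the second field contains double quotation marks.
line = to_csv_line(["1", 'He said "hello"', "3.14"])
print(line, end="")

# Round trip: a standard CSV parser recovers the original field values.
parsed = next(csv.reader(io.StringIO(line)))
print(parsed)
```

Files produced this way keep each quoted field intact when a standard CSV parser reads them back, which is the property the reader relies on.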
OSS is an unstructured data source that stores data as files. Before running a synchronization task, verify that the field structure is configured correctly. If the source data structure changes, you must update the task configuration to prevent data scrambling.
Batch data write
The OSS Writer converts Data Integration-compliant data and writes it to OSS as text files. As OSS is an unstructured data storage service, the writer supports the following features.
Supported | Not supported |
|
|
Category | Column type |
Integer | LONG |
String | STRING |
Floating-point | DOUBLE |
Boolean | BOOLEAN |
Date and time | DATE |
Real-time data write
Supports real-time data writes.
You can perform real-time writes from a single table to the following data lake formats: Hudi (0.12.x), Paimon, and Iceberg.
Add a data source
Before you develop a synchronization task in DataWorks, add the required data source by following the instructions in Data source management. When you add a data source, the DataWorks console shows a description of each parameter.
When you add an OSS data source across accounts, you must grant the necessary permissions. For more information, see Use a bucket policy to grant cross-account access to OSS.
If you use RAM role-based authorization to configure an OSS data source, see Configure a data source by using RAM role authorization.
When you add an OSS data source across regions, we recommend using a public endpoint. For more information, see Overview of endpoints and network connectivity.
Configure data synchronization tasks
For the entry point and the procedure for configuring a synchronization task, see the following configuration guides.
Single-table batch synchronization
See Configuration in the codeless UI and Configuration in script mode.
For script mode parameters and a code sample, see Appendix: Sample script and parameters.
Single-table real-time synchronization
See Configure a single-table real-time synchronization task.
Full-database synchronization
See Configure a full-database batch synchronization task and Configure a full-database real-time synchronization task.
FAQ
Is there a file limit for reading from OSS?
How do I handle dirty data when reading multi-delimiter CSV files?
Appendix: Sample script and parameters
Configure a batch synchronization task by using the code editor
To configure a batch synchronization task in the code editor, set the parameters in the script according to the unified script format requirements. For more information, see Use the Code Editor. The following sections describe the data source parameters that you configure in the script.
General reader script example
{
  "type":"job",
  "version":"2.0",// The version number.
  "steps":[
    {
      "stepType":"oss",// The plugin name.
      "parameter":{
        "nullFormat":"",// The string that represents a null value.
        "compress":"",// The compression type.
        "datasource":"",// The data source.
        "column":[// The column definitions.
          {
            "index":0,// The column index. Starts from 0.
            "type":"string"// The data type.
          },
          {
            "index":1,
            "type":"long"
          },
          {
            "index":2,
            "type":"double"
          },
          {
            "index":3,
            "type":"boolean"
          },
          {
            "format":"yyyy-MM-dd HH:mm:ss", // The time format. Required when 'type' is 'date'.
            "index":4,
            "type":"date"
          }
        ],
        "skipHeader":"",// Whether to skip the header in CSV-like files.
        "encoding":"",// The encoding.
        "fieldDelimiter":",",// The field delimiter.
        "fileFormat": "",// The file format.
        "object":[]// Object prefixes for the files to read.
      },
      "name":"Reader",
      "category":"reader"
    },
    {
      "stepType":"stream",
      "parameter":{},
      "name":"Writer",
      "category":"writer"
    }
  ],
  "setting":{
    "errorLimit":{
      "record":""// The maximum number of allowed error records.
    },
    "speed":{
      "throttle":true,// If true, enables throttling. If false, the 'mbps' parameter is ignored.
      "concurrent":1, // The job concurrency.
      "mbps":"12"// The throttling speed limit in MB/s (1 mbps = 1 MB/s).
    }
  },
  "order":{
    "hops":[
      {
        "from":"Reader",
        "to":"Writer"
      }
    ]
  }
}
Read ORC or Parquet files from OSS
You can read ORC or Parquet files from OSS by reusing the HDFS reader. To do so, specify the standard OSS reader parameters along with the path (for ORC) and fileFormat (for ORC and Parquet) parameters. For parameter descriptions, see HDFS reader.
This example shows how to read data in ORC format from OSS.
{
  "stepType": "oss",
  "parameter": {
    "datasource": "",
    "fileFormat": "orc",
    "path": "/tests/case61/orc__691b6815_9260_4037_9899_****",
    "column": [
      { "index": 0, "type": "long" },
      { "index": "1", "type": "string" },
      { "index": "2", "type": "string" }
    ]
  }
}
This example shows how to read data in Parquet format from OSS.
{
  "type":"job",
  "version":"2.0",
  "steps":[
    {
      "stepType":"oss",
      "parameter":{
        "nullFormat":"",
        "compress":"",
        "fileFormat":"parquet",
        "path":"/*",
        "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }",
        "column":[
          { "index":"0", "type":"string" },
          { "index":"1", "type":"long" },
          { "index":"2", "type":"string" },
          { "index":"3", "type":"string" },
          { "index":"4", "type":"string" },
          { "index":"5", "type":"string" },
          { "index":"6", "type":"string" },
          { "index":"7", "type":"string" },
          { "index":"8", "type":"string" },
          { "index":"9", "type":"string" },
          { "index":"10", "type":"double" },
          { "index":"11", "type":"string" },
          { "index":"12", "type":"string" }
        ],
        "skipHeader":"false",
        "encoding":"UTF-8",
        "fieldDelimiter":",",
        "fieldDelimiterOrigin":",",
        "datasource":"wpw_demotest_oss",
        "envType":0,
        "object":[
          "wpw_demo/userdata1.parquet"
        ]
      },
      "name":"Reader",
      "category":"reader"
    },
    {
      "stepType":"odps",
      "parameter":{
        "partition":"dt=${bizdate}",
        "truncate":true,
        "datasource":"0_odps_wpw_demotest",
        "envType":0,
        "column":[
          "id"
        ],
        "emptyAsNull":false,
        "table":"wpw_0827"
      },
      "name":"Writer",
      "category":"writer"
    }
  ],
  "setting":{
    "errorLimit":{
      "record":""
    },
    "locale":"zh_CN",
    "speed":{
      "throttle":false,
      "concurrent":2
    }
  },
  "order":{
    "hops":[
      {
        "from":"Reader",
        "to":"Writer"
      }
    ]
  }
}
Reader script parameters
Parameter | Description | Required | Default |
datasource | Specifies the name of the data source. In script mode, this value must match the name of a configured data source. | Yes | None |
object | Specifies one or more objects to synchronize from OSS. You can specify objects by using explicit paths, wildcards, or dynamic parameters. 1. Configuration methods
Important
2. Concurrency and performance The configuration method directly determines data extraction concurrency:
| Yes | None |
parquetSchema | Use this parameter to define the schema for files in the Parquet file format. It applies only when fileFormat is set to parquet. After you specify the parquetSchema, ensure that the overall configuration is valid JSON. The format for parquetSchema is as follows:
The following is a configuration example: | No | None |
column | Specifies the list of columns to read. 'type' specifies the data type in the source. 'index' specifies the zero-based index of the column in the text file. 'value' specifies a constant value for the column, which instructs the system to generate the value instead of reading it from the source file. By default, you can read all columns as the STRING data type, as shown in the following example. You can also specify details for each column, as shown in the following example. Note For each column you specify, 'type' is required, and you must specify either 'index' or 'value'. | Yes | All columns are read as the STRING data type. |
fileFormat | The file format of the source objects in OSS. Valid values are 'csv' and 'text'. Both formats support custom delimiters. | Yes | csv |
fieldDelimiter | The field delimiter. Note A field delimiter is required. If unspecified, the parameter defaults to a comma (,), which is also the UI default. Specify an invisible character by its Unicode representation (e.g., \u001b). You can also use this format for visible characters (e.g., \u007c for the pipe symbol). | Yes | , |
lineDelimiter | The line delimiter. Note This parameter applies only when 'fileFormat' is set to 'text'. | No | None |
compress | The compression type of the source files. If this parameter is not set, files are assumed to be uncompressed. Supported types: gzip, bzip2, and zip. | No | No compression |
encoding | The character encoding of the source files. | No | utf-8 |
nullFormat | Specifies a string in the source data that represents a null value. This is necessary because text files lack a standard representation for null values. For example:
| No | None |
skipHeader | Specifies whether to skip the header row in CSV-like files. By default, the header row is not skipped. This parameter is not supported for compressed files. | No | false |
csvReaderConfig | A map of additional settings for the CsvReader used to parse CSV files. If this parameter is omitted, the CsvReader uses its default settings. | No | None |
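The column rules in the table above ('type' is required, and each column needs 'index' or 'value') and the comment in the general reader sample (a 'date' column needs 'format') can be checked before a script is submitted. The following is a minimal sketch of such a check. The function validate_columns and the sample column lists are hypothetical and are not part of DataWorks.

```python
def validate_columns(columns):
    """Return a list of problems found in an OSS Reader 'column' list."""
    problems = []
    for pos, col in enumerate(columns):
        col_type = col.get("type")
        # 'type' is required for every column.
        if col_type is None:
            problems.append(f"column {pos}: 'type' is required")
        # Each column reads from a file position ('index') or is a constant ('value').
        if "index" not in col and "value" not in col:
            problems.append(f"column {pos}: specify 'index' or 'value'")
        # The reader sample marks 'format' as required for date columns.
        if str(col_type).lower() == "date" and "format" not in col:
            problems.append(f"column {pos}: 'format' is required for type 'date'")
    return problems


# Hypothetical column lists used only to exercise the checks.
good = [
    {"index": 0, "type": "long"},
    {"value": "constant", "type": "string"},
    {"index": 4, "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
]
bad = [
    {"index": 1},
    {"type": "string"},
    {"index": 2, "type": "date"},
]

print(validate_columns(good))
print(validate_columns(bad))
```

A check like this catches only structural mistakes. It does not verify that the configured indexes match the actual field layout of the files in OSS.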
Writer script example
{
  "type":"job",
  "version":"2.0",
  "steps":[
    {
      "stepType":"stream",
      "parameter":{},
      "name":"Reader",
      "category":"reader"
    },
    {
      "stepType":"oss",// The plugin name.
      "parameter":{
        "nullFormat":"",// Defines which strings are interpreted as null values.
        "dateFormat":"",// The date format.
        "datasource":"",// The datasource.
        "writeMode":"",// The write mode.
        "writeSingleObject":"false", // If true, writes all data to a single OSS file.
        "encoding":"",// The file encoding.
        "fieldDelimiter":",",// The field delimiter.
        "fileFormat":"",// The file format.
        "object":""// The object prefix.
      },
      "name":"Writer",
      "category":"writer"
    }
  ],
  "setting":{
    "errorLimit":{
      "record":"0"// The error limit.
    },
    "speed":{
      "throttle":true,// Enables throttling. If false, the 'mbps' parameter is ignored.
      "concurrent":1, // The job concurrency.
      "mbps":"12"// The throttling limit in MB/s (1 mbps = 1 MB/s).
    }
  },
  "order":{
    "hops":[
      {
        "from":"Reader",
        "to":"Writer"
      }
    ]
  }
}
Sample script: Write ORC or Parquet files to OSS
You can write ORC or Parquet files to OSS using the HDFS Writer. In addition to the existing OSS Writer parameters, this method adds extended configuration parameters, such as path and fileFormat. For descriptions of these parameters, see HDFS Writer.
The following examples show how to write ORC or Parquet files to OSS:
The following code is for demonstration purposes only. You must modify the parameters to match your specific column names and column types. Do not copy the code for use in a production environment.
Write ORC files to OSS
You can currently write ORC files only in the code editor. You must switch to the code editor to configure the task. Set the fileFormat parameter to orc, specify the file path, and define each column using the format {"name":"your column name","type": "your column type"}.
The following ORC column types are supported for offline writes:
Type | Status |
TINYINT | Supported |
SMALLINT | Supported |
INT | Supported |
BIGINT | Supported |
FLOAT | Supported |
DOUBLE | Supported |
TIMESTAMP | Supported |
DATE | Supported |
VARCHAR | Supported |
STRING | Supported |
CHAR | Supported |
BOOLEAN | Supported |
DECIMAL | Supported |
BINARY | Supported |
{
  "stepType": "oss",
  "parameter": {
    "datasource": "",
    "fileFormat": "orc",
    "path": "/tests/case61",
    "fileName": "orc",
    "writeMode": "append",
    "column": [
      { "name": "col1", "type": "BIGINT" },
      { "name": "col2", "type": "DOUBLE" },
      { "name": "col3", "type": "STRING" }
    ],
    "fieldDelimiter": "\t",
    "compress": "NONE",
    "encoding": "UTF-8"
  }
}
Write Parquet files to OSS
{
  "stepType": "oss",
  "parameter": {
    "datasource": "",
    "fileFormat": "parquet",
    "path": "/tests/case61",
    "fileName": "test",
    "writeMode": "append",
    "fieldDelimiter": "\t",
    "compress": "SNAPPY",
    "encoding": "UTF-8",
    "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}",
    "dataxParquetMode": "fields"
  }
}
Writer script parameters
Parameter | Description | Required | Default |
datasource | The name of the data source. In the code editor, this value must match the name of the configured data source. | Yes | None |
object | The name of the output object in OSS. OSS uses object names to simulate a directory structure. The object name has the following conventions:
If you do not want a random UUID suffix, configure | Yes | None |
ossBlockSize | The size of an OSS part. The default value is 16 MB. When the file output format is parquet or ORC, you can configure this parameter at the same level as the object parameter. Because an OSS multipart upload is limited to 10,000 parts, the default part size restricts the maximum file size to 160 GB. To upload larger files, you must increase the part size. | No | 16 |
writeMode | Specifies how OSS Writer handles existing data before writing:
| Yes | None |
writeSingleObject | Determines whether to write all data into a single object.
Note
| No | false |
fileFormat | The file format of the output objects. The following formats are supported:
| No | text |
compress | Specifies the compression format for the data files written to OSS. This parameter must be configured in the code editor. Important This parameter applies only to the Parquet and ORC file formats, for which only SNAPPY compression is supported. The csv and text formats do not support compression. | No | None |
fieldDelimiter | The field delimiter for the output data. | No | , |
encoding | Specifies the character encoding for the output files. | No | utf-8 |
parquetSchema | Required when fileFormat is set to parquet. This parameter defines the schema of the output Parquet file. Use the following format: The configuration items are described as follows:
Note Each column definition must end with a semicolon, including the last one. Example: | No | None |
nullFormat | In text files, you cannot use a standard string to define a null value. The data synchronization system provides the nullFormat parameter to specify a string that represents a null value. For example, if you configure | No | None |
header | Specifies the header to write to the output files. The value must be a JSON array of strings, such as | No | None |
maxFileSize (Advanced configuration, not supported in the codeless UI) | Controls the maximum size of a single output object before the file is rotated, similar to log file rotation. For multipart uploads, the part size is 10 MB, which also serves as the minimum rotation granularity. When a file is rotated, the new object name is created by appending a sequence number (such as _1, _2, or _3) to the base object name, which already includes a prefix and a random UUID. Note
| No | 100,000 |
suffix (Advanced configuration, not supported in the codeless UI) | Specifies a suffix to append to the output object names. For example, if you set suffix to .csv, the final object names are similar to object-prefix_random-uuid.csv. | No | None |
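The ossBlockSize arithmetic in the table above (part size multiplied by the 10,000-part limit) can be sketched as follows. This is a rough illustration that assumes ossBlockSize is expressed in MB and that 1 GB is counted as 1,000 MB, which is how the 160 GB figure works out. The function names are hypothetical.

```python
import math

MAX_PARTS = 10_000       # OSS multipart upload part limit stated above
DEFAULT_BLOCK_MB = 16    # default ossBlockSize stated above (assumed to be in MB)


def max_file_size_mb(block_size_mb=DEFAULT_BLOCK_MB):
    """Largest file, in MB, that fits into MAX_PARTS parts of the given size."""
    return block_size_mb * MAX_PARTS


def min_block_size_mb(target_file_mb):
    """Smallest part size, in MB, that lets a file of target_file_mb fit in MAX_PARTS parts."""
    return max(DEFAULT_BLOCK_MB, math.ceil(target_file_mb / MAX_PARTS))


print(max_file_size_mb())          # 160000 MB with the default part size
print(min_block_size_mb(500_000))  # part size needed for a 500,000 MB file
print(min_block_size_mb(100_000))  # the default is already large enough
```

With these assumptions, a 500,000 MB output file would need ossBlockSize set to at least 50, while anything up to 160,000 MB fits with the default.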
Appendix: Parquet data type conversion
If you do not configure parquetSchema, DataWorks automatically converts source data types as shown below.
Source type | Parquet type | Parquet logical type |
CHAR / VARCHAR / STRING | BINARY | UTF8 |
BOOLEAN | BOOLEAN | N/A |
BINARY / VARBINARY | BINARY | N/A |
DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
TINYINT | INT32 | INT_8 |
SMALLINT | INT32 | INT_16 |
INT/INTEGER | INT32 | N/A |
BIGINT | INT64 | N/A |
FLOAT | FLOAT | N/A |
DOUBLE | DOUBLE | N/A |
DATE | INT32 | DATE |
TIME | INT32 | TIME_MILLIS |
TIMESTAMP/DATETIME | INT96 | N/A |
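The mapping above can also be written as a lookup table, which is convenient when you draft a parquetSchema string by hand. The following is a minimal sketch, not the DataWorks implementation: the helper build_parquet_schema is hypothetical, every field is declared optional as in the Parquet reader sample, and DECIMAL is left out because it needs a length and precision that the table does not specify. Whether DataWorks accepts every logical type annotation in this form is not confirmed by this topic.

```python
# Source type -> (Parquet type, Parquet logical type), copied from the table above.
PARQUET_TYPES = {
    "CHAR": ("BINARY", "UTF8"),
    "VARCHAR": ("BINARY", "UTF8"),
    "STRING": ("BINARY", "UTF8"),
    "BOOLEAN": ("BOOLEAN", None),
    "BINARY": ("BINARY", None),
    "VARBINARY": ("BINARY", None),
    "TINYINT": ("INT32", "INT_8"),
    "SMALLINT": ("INT32", "INT_16"),
    "INT": ("INT32", None),
    "INTEGER": ("INT32", None),
    "BIGINT": ("INT64", None),
    "FLOAT": ("FLOAT", None),
    "DOUBLE": ("DOUBLE", None),
    "DATE": ("INT32", "DATE"),
    "TIME": ("INT32", "TIME_MILLIS"),
    "TIMESTAMP": ("INT96", None),
    "DATETIME": ("INT96", None),
}


def build_parquet_schema(columns, message_name="m"):
    """Build a parquetSchema string from (column name, source type) pairs."""
    fields = []
    for name, source_type in columns:
        physical, logical = PARQUET_TYPES[source_type.upper()]
        annotation = f" ({logical})" if logical else ""
        fields.append(f"optional {physical} {name}{annotation};")
    return f"message {message_name} {{ " + " ".join(fields) + " }"


# Hypothetical columns, loosely based on the Parquet reader sample.
schema = build_parquet_schema(
    [("id", "BIGINT"), ("first_name", "STRING"), ("salary", "DOUBLE")]
)
print(schema)
```

An unknown source type raises a KeyError, which is intentional in this sketch: it is safer to stop than to guess a Parquet type that the table does not list.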