DataWorks data integration supports high-performance data synchronization across disparate data sources using the Apache Arrow column store format. This solution uses in-memory passthrough and zero-copy technology to change data transfer from row-based to column-based, which improves synchronization throughput for large data volumes. In test scenarios, performance can increase by nearly 10 times.
Use cases
When processing massive amounts of data, row-store-based data synchronization solutions often face performance bottlenecks. These bottlenecks are caused by frequent serialization, deserialization, and type conversions. This is especially true during data warehouse migrations, such as from Hive to MaxCompute, or when building a data lakehouse architecture. Long synchronization times for terabytes of data can affect service launch and data analysis efficiency. This solution solves the problem of inefficient data forwarding between large-scale column-store data sources. It can reduce synchronization time from hours to minutes.
Scenario 1: Big data migration
Pain point: Migrating hundreds of terabytes of data from Hive to MaxCompute is time-consuming and affects service launch.
Solution: Enable Arrow synchronization for direct column-store transfer to avoid format conversion.
Result: Migration time is reduced from hours to minutes, increasing efficiency by more than 10 times.
Scenario 2: Disparate data source integration and data lakehouse architecture
Data flows freely and efficiently between the data lake (such as the Hive ecosystem) and the data warehouse (Hologres/MaxCompute). The core value is to use a single data copy for multiple purposes and enable lake-warehouse synergy.
Solution architecture
DataWorks data integration introduces the Apache Arrow columnar in-memory standard. It refactors the data transfer method between the Reader and Writer. This changes the process from row-based processing to direct column-based passthrough.
Traditional row-store synchronization architecture
In the traditional architecture, data is processed row by row. Even if the source and destination both use a column-store format, such as Parquet or ORC, the data is converted into generic row-based Record objects inside the synchronization engine. The Record format is designed for single-row storage: each Record object holds several Column objects, and each Column contains the value of that column in the current row. For example, consider synchronizing column-store data from MaxCompute (ODPS) to MaxCompute (ODPS).
This process includes the following steps and creates the following performance overhead:
Reader decoding: The Reader reads column-store data from the source, such as MaxCompute. It then decodes the data and converts it row by row into the framework's internal Record objects.
Framework transfer: The framework transfers a large number of Record objects in the memory queue.
Writer encoding: The Writer retrieves Record objects from the framework, encodes them into the column-store format that the destination requires, and then writes the data.
This process involves multiple data format conversions, serialization/deserialization, and frequent object creation. This leads to high CPU and memory resource consumption, frequent garbage collection (GC), and limited overall throughput.
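The row-based path can be sketched in a few lines of Python. This is an illustrative model only, not the engine's actual code: the `Record` and `Column` classes here are simplified stand-ins for the framework's internal objects, and the helper names (`reader_decode`, `writer_encode`, `count_objects`) are hypothetical. It shows why the overhead grows with rows × columns: every cell becomes its own object on the way through.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Column:
    """One cell: the value of a single column in a single row."""
    value: Any


@dataclass
class Record:
    """One row: a list of Column objects (simplified stand-in)."""
    columns: List[Column]


def reader_decode(column_data: Dict[str, List[Any]]) -> List[Record]:
    """Reader step: turn column-store data into one Record per row."""
    names = list(column_data)
    row_count = len(column_data[names[0]]) if names else 0
    return [
        Record([Column(column_data[name][i]) for name in names])
        for i in range(row_count)
    ]


def writer_encode(records: List[Record], names: List[str]) -> Dict[str, List[Any]]:
    """Writer step: rebuild column-store data from the row objects."""
    out: Dict[str, List[Any]] = {name: [] for name in names}
    for record in records:
        for name, column in zip(names, record.columns):
            out[name].append(column.value)
    return out


def count_objects(records: List[Record]) -> int:
    """Count the Record and Column objects allocated for the transfer."""
    return sum(1 + len(record.columns) for record in records)


source = {"col1": [1, 2, 3], "col2": ["a", "b", "c"]}
records = reader_decode(source)                 # columns -> rows
result = writer_encode(records, list(source))   # rows -> columns
```

The data arrives unchanged, but three rows of two columns already require nine intermediate objects. At tens of millions of rows, this allocation pattern is what drives the CPU, memory, and GC costs described above.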
Arrow column-store synchronization architecture
The new Arrow-based architecture implements an end-to-end columnar data stream. The data maintains its column-store format throughout the synchronization pipeline. This avoids the overhead of row-based conversion.
By introducing the ArrowTabularRecord data structure, the synchronization process is optimized as follows:
Direct read by Reader: The Reader uses the MaxCompute Tunnel Arrow API to directly read column-store data from the source. It then encapsulates the data into ArrowTabularRecord objects and delivers them to the framework in batches.
Framework transfer: The framework directly passes the ArrowTabularRecord objects, which contain binary column data.
Direct write by Writer: The Writer uses the Tunnel Arrow API to retrieve Arrow-formatted binary data directly from the ArrowTabularRecord and uses zero-copy technology to transfer it to the destination, such as a Tunnel Server, without re-serialization.
This architecture provides native support for Arrow and eliminates the intermediate row-based conversion step. This achieves end-to-end "short-circuit synchronization" for column stores, which greatly improves throughput and reduces latency.
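The passthrough idea can be illustrated with Python's standard library alone. This is a minimal sketch under stated assumptions, not the real implementation: `ColumnBatch` is a hypothetical stand-in for ArrowTabularRecord, typed `array` buffers stand in for Arrow column buffers, and `reader_read` and `writer_write` are illustrative names. The point it demonstrates is that a whole batch moves through the queue as one object and the writer receives the reader's buffers rather than copies of them.

```python
from array import array
from dataclasses import dataclass
from queue import Queue
from typing import Dict


@dataclass
class ColumnBatch:
    """Hypothetical stand-in for ArrowTabularRecord: one buffer per column."""
    columns: Dict[str, memoryview]
    row_count: int


def reader_read(source: Dict[str, array]) -> ColumnBatch:
    """Wrap each column buffer in a memoryview; no per-row objects are built."""
    views = {name: memoryview(buf) for name, buf in source.items()}
    row_count = len(next(iter(source.values()))) if source else 0
    return ColumnBatch(views, row_count)


def writer_write(batch: ColumnBatch) -> Dict[str, memoryview]:
    """Hand the column buffers to the destination as-is, without re-encoding."""
    return batch.columns


source = {
    "col1": array("q", [1, 2, 3]),        # 64-bit integers
    "col2": array("d", [0.5, 1.5, 2.5]),  # 64-bit floats
}

channel: Queue = Queue()
channel.put(reader_read(source))   # framework transfer: one batch, not N rows
written = writer_write(channel.get())

# The writer sees the reader's buffer itself, not a copy of it.
shares_buffer = all(written[name].obj is source[name] for name in source)
```

Here `shares_buffer` is true because a `memoryview` references the underlying buffer instead of duplicating it, which is the same property the Arrow path relies on to skip re-serialization.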
Implementation steps
You can add specific parameters to a DataWorks data integration task to enable high-performance Arrow synchronization. This feature is currently supported for full-database and single-table offline synchronization tasks where the source and destination are MaxCompute, Hologres, or Parquet/ORC data in Hive, OSS, or Hadoop Distributed File System (HDFS).
Method 1: Full-database offline synchronization
Use the data integration full-database offline synchronization task feature. For example, for an offline synchronization from Hive to MaxCompute, the system automatically detects the field types of the source and destination tables and enables high-performance Arrow synchronization. No manual configuration is required. You can view this setting in the upper-right corner of the task editor.
In Runtime Config, locate the Enable Arrow short-circuit transfer parameter. The available options are Auto (default), Force enable, and Force disable.
Method 2: Single-table offline synchronization
When configuring a single-table offline synchronization task, you can manually enable the feature in the code editor. To do this, add "useArrow": true to the parameter configuration of both the Reader and the Writer.
Prerequisite: To enable this feature, the column type definitions of the source and destination tables must be identical. This is because this mode skips the type conversion step and transfers data directly in memory.
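Because the type check is a manual responsibility in this mode, it can help to validate a job before submitting it. The following Python sketch is one possible pre-flight check, not a DataWorks API: `enable_arrow` is a hypothetical helper, and the type dictionaries are assumed to come from your own metadata lookup. The job dictionary uses the same step layout (`steps`, `category`, `parameter`, `column`) as the script-mode configuration, and columns are assumed to map from Reader to Writer by position.

```python
import copy
from typing import Dict


def enable_arrow(job: dict, source_types: Dict[str, str],
                 dest_types: Dict[str, str]) -> dict:
    """Return a copy of the job with useArrow set on the Reader and the Writer.

    Raises ValueError if mapped columns differ in type, because Arrow mode
    skips type conversion.
    """
    job = copy.deepcopy(job)  # leave the caller's configuration untouched
    reader = next(s for s in job["steps"] if s["category"] == "reader")
    writer = next(s for s in job["steps"] if s["category"] == "writer")

    src_cols = reader["parameter"]["column"]
    dst_cols = writer["parameter"]["column"]
    if len(src_cols) != len(dst_cols):
        raise ValueError("Reader and Writer list a different number of columns")

    # Assumption: columns are mapped by position, first to first, and so on.
    for src, dst in zip(src_cols, dst_cols):
        if source_types[src] != dest_types[dst]:
            raise ValueError(
                f"type mismatch: {src} is {source_types[src]}, "
                f"{dst} is {dest_types[dst]}"
            )

    reader["parameter"]["useArrow"] = True
    writer["parameter"]["useArrow"] = True
    return job


job = {
    "type": "job",
    "steps": [
        {"stepType": "hive", "category": "reader", "name": "Reader",
         "parameter": {"datasource": "my_datasource", "table": "table",
                       "column": ["col1", "col2"]}},
        {"stepType": "odps", "category": "writer", "name": "Writer",
         "parameter": {"datasource": "odps_test", "table": "table",
                       "column": ["col1", "col2"]}},
    ],
}
hive_types = {"col1": "bigint", "col2": "string"}  # illustrative types
odps_types = {"col1": "bigint", "col2": "string"}

arrow_job = enable_arrow(job, hive_types, odps_types)
```

The helper compares type names as plain strings. If your sources report equivalent types under different names, normalize them before calling it.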
The following is a sample configuration for synchronizing data from Hive (Reader) to MaxCompute (Writer):
{
  "type": "job",
  "steps": [
    {
      "stepType": "hive",
      "parameter": {
        "useArrow": true,
        "datasource": "my_datasource",
        "column": [
          "col1",
          "col2"
        ],
        "readMode": "hdfs",
        "table": "table"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "odps",
      "parameter": {
        "useArrow": true,
        "truncate": false,
        "datasource": "odps_test",
        "column": [
          "col1",
          "col2"
        ],
        "table": "table"
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "speed": {
      "concurrent": 3
    }
  }
}
Performance validation
The following sections show the performance improvement for several data source combinations. The test conditions for performance validation are as follows:
Stress test conditions: Server specifications: 64 cores, 256 GB of memory. Network interface card: 25 Gbit/s.
Test data volume: 42,045,700 records. In the tables that follow, R/s means records per second.
Data Source | Supported Feature | Replication Performance Improvement |
MaxCompute | Directly read column-store data through the Tunnel Arrow API | Replication performance improved by 200% |
Hologres | Supports Arrow format export to avoid Java Database Connectivity (JDBC) row-based bottlenecks | Replication performance improved by 95% |
Hive, OSS, HDFS, and other distributed file systems | Directly read underlying Arrow format data from Parquet/ORC | Parquet synchronization performance improved by 5.55 times. ORC synchronization performance improved by 9.85 times. |
Scenario 1: MaxCompute column-store short-circuit synchronization (Arrow → Arrow)
Concurrency | Traditional Row Store | Arrow Column Store | Performance Improvement |
1 | 67.8 MB/s 3740 R/s | 212.6 MB/s 11462 R/s | +206.5% |
3 | 185.6 MB/s 10226 R/s | 569.9 MB/s 30728 R/s | +200.5% |
8 | 462.1 MB/s 25467 R/s | 1321.0 MB/s 71143 R/s | +179.4% |
Scenario 2: Hologres → MaxCompute synchronization
Concurrency | Traditional Synchronization | Arrow Synchronization | Performance Improvement |
4 | 439.1 MB/s 216480 R/s | 906.1 MB/s 404270 R/s | +87% |
8 | 773.3 MB/s 381300 R/s | 1669.1 MB/s 745654 R/s | +95% |
Scenario 3: Parquet/ORC → MaxCompute synchronization
Column Store Format | Traditional Synchronization | Arrow Synchronization | Performance Improvement |
Parquet | 26.1 MB/s 35631 R/s | 1198.1 MB/s 233587 R/s | 5.55 times |
ORC | 21.4 MB/s 27661 R/s | 3256.3 MB/s 300326 R/s | 9.85 times |
Parquet and ORC are column-store formats used in distributed file systems such as HDFS and OSS.
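The "times" figures in the Parquet/ORC table can be reproduced from the record rates. The sketch below assumes, as an inference from the numbers rather than something the test report states, that the improvement is the gain in records per second relative to the traditional baseline, truncated to two decimal places. The function names are illustrative.

```python
import math


def relative_gain(baseline_rps: int, arrow_rps: int) -> float:
    """Gain over the baseline, as a multiple of the baseline record rate."""
    return (arrow_rps - baseline_rps) / baseline_rps


def truncate(value: float, digits: int = 2) -> float:
    """Drop digits past the given precision without rounding up."""
    factor = 10 ** digits
    return math.floor(value * factor) / factor


# Records per second from the Parquet/ORC -> MaxCompute table:
# (traditional synchronization, Arrow synchronization)
results = {
    "Parquet": (35631, 233587),
    "ORC": (27661, 300326),
}

gains = {
    fmt: truncate(relative_gain(base, arrow))
    for fmt, (base, arrow) in results.items()
}
```

Under that assumption, `gains` holds 5.55 for Parquet and 9.85 for ORC, matching the table. Note that a gain of 9.85 times over the baseline corresponds to an Arrow record rate about 10.85 times the traditional one.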