High-performance synchronization with the Arrow column store format

DataWorks data integration supports high-performance synchronization between heterogeneous data sources using the Apache Arrow columnar format. By passing column data through memory unchanged (passthrough) and using zero-copy transfer, it replaces row-based data transfer with column-based transfer, which raises synchronization throughput for large data volumes. In test scenarios, throughput increased by nearly 10 times in the best case.

Use cases

When processing massive amounts of data, row-based synchronization solutions often hit performance bottlenecks caused by frequent serialization, deserialization, and type conversion. The problem is most visible in data warehouse migrations, such as from Hive to MaxCompute, and when building a data lakehouse architecture, where synchronizing terabytes of data can take long enough to delay service launches and slow down data analysis. This solution removes the inefficient forwarding of data between large-scale column-store data sources and can reduce synchronization time from hours to minutes.

Scenario 1: Big data migration

Pain point: Migrating hundreds of terabytes of data from Hive to MaxCompute takes a long time and delays service launches.
Solution: Enable Arrow synchronization for direct column-store transfer to avoid format conversion.
Result: Migration time is reduced from hours to minutes, increasing efficiency by more than 10 times.

Scenario 2: Heterogeneous data source integration and data lakehouse architecture

Data flows efficiently between the data lake (such as the Hive ecosystem) and the data warehouse (Hologres or MaxCompute). The core value is that a single copy of the data can serve multiple purposes, enabling lake-warehouse synergy.

Solution architecture

DataWorks data integration adopts the Apache Arrow columnar in-memory format and refactors how data is transferred between the Reader and the Writer, replacing row-by-row processing with direct column-based passthrough.

Traditional row-store synchronization architecture

In the traditional architecture, data is processed row by row. Even when both the source and the destination use a column-store format, such as Parquet or ORC, the synchronization engine converts the data into generic row-based Record objects. Each Record represents a single row and contains several Columns, and each Column holds the value of that field in the current row. The following example synchronizes column-store data from MaxCompute (ODPS) to MaxCompute (ODPS):

[Figure: Traditional row-store synchronization from MaxCompute (ODPS) to MaxCompute (ODPS)]

This process includes the following steps and creates the following performance overhead:

  1. Reader decoding: The Reader reads column-store data from the source, such as MaxCompute. It then decodes the data and converts it row by row into the framework's internal Record objects.

  2. Framework transfer: The framework passes a large number of Record objects, one per row, through its in-memory queue.

  3. Writer encoding: The Writer retrieves Record objects from the framework, encodes them into the column-store format that the destination requires, and then writes the data.

This process involves multiple data format conversions, serialization/deserialization, and frequent object creation. This leads to high CPU and memory resource consumption, frequent garbage collection (GC), and limited overall throughput.
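To make the overhead concrete, here is a minimal Python sketch of the row-based path. Record and Column are hypothetical stand-ins modeled on the description above, not the engine's actual classes, and a columnar batch is represented simply as a dict of Python lists.

```python
from dataclasses import dataclass
from queue import Queue
from typing import Any, Dict, List


# Hypothetical stand-ins for the engine's row-based data model:
# a Record is one row, and each Column holds that row's value for one field.
@dataclass
class Column:
    value: Any


@dataclass
class Record:
    columns: List[Column]


def reader_decode(column_batch: Dict[str, list]) -> List[Record]:
    """Step 1: break a columnar batch apart into one Record per row."""
    names = list(column_batch)
    num_rows = len(column_batch[names[0]]) if names else 0
    return [
        Record([Column(column_batch[name][i]) for name in names])
        for i in range(num_rows)
    ]


def writer_encode(records: List[Record], names: List[str]) -> Dict[str, list]:
    """Step 3: reassemble the columnar layout that the destination expects."""
    return {
        name: [record.columns[j].value for record in records]
        for j, name in enumerate(names)
    }


def row_based_sync(column_batch: Dict[str, list]) -> Dict[str, list]:
    queue: Queue = Queue()
    records = reader_decode(column_batch)
    for record in records:  # Step 2: one queue operation per row
        queue.put(record)
    received = [queue.get() for _ in records]
    return writer_encode(received, list(column_batch))
```

For a batch of n rows and k columns, reader_decode creates n Record objects and n × k Column objects, and every Record makes its own trip through the queue. The real engine additionally pays for decoding and encoding the storage format at both ends.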

Arrow column-store synchronization architecture

The new Arrow-based architecture implements an end-to-end columnar data stream. The data maintains its column-store format throughout the synchronization pipeline. This avoids the overhead of row-based conversion.

[Figure: Arrow column-store synchronization architecture]

By introducing the ArrowTabularRecord data structure, the synchronization process is optimized as follows:

  1. Direct read by Reader: The Reader uses the MaxCompute Tunnel Arrow API to directly read column-store data from the source. It then encapsulates the data into ArrowTabularRecord objects and delivers them to the framework in batches.

  2. Framework transfer: The framework directly passes the ArrowTabularRecord objects, which contain binary column data.

  3. Direct write by Writer: The Writer uses the Tunnel Arrow API to retrieve Arrow-formatted binary data directly from the ArrowTabularRecord and uses zero-copy technology to transfer it to the destination, such as a Tunnel Server, without re-serialization.

This architecture provides native support for Arrow and eliminates the intermediate row-based conversion step. This achieves end-to-end "short-circuit synchronization" for column stores, which greatly improves throughput and reduces latency.
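For contrast, the following sketch keeps the batch in columnar form end to end. TabularBatch is a hypothetical stand-in for ArrowTabularRecord, and Python's built-in memoryview illustrates the zero-copy idea: the writer receives views over the reader's buffers instead of copies. The sketch does not use the real Arrow or MaxCompute Tunnel APIs.

```python
from dataclasses import dataclass
from queue import Queue
from typing import Dict, List


@dataclass
class TabularBatch:
    """Hypothetical stand-in for ArrowTabularRecord: one object carries a
    whole batch as per-column binary buffers instead of per-row values."""
    num_rows: int
    buffers: Dict[str, memoryview]


def reader_read(raw_columns: Dict[str, bytearray], num_rows: int) -> TabularBatch:
    # Wrap the reader's buffers in memoryviews; no bytes are copied.
    views = {name: memoryview(buf) for name, buf in raw_columns.items()}
    return TabularBatch(num_rows, views)


def writer_write(batch: TabularBatch, sink: List[memoryview]) -> None:
    # Hand each column buffer to the destination unchanged: no per-row
    # decoding, no Record objects, no re-serialization.
    sink.extend(batch.buffers.values())


def columnar_sync(raw_columns: Dict[str, bytearray], num_rows: int) -> List[memoryview]:
    queue: Queue = Queue()
    queue.put(reader_read(raw_columns, num_rows))  # one queue operation per batch
    sink: List[memoryview] = []
    writer_write(queue.get(), sink)
    return sink
```

A single queue operation moves the whole batch, and no per-row objects are created. Each view in the sink still refers to the reader's original buffer (its .obj attribute is that bytearray). In the real pipeline the buffers would hold Arrow-encoded column data; here they are arbitrary bytes.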


Implementation steps

You can enable high-performance Arrow synchronization for a DataWorks data integration task either automatically or by adding parameters to the task configuration. The feature currently supports full-database and single-table offline synchronization tasks whose source and destination are MaxCompute, Hologres, or Hive, OSS, or Hadoop Distributed File System (HDFS) with Parquet or ORC files.

Method 1: Full-database offline synchronization

Use the full-database offline synchronization feature of data integration. For an offline synchronization from Hive to MaxCompute, for example, the system detects the field types of the source and destination tables and enables high-performance Arrow synchronization automatically, so no manual configuration is required. To view the setting, go to Advanced Configuration > Advanced Parameters > Runtime Config in the upper-right corner of the task editor.

In Runtime Config, locate the Enable Arrow short-circuit transfer parameter. The available options are Auto (default), Force enable, and Force disable.
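The exact detection rules behind Auto are not described here. The following Python sketch shows one plausible reading of the three options, assuming that Auto enables Arrow only when the detected column types match, which is the same condition stated as a prerequisite for single-table tasks. ArrowMode, use_arrow, and the type strings are hypothetical, and the case-insensitive comparison is also an assumption.

```python
from enum import Enum
from typing import Dict


class ArrowMode(Enum):
    # Hypothetical names for the three Runtime Config options.
    AUTO = "Auto"
    FORCE_ENABLE = "Force enable"
    FORCE_DISABLE = "Force disable"


def _normalize(types: Dict[str, str]) -> Dict[str, str]:
    # Assumption: column names and type names compare case-insensitively.
    return {name.lower(): t.strip().lower() for name, t in types.items()}


def use_arrow(mode: ArrowMode, src_types: Dict[str, str], dst_types: Dict[str, str]) -> bool:
    """Decide whether to use Arrow short-circuit transfer for one table."""
    if mode is ArrowMode.FORCE_ENABLE:
        return True
    if mode is ArrowMode.FORCE_DISABLE:
        return False
    # Auto: assume Arrow is chosen only when every column has the same type
    # on both sides, because short-circuit transfer skips type conversion.
    return _normalize(src_types) == _normalize(dst_types)
```

For example, use_arrow(ArrowMode.AUTO, {"col1": "BIGINT"}, {"col1": "bigint"}) returns True, while a BIGINT column mapped to an INT column returns False under Auto.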

Method 2: Single-table offline synchronization

When you configure a single-table offline synchronization task, you can enable the feature manually in the code editor by adding "useArrow": true to the parameter block of both the Reader and the Writer.

Prerequisite: The column type definitions of the source and destination tables must be identical, because this mode skips type conversion and transfers data directly in memory.

The following is a sample configuration for synchronizing data from Hive (Reader) to MaxCompute (Writer):

{
  "type": "job",
  "steps": [
    {
      "stepType": "hive",
      "parameter": {
        "useArrow": true,  
        "datasource": "my_datasource",
        "column": [
          "col1",
          "col2"
        ],
        "readMode": "hdfs",
        "table": "table"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "odps",
      "parameter": {
        "useArrow": true,  
        "truncate": false,
        "datasource": "odps_test",
        "column": [
          "col1",
          "col2"
        ],
        "table": "table"
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "speed": {
      "concurrent": 3
    }
  }
}
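Because a missing flag on either side is an easy mistake, a small pre-flight check can help. The following Python function is a hypothetical helper, not part of DataWorks: it parses a job like the sample above, confirms that both the Reader and the Writer set "useArrow": true, and, assuming columns are mapped by position, checks that the two column lists have the same length. It cannot confirm that the column types are identical, because that requires the table metadata.

```python
import json
from typing import List


def check_arrow_job(job_json: str) -> List[str]:
    """Hypothetical pre-flight check for a single-table job that should use Arrow.

    Returns a list of problems; an empty list means the job looks consistent.
    """
    job = json.loads(job_json)
    steps = {step["category"]: step for step in job.get("steps", [])}
    problems: List[str] = []

    # "useArrow": true must be set on both the Reader and the Writer.
    for category in ("reader", "writer"):
        step = steps.get(category)
        if step is None:
            problems.append(f"missing {category} step")
        elif step.get("parameter", {}).get("useArrow") is not True:
            problems.append(f"{category} ({step.get('stepType')}) does not set useArrow: true")

    # Assumption: columns are matched by position, so the lists should be the same length.
    if "reader" in steps and "writer" in steps:
        src_cols = steps["reader"].get("parameter", {}).get("column", [])
        dst_cols = steps["writer"].get("parameter", {}).get("column", [])
        if len(src_cols) != len(dst_cols):
            problems.append(f"column count mismatch: reader {len(src_cols)}, writer {len(dst_cols)}")
    return problems
```

Applied to the sample configuration above, check_arrow_job returns an empty list.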

Performance validation

The following results show the performance improvement for several data source combinations under these test conditions:

  • Stress test server: 64 cores, 256 GB of memory, 25 Gbit/s network interface card.

  • Test data volume: 42,045,700 records.

  • Throughput metrics: MB/s and records per second (R/s).

| Data source | Supported feature | Replication performance improvement |
| --- | --- | --- |
| MaxCompute | Reads column-store data directly through the Tunnel Arrow API | +200% |
| Hologres | Exports data in Arrow format to avoid the row-based bottleneck of Java Database Connectivity (JDBC) | +95% |
| Hive, OSS, HDFS, and other distributed file systems | Reads Parquet/ORC data directly into Arrow format | Parquet: +555%; ORC: +985% |

Scenario 1: MaxCompute column-store short-circuit synchronization (Arrow → Arrow)

| Concurrency | Traditional row store (MB/s) | Traditional row store (R/s) | Arrow column store (MB/s) | Arrow column store (R/s) | Improvement (R/s) |
| --- | --- | --- | --- | --- | --- |
| 1 | 67.8 | 3740 | 212.6 | 11462 | +206.5% |
| 3 | 185.6 | 10226 | 569.9 | 30728 | +200.5% |
| 8 | 462.1 | 25467 | 1321.0 | 71143 | +179.4% |

Scenario 2: Hologres → MaxCompute synchronization

| Concurrency | Traditional (MB/s) | Traditional (R/s) | Arrow (MB/s) | Arrow (R/s) | Improvement (R/s) |
| --- | --- | --- | --- | --- | --- |
| 4 | 439.1 | 216480 | 906.1 | 404270 | +87% |
| 8 | 773.3 | 381300 | 1669.1 | 745654 | +95% |

Scenario 3: Parquet/ORC → MaxCompute synchronization

| Column-store format | Traditional (MB/s) | Traditional (R/s) | Arrow (MB/s) | Arrow (R/s) | Improvement (R/s) |
| --- | --- | --- | --- | --- | --- |
| Parquet | 26.1 | 35631 | 1198.1 | 233587 | +555% |
| ORC | 21.4 | 27661 | 3256.3 | 300326 | +985% |

Parquet and ORC are column-store formats used in distributed file systems such as HDFS and OSS.
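The improvement figures can be recomputed from the R/s columns. The following Python snippet reuses only the figures from the tables above and computes, for each case, the throughput ratio (Arrow R/s divided by traditional R/s) and the corresponding percentage increase. It shows that the Parquet and ORC improvements (5.55 and 9.85 times, that is, +555% and +985%) correspond to throughput ratios of roughly 6.6 and 10.9, and that for MaxCompute at a concurrency of 8 the R/s figures give about +179.4%.

```python
# Records-per-second (R/s) figures copied from the tables above:
# (scenario, setting, traditional R/s, Arrow R/s)
RESULTS = [
    ("MaxCompute -> MaxCompute", "concurrency 1", 3740, 11462),
    ("MaxCompute -> MaxCompute", "concurrency 3", 10226, 30728),
    ("MaxCompute -> MaxCompute", "concurrency 8", 25467, 71143),
    ("Hologres -> MaxCompute", "concurrency 4", 216480, 404270),
    ("Hologres -> MaxCompute", "concurrency 8", 381300, 745654),
    ("Parquet -> MaxCompute", "", 35631, 233587),
    ("ORC -> MaxCompute", "", 27661, 300326),
]


def speedup(before: float, after: float) -> float:
    """Throughput ratio: Arrow R/s as a multiple of traditional R/s."""
    return after / before


def improvement_pct(before: float, after: float) -> float:
    """Relative increase in percent: (after / before - 1) * 100."""
    return (speedup(before, after) - 1) * 100


if __name__ == "__main__":
    for scenario, setting, before, after in RESULTS:
        print(f"{scenario:26} {setting:14} "
              f"{speedup(before, after):5.2f}x  +{improvement_pct(before, after):.1f}%")
```

Reporting the ratio alongside the percentage avoids the ambiguity of phrases such as "improved by N times", which can be read either as the ratio or as the increase.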