You can use a filter condition in a batch synchronization node to synchronize either full data or incremental data. With a filter condition, Data Integration synchronizes only data that meets the specified criteria. You can also combine scheduling parameters with the filter condition to dynamically filter data based on the node's runtime, enabling incremental synchronization. This topic shows you how to configure a batch synchronization node for incremental synchronization.
Usage notes
Incremental synchronization is not supported for some data sources, such as HBase and OTSStream. To check if incremental synchronization is supported for a specific data source, see the documentation for the corresponding reader plug-in.
The required parameters for incremental synchronization vary by reader plug-in. For details, see the documentation for the specific plug-in and Supported data sources and plug-ins. For example:
Reader plug-in
Required parameter
Supported syntax
where
Note: In the codeless UI, this is the filter condition parameter.
Database syntax
Note: You can use this parameter with scheduling parameters to read data from a specified time range each day.
query
Note: In the codeless UI, this is the Search Condition parameter.
Similar to database syntax
Note: You can use this parameter with scheduling parameters to read data from a specified time range each day.
Object
Specify the object path
Note: You can use this parameter with scheduling parameters to read data from a specified file each day.
...
...
...
Configure incremental synchronization
In a Data Integration batch synchronization node, you can use scheduling parameters to specify data paths and ranges for the source and destination tables. The configuration is the same as for other node types.
At runtime, the system replaces placeholder parameters with the actual values from the scheduling parameter expressions.
For example, to synchronize data from a MySQL data source:
If you do not configure a Data Filtering condition, the node synchronizes the full data to the destination table by default.
If you configure a Data Filtering condition, the node synchronizes only the data that meets the condition to the destination table.
The partition name of the destination MaxCompute table is specified by using scheduling parameters. The $bizdate parameter represents the data timestamp. When a scheduled task runs, the partition expression in the task configuration is replaced with the data timestamp. For details about how to configure scheduling parameter expressions, see Configure and use scheduling parameters.
For a batch synchronization node, you must configure the bizdate parameter in three locations to implement incremental synchronization:
In the Data Filtering field for the source, enter STR_TO_DATE('${bizdate}','%Y%m%d') <= gmt_modify_time AND gmt_modify_time < DATE_ADD(STR_TO_DATE('${bizdate}','%Y%m%d'), interval 1 day) to select only the data that was modified on the day of the data timestamp.
In the Partition Information field for the destination, enter pt=${bizdate} to write data to the corresponding date partition, and set the Cleanup Rule to Insert Overwrite.
In the Parameters field within the Scheduling pane on the right, enter bizdate=$bizdate. This allows the scheduling system to automatically replace ${bizdate} with the actual data timestamp at runtime.
When you configure incremental data synchronization:
For incremental fields of a time or date data type, you can use scheduling parameters to dynamically substitute values. At runtime, the system automatically replaces the parameters with specific values based on the data timestamp. For more information, see Scheduling parameter formats.
For non-time-based incremental fields: You can use an assignment node to convert the field to the target data type and then pass it to Data Integration for data synchronization. For more information, see Assignment node.
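The following Python sketch imitates the substitution described above so that you can see what the filter and partition expressions look like after ${bizdate} is replaced. It is an illustration only, not the scheduling system's implementation. The helper names data_timestamp and render are hypothetical, and the sketch assumes that the data timestamp is the day before the run date, in yyyymmdd format.

```python
import re
from datetime import date, timedelta


def data_timestamp(run_date: date) -> str:
    """Assumption: the data timestamp is the day before the run date (yyyymmdd)."""
    return (run_date - timedelta(days=1)).strftime("%Y%m%d")


def render(template: str, params: dict) -> str:
    """Replace every ${name} placeholder with the matching value in params."""
    return re.sub(r"\$\{(\w+)\}", lambda m: params[m.group(1)], template)


# Expressions copied from the three configuration locations described above.
FILTER = (
    "STR_TO_DATE('${bizdate}','%Y%m%d') <= gmt_modify_time "
    "AND gmt_modify_time < DATE_ADD(STR_TO_DATE('${bizdate}','%Y%m%d'), interval 1 day)"
)
PARTITION = "pt=${bizdate}"

# Hypothetical run date used only for this illustration.
params = {"bizdate": data_timestamp(date(2024, 3, 15))}
rendered_filter = render(FILTER, params)
rendered_partition = render(PARTITION, params)

print(rendered_filter)
print(rendered_partition)
```

Because the filter and the partition are rendered from the same parameter value, each run reads one day of modified rows and overwrites only the partition for that same day.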
Notes
When you configure an incremental synchronization task, take note of the following:
Safety of Insert Overwrite: Using the Insert Overwrite mode is safe. This mode clears only the partition specified by the current task. It does not affect data in other partitions and does not cause data conflicts or accidental deletions.
Limitation on bulk overwrite for partition ranges: DataWorks does not support setting an hour range value, such as hh=00-23, in the partition configuration for bulk overwrite operations. To overwrite data for multiple hours, you must configure and run separate tasks, because partition parameters support only single values or the wildcard character *.
Wildcard syntax format: In scenarios where the source contains hourly partitions but the destination is partitioned only by day, you must enter the wildcard character * in the hour partition field to match all hourly data. Enter * directly. Do not add quotation marks, such as "*", because they cause a syntax error.
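As a rough illustration of the two rules above, the following Python sketch checks an hour partition value before you enter it, and lists the single-hour values that would each need their own task run. The function names and the pt=...,hh=... layout are hypothetical and only mirror the examples in this topic. DataWorks itself performs the real validation.

```python
import re


def check_hour_partition(value: str) -> str:
    """Accept a single value or a bare *, and reject ranges and quoted values."""
    if value == "*":
        return value
    if value[:1] in ("'", '"') or value[-1:] in ("'", '"'):
        raise ValueError("enter the value directly, without quotation marks")
    if re.fullmatch(r"\d{2}-\d{2}", value):
        raise ValueError("hour ranges such as 00-23 are not supported")
    return value


def hourly_specs(bizdate: str) -> list:
    """List one single-valued partition spec per hour (assumed pt/hh layout)."""
    return [f"pt={bizdate},hh={hour:02d}" for hour in range(24)]


specs = hourly_specs("20240314")
print(specs[0], specs[-1], len(specs))
```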
High-frequency, timestamp-based incremental synchronization
DataWorks supports timestamp-based incremental synchronization through batch synchronization nodes combined with periodic scheduling, such as every 5 minutes or every hour. This method is suitable for T+1 or near-real-time synchronization scenarios from sources like ApsaraDB RDS for MySQL to destinations like SelectDB or StarRocks. This approach uses SQL filtering for incremental loads and avoids the cost of running continuous real-time CDC tasks. Key configuration points:
In the where clause of the source's Data Filtering section, use a timestamp field as a variable for filtering. For example: gmt_modify_time >= '$[yyyymmddhhmiss-10/mi]' AND gmt_modify_time < '$[yyyymmddhhmiss]'.
Configure periodic scheduling parameters, such as $[yyyymmddhhmiss], to dynamically calculate the time range. This ensures each scheduled run synchronizes only the incremental data within the specified interval. For more information about scheduling parameter configurations, see Use cases of scheduling parameters in Data Integration.
In the destination field mapping, manually add a constant parameter mapped to the partition field to enable dynamic partition writing.
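To make the time-window logic concrete, the following Python sketch computes the filter that a run would effectively use when the node is scheduled every 10 minutes. It is a simplified model: window_filter is a hypothetical helper, the scheduled times are made up, and the sketch assumes that both boundaries resolve to yyyymmddhhmiss strings derived from the scheduled time.

```python
from datetime import datetime, timedelta

FMT = "%Y%m%d%H%M%S"  # yyyymmddhhmiss


def window_filter(scheduled: datetime, minutes: int = 10,
                  column: str = "gmt_modify_time") -> str:
    """Build a half-open filter [scheduled - minutes, scheduled) on the timestamp column."""
    start = scheduled - timedelta(minutes=minutes)
    return (f"{column} >= '{start.strftime(FMT)}' "
            f"AND {column} < '{scheduled.strftime(FMT)}'")


# Three consecutive hypothetical runs, 10 minutes apart.
runs = [datetime(2024, 3, 15, 0, 0) + timedelta(minutes=10 * i) for i in range(3)]
filters = [window_filter(t) for t in runs]

for f in filters:
    print(f)
```

The lower bound is inclusive and the upper bound is exclusive, so when the window length equals the scheduling interval, consecutive runs neither overlap nor leave gaps.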
Incremental configuration for whole-database batch synchronization
In addition to single-table synchronization tasks, you can create a whole-database batch synchronization task to achieve periodic incremental synchronization. When you create the task, select incremental synchronization and configure an incremental condition in the task settings. This allows for efficient daily incremental partition synchronization, for example, by filtering based on the create_time field.
This method is suitable for scenarios where you want to centrally manage synchronization for multiple tables but perform incremental updates only on specific tables. For the complete configuration process of a whole-database batch synchronization task, see Configure a whole-database batch synchronization task.
Use cases
Synchronize historical data: To synchronize historical incremental data into the corresponding time-based partitions in a destination table, you can use the data backfill feature in Operation Center. For details on how to use this feature, see Manage data backfill instances. In the node configuration, select MySQL as the data source and MaxCompute as the destination, with a table name such as czd. In the Data Filtering condition, use ${bizdate} to control the incremental range, for example, STR_TO_DATE('${bizdate}','%Y%m%d') <= gmt_modify_time. Set the Partition Information to ds=${bizdate} and choose Insert Overwrite as the cleanup rule. In the Parameters section of the scheduling configuration, define bizdate=$bizdate. In a data backfill scenario, the scheduling system automatically replaces this parameter with the corresponding dates from the data timestamp. When you execute the data backfill, you can set multiple data timestamp ranges, such as from 2022-05-01 to 2022-05-31 and from 2022-04-01 to 2022-04-30. Select Run Backfill Instances with Schedule Time Later Than Current Time Immediately and select Ascending Order of Data Timestamp for the execution order.
Incrementally synchronize data from ApsaraDB RDS to MaxCompute
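For the historical data backfill use case, the following Python sketch lists the data timestamps that the two example ranges cover, in ascending order, together with the ds partition that each backfill instance would write. It only models the date arithmetic. The function name backfill_timestamps is hypothetical, and the actual instances are generated by Operation Center.

```python
from datetime import date, timedelta


def backfill_timestamps(ranges) -> list:
    """Expand inclusive (start, end) date ranges into sorted yyyymmdd strings."""
    days = set()
    for start, end in ranges:
        current = start
        while current <= end:
            days.add(current)
            current += timedelta(days=1)
    return [d.strftime("%Y%m%d") for d in sorted(days)]


# The two data timestamp ranges from the example, entered in any order.
ranges = [
    (date(2022, 5, 1), date(2022, 5, 31)),
    (date(2022, 4, 1), date(2022, 4, 30)),
]
timestamps = backfill_timestamps(ranges)
partitions = [f"ds={ts}" for ts in timestamps]

print(len(timestamps), partitions[0], partitions[-1])
```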
FAQ
MaxCompute Reader: "partition does not exist" error
Cause: The configured scheduling parameter was not correctly parsed into an actual partition value at runtime, or the parsed value does not match an actual partition in the source table.
Solution: If the partition value is passed from an upstream node's outputs parameter, check the parameter configuration in DataStudio and ensure the following:
The configured parameter name matches the input parameter name.
The passed parameter value exactly matches an existing partition in MaxCompute.
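The two causes can be told apart with a simple check on the partition value that the task actually used. The following Python sketch is a hypothetical diagnostic, not a DataWorks feature. It assumes that you have copied the partition value from the task log and obtained the list of existing partitions separately.

```python
import re

# Matches scheduling parameter placeholders that were left unreplaced,
# such as ${bizdate} or $[yyyymmdd].
PLACEHOLDER = re.compile(r"\$\{\w+\}|\$\[[^\]]*\]")


def diagnose_partition(spec: str, existing: set) -> str:
    """Classify a partition value as 'unresolved', 'missing', or 'ok'."""
    if PLACEHOLDER.search(spec):
        return "unresolved"  # the parameter was not parsed at runtime
    if spec not in existing:
        return "missing"     # parsed, but no such partition exists
    return "ok"


# Hypothetical partitions of the source table.
existing = {"pt=20240313", "pt=20240314"}

for spec in ("pt=${bizdate}", "pt=20240316", "pt=20240314"):
    print(spec, "->", diagnose_partition(spec, existing))
```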
Default sync behavior and non-partitioned tables
Default behavior
By default, a DataWorks Data Integration batch synchronization task performs a full synchronization. To achieve incremental synchronization, you must configure a Data Filtering condition and combine it with scheduling parameters.
Handling source tables without partition fields
If a source database table, such as a table in ApsaraDB RDS, does not have a time or partition field, you cannot directly filter incremental data by using a WHERE clause. We recommend that you add a time field, such as dt or gmt_modify_time, to the source table to use as the basis for incremental filtering. After you prepare the field, refer to the 'Configure incremental synchronization' section of this document to configure the incremental synchronization logic.
splitPk and splitFactor: Purpose and impact
Parameter definitions
splitPk (splitting primary key): Specifies a primary key column. DataWorks splits the data into multiple ranges based on the value of this field to enable multi-threaded, parallel reads.
splitFactor: Controls the splitting granularity. A larger value results in finer splits and more read threads.
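The following Python sketch shows one way that range splitting on an integer primary key can work. It is a conceptual model only. The rule that the number of pieces equals concurrency multiplied by splitFactor is an assumption made for this illustration, the table name orders and the function names are hypothetical, and the reader plug-in may choose split boundaries differently.

```python
def split_ranges(min_pk: int, max_pk: int, pieces: int) -> list:
    """Split the inclusive range [min_pk, max_pk] into contiguous inclusive sub-ranges."""
    total = max_pk - min_pk + 1
    pieces = max(1, min(pieces, total))
    base, extra = divmod(total, pieces)
    ranges, low = [], min_pk
    for i in range(pieces):
        size = base + (1 if i < extra else 0)  # spread the remainder over the first pieces
        ranges.append((low, low + size - 1))
        low += size
    return ranges


def range_queries(table: str, split_pk: str, ranges: list, where: str = None) -> list:
    """Build one query per sub-range, keeping the incremental filter if one is set."""
    queries = []
    for low, high in ranges:
        condition = f"{split_pk} >= {low} AND {split_pk} <= {high}"
        if where:
            condition = f"({where}) AND {condition}"
        queries.append(f"SELECT * FROM {table} WHERE {condition}")
    return queries


concurrency, split_factor = 2, 2  # assumed: pieces = concurrency * splitFactor
ranges = split_ranges(1, 100, concurrency * split_factor)
queries = range_queries("orders", "id", ranges, "gmt_modify_time >= '2024-03-14'")

for q in queries:
    print(q)
```

Each sub-range becomes a separate query against the source, which is why finer splitting raises the load on the source database and why an index on the splitting column matters.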
Performance impact and recommendations
Enabling splitPk and splitFactor increases the load on the source database. To mitigate this load, we recommend that you:
Reduce the concurrency, or set the concurrency for the whole-database batch task to 1.
Ensure that the splitting column (splitPk) is indexed to improve read efficiency.