Synchronize Kafka data using APS
AnalyticDB for MySQL provides the AnalyticDB Pipeline Service (APS). You can use APS to perform real-time data ingestion from a specified start offset. This topic describes how to add a Kafka data source, create a Kafka sync link, and start the synchronization task.
Prerequisites
- An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster is created.
- A database account is created for the AnalyticDB for MySQL cluster.
  - If you use an Alibaba Cloud account, you need to create only a privileged account.
  - If you use a Resource Access Management (RAM) user, you must create a privileged account and a standard account and associate the standard account with the RAM user.
- An ApsaraMQ for Kafka instance (referred to as Kafka in this topic) is deployed in the same VPC as your AnalyticDB for MySQL cluster.
- A Kafka topic is created and messages have been sent to it. For more information, see ApsaraMQ for Kafka quick start.
Limitations
- Only Kafka data in JSON format is supported.
- Data in a Kafka topic is automatically deleted when its retention period expires. If a data synchronization job fails, data may be deleted from the topic before the job is restarted, which causes data loss. To prevent this, increase the data retention period of the topic, and contact technical support immediately if a job fails.
- If a Kafka message retrieved as sample data exceeds 8 KB, the Kafka API truncates it. The truncated JSON cannot be parsed, so the system cannot automatically generate the field mapping.
- Schema changes in the Kafka source data do not trigger automatic DDL updates and are not synchronized to AnalyticDB for MySQL.
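Because only JSON is supported and sample data larger than 8 KB is truncated, it can help to check messages before you produce them to the topic. The following Python sketch is a hypothetical pre-check, not part of APS. It assumes that "8 KB" means 8 * 1024 bytes of the encoded message value and that each message should be a top-level JSON object whose keys become fields. The function name `check_message` is illustrative.

```python
import json
from typing import List

# Assumption: "8 KB" is read as 8 * 1024 bytes of the encoded message value.
SAMPLE_LIMIT_BYTES = 8 * 1024


def check_message(value: bytes) -> List[str]:
    """Return problems that could affect APS sample parsing (empty list if none).

    Hypothetical pre-check for illustration; APS does not expose this function.
    """
    problems: List[str] = []
    if len(value) > SAMPLE_LIMIT_BYTES:
        problems.append(
            f"message is {len(value)} bytes; sample data over "
            f"{SAMPLE_LIMIT_BYTES} bytes is truncated"
        )
    try:
        parsed = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        problems.append("message is not valid UTF-8 JSON")
        return problems
    if not isinstance(parsed, dict):
        # Assumption: fields are mapped from the keys of a top-level JSON object.
        problems.append("message is JSON but not an object, so it has no fields to map")
    return problems


if __name__ == "__main__":
    ok = json.dumps({"id": 1, "name": "alice"}).encode("utf-8")
    print(check_message(ok))      # []
    print(check_message(b"abc"))  # ['message is not valid UTF-8 JSON']
```

Running such a check in the producer before sending keeps malformed or oversized messages out of the topic, where they would otherwise surface later as parsing or synchronization errors.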
Billing
You are charged for data synchronization tasks based on the elastic ACU resources that they consume in an AnalyticDB for MySQL cluster. For more information about pricing, see Billing for Data Lakehouse Edition and Billing for Enterprise and Basic editions.
Procedure
Step 1: Create a data source
If you have already added a Kafka data source, skip this step and proceed to Step 2: Create a sync link.
- Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. Find the cluster that you want to manage and click the cluster ID.
- In the left-side navigation pane, choose Data Ingestion > Data Sources.
- In the upper-left corner, click Create Data Source.
- On the Create Data Source page, configure the following parameters:
  Data Source Type: Select Kafka.
  Data Source Name: The system automatically generates a name based on the data source type and the current time. You can modify the name as needed.
  Data Source Description: Enter a description for the data source, such as its use case or business scope.
  Deployment Mode: Only Alibaba Cloud instances are supported.
  Kafka Instance: The ID of the Kafka instance. Log on to the ApsaraMQ for Kafka console and find the instance ID on the Clusters page.
  Kafka Topic: The name of the topic that you created in Kafka. Log on to the ApsaraMQ for Kafka console and find the topic name on the Topics page of the target instance.
  Message Data Format: The format of messages in Kafka. Only JSON is supported.
- After you configure the parameters, click Create.
Step 2: Create a sync link
- In the left-side navigation pane, click Simple Log Service/Kafka Data Synchronization.
- In the upper-left corner, click Create Synchronization Job, and then click the Kafka Data Source tab.
- On the Create Synchronization Job page, configure the parameters in the Source and Destination Settings, Destination Database and Table Settings, and Synchronization Settings sections.
Parameters in the Source and Destination Settings section:
  Job Name: The name of the sync link. The system automatically generates a name based on the data source type and the current time. You can modify the name as needed.
  Data Source: Select an existing Kafka data source or create a new one.
  Data Source Format: Only JSON is supported.
  Destination Type: Select Data Warehouse - ADB Storage.
  AnalyticDB for MySQL Account: The database account of the AnalyticDB for MySQL cluster.
  AnalyticDB for MySQL Password: The password of the database account of the AnalyticDB for MySQL cluster.
Parameters in the Destination Database and Table Settings section:
  Database Name: The name of the database in the AnalyticDB for MySQL cluster.
  Table Name: The name of the table in the AnalyticDB for MySQL cluster.
  Sample Data: The system automatically retrieves the latest data from the Kafka topic to use as sample data.
    Note: The data in the Kafka topic must be in JSON format. If the topic contains data in other formats, an error occurs during data synchronization.
  Parsed JSON Layers: The number of nested layers to parse in the JSON data. Valid values:
    - 0: No parsing is performed.
    - 1 (default): One layer is parsed.
    - 2: Two layers are parsed.
    - 3: Three layers are parsed.
    - 4: Four layers are parsed.
    For more information about the JSON parsing policy, see Use APS to synchronize Kafka data (Recommended).
  Schema Field Mapping: The system displays the schema of the sample data after JSON parsing. You can modify the names and data types of destination fields, and add or delete fields as needed.
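To make the Parsed JSON Layers setting more concrete, the following Python sketch flattens a nested message to a given depth. It is an illustration under stated assumptions, not the APS parser: nested keys are joined with "." (the actual destination field names may differ), objects below the parse depth are kept as JSON strings, and 0 layers is modeled as keeping the whole message under a hypothetical field named `value`. See the JSON parsing policy topic for the exact behavior.

```python
import json
from typing import Any, Dict


def flatten(record: Dict[str, Any], layers: int, prefix: str = "") -> Dict[str, Any]:
    """Expand up to `layers` levels of a JSON object into flat fields.

    Illustrative model only. Assumptions: nested keys are joined with ".",
    objects below the parse depth stay as JSON strings, and layers == 0
    keeps the whole message under a hypothetical field named "value".
    """
    if not 0 <= layers <= 4:
        raise ValueError("layers must be between 0 and 4")
    if layers == 0:
        return {"value": json.dumps(record)}
    fields: Dict[str, Any] = {}
    for key, val in record.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(val, dict) and layers > 1:
            # Descend one more level; the remaining depth shrinks by one.
            fields.update(flatten(val, layers - 1, name))
        elif isinstance(val, dict):
            # Parse depth exhausted: keep the nested object as a JSON string.
            fields[name] = json.dumps(val)
        else:
            fields[name] = val
    return fields


if __name__ == "__main__":
    msg = {"name": "alice", "device": {"os": {"version": "11.4"}, "brand": "x"}}
    print(flatten(msg, 1))
    print(flatten(msg, 2))
```

In this model, the default of 1 layer keeps `device` as a single JSON string field, while 2 layers produce `device.os` (still a JSON string) and `device.brand`. The Schema Field Mapping step is where you would rename or retype such fields.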
Parameters in the Synchronization Settings section:
  Start offset: The point in time from which the task starts consuming Kafka data. The task begins with the first record whose timestamp is greater than or equal to the time that you specify.
  Dirty Data Processing Mode: A synchronization error occurs if the data type of a destination field does not match the data type of the corresponding source data from Kafka. For example, if the source data is "abc" and the destination field is of the INT type, an error occurs because the data cannot be converted. The following dirty data processing modes are supported:
    - Stop Synchronization (default): The synchronization task stops. You must modify the data type of the destination field or change the dirty data processing mode, and then restart the task.
    - Process as NULL: APS writes NULL to the destination table for each field that contains dirty data. For example, in a row of Kafka data with three fields (col1, col2, and col3), if col2 contains dirty data, col2 is written as NULL, while col1 and col3 are written as is.
  Job Resource Group: The job resource group in which the task runs.
  ACUs for Incremental Synchronization: The number of ACUs from the job resource group that the task uses. The minimum value is 2, and the maximum value is the number of available compute resources in the job resource group. We recommend that you increase the number of ACUs to improve ingestion performance and task stability.
    Note: A data synchronization task occupies elastic resources in the specified job resource group on a long-term basis, so these resources are deducted from the pool available to other tasks. For example, if a job resource group has 48 ACUs and you create a synchronization task that uses 8 ACUs, only 40 ACUs remain available for other tasks in that resource group.
  Add to Whitelist: Select this option to add the CIDR block of the Kafka vSwitch to the whitelist of the AnalyticDB for MySQL cluster. This enables the network connectivity required for data synchronization.
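The Start offset rule (begin with the first record whose timestamp is greater than or equal to the specified time) is a lower-bound search. The Python sketch below illustrates it for a single partition, assuming that record timestamps are non-decreasing and using the list index as a stand-in for the Kafka offset. In the Kafka Java client, `KafkaConsumer.offsetsForTimes` performs the equivalent per-partition lookup; this topic does not state how APS implements it internally.

```python
from bisect import bisect_left
from typing import List, Optional


def start_offset(timestamps_ms: List[int], start_ms: int) -> Optional[int]:
    """Return the offset of the first record whose timestamp >= start_ms.

    Assumptions: one partition, timestamps are non-decreasing, and the list
    index stands in for the Kafka offset. Returns None if no record qualifies.
    """
    i = bisect_left(timestamps_ms, start_ms)  # first index with ts >= start_ms
    return i if i < len(timestamps_ms) else None


if __name__ == "__main__":
    ts = [1_000, 2_000, 2_000, 3_000]
    print(start_offset(ts, 2_000))  # 1
    print(start_offset(ts, 2_500))  # 3
    print(start_offset(ts, 4_000))  # None
```

Note that when several records share the specified timestamp, consumption starts at the first of them, so no record at that exact time is skipped.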
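The two dirty data processing modes can be modeled as follows. This is a hypothetical Python sketch, not APS code: `schema` maps each destination column to a conversion function, the mode strings `"stop"` and `"null"` stand in for Stop Synchronization and Process as NULL, and every destination column is assumed to be present in the source row.

```python
from typing import Any, Callable, Dict


class DirtyDataError(Exception):
    """Raised in "stop" mode when a source value cannot be converted."""


def convert_row(
    row: Dict[str, Any],
    schema: Dict[str, Callable[[Any], Any]],
    mode: str = "stop",
) -> Dict[str, Any]:
    """Convert one source row to destination types.

    Hypothetical model of the two modes: "stop" (Stop Synchronization) raises
    on the first bad value; "null" (Process as NULL) writes None for that field
    only. Assumes every destination column is present in the row.
    """
    if mode not in ("stop", "null"):
        raise ValueError(f"unknown mode: {mode!r}")
    out: Dict[str, Any] = {}
    for col, cast in schema.items():
        try:
            out[col] = cast(row[col])
        except (ValueError, TypeError) as exc:
            if mode == "stop":
                raise DirtyDataError(f"{col}: cannot convert {row[col]!r}") from exc
            out[col] = None  # only the dirty field becomes NULL
    return out


if __name__ == "__main__":
    schema = {"col1": str, "col2": int, "col3": int}
    row = {"col1": "a", "col2": "abc", "col3": "7"}
    print(convert_row(row, schema, mode="null"))  # {'col1': 'a', 'col2': None, 'col3': 7}
```

This mirrors the col1/col2/col3 example: in "null" mode only col2 is lost, whereas in "stop" mode the same row halts processing until the destination type or the mode is changed.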
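The ACU accounting described for ACUs for Incremental Synchronization is simple subtraction with a documented minimum of 2 ACUs per task. The Python sketch below reproduces the 48 - 8 = 40 example; the function name `remaining_acus` is hypothetical, and the sketch assumes that each task holds its ACUs for as long as it exists.

```python
from typing import List

MIN_TASK_ACUS = 2  # documented minimum ACUs per synchronization task


def remaining_acus(group_acus: int, task_acus: List[int]) -> int:
    """Return ACUs left in a job resource group after each task reserves its ACUs."""
    available = group_acus
    for acus in task_acus:
        if acus < MIN_TASK_ACUS:
            raise ValueError(f"a task needs at least {MIN_TASK_ACUS} ACUs, got {acus}")
        if acus > available:
            raise ValueError(f"only {available} ACUs available, cannot reserve {acus}")
        available -= acus  # assumed reserved for as long as the task exists
    return available


if __name__ == "__main__":
    print(remaining_acus(48, [8]))  # 40, matching the example in the note
```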
- After you configure the parameters, click Submit.
Step 3: Start the data synchronization task
- On the Simple Log Service/Kafka Data Synchronization page, find your data synchronization task and click Start in the Actions column.
- Click Search in the upper-left corner. The task has started successfully when its status changes to Running.