Synchronize Kafka data using APS


AnalyticDB for MySQL provides the AnalyticDB Pipeline Service (APS). You can use APS to ingest Kafka data into AnalyticDB for MySQL in real time, starting from a specified start offset. This topic describes how to add a Kafka data source, create a Kafka sync link, and start the synchronization task.

Prerequisites

Limitations

  • Only Kafka data in JSON format is supported.

  • Data in a Kafka topic is automatically deleted when its retention period expires. If a data synchronization job fails and is not restarted before the unconsumed data expires, that data is deleted from the topic and lost. To reduce this risk, increase the data retention period of the topic, and contact technical support immediately if a job fails.

  • If a sample message from Kafka exceeds 8 KB, the Kafka API truncates it. The truncated message cannot be parsed as JSON, so the system cannot automatically generate the field mapping.

  • Schema changes in the Kafka source data do not trigger automatic DDL changes and are not synchronized to AnalyticDB for MySQL.
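Before you create a sync link, you can check a few sample messages against the JSON and 8 KB limitations above. The following Python sketch is a hypothetical pre-check, not part of APS. It assumes that "8 KB" means 8 * 1024 bytes of the raw message value and that each message is a single JSON object whose top-level keys become fields; the function name and messages are illustrative only.

```python
import json

# Assumption: "8 KB" means 8 * 1024 bytes of the raw message value.
SAMPLE_LIMIT_BYTES = 8 * 1024


def check_sample_message(value):
    """Return problems that would likely block automatic field mapping.

    `value` is the raw Kafka message value as bytes (hypothetical helper).
    """
    problems = []
    if len(value) > SAMPLE_LIMIT_BYTES:
        problems.append("message exceeds 8 KB and would be truncated as sample data")
    try:
        parsed = json.loads(value)
    except ValueError:  # covers json.JSONDecodeError and UnicodeDecodeError
        problems.append("message is not valid JSON")
        return problems
    # Assumption: each message is one JSON object whose keys become fields.
    if not isinstance(parsed, dict):
        problems.append("message is JSON but not a JSON object")
    return problems
```

Running the check over recent messages read from the topic with any Kafka client gives an early hint of whether the console is likely to generate the field mapping automatically.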

Billing

You are charged for data synchronization tasks based on the elastic ACU resources that they consume in an AnalyticDB for MySQL cluster. For more information about pricing, see Billing for Data Lakehouse Edition and Billing for Enterprise and Basic editions.

Procedure

Step 1: Create a data source

Note

If you have already added a Kafka data source, skip this step and proceed to Step 2: Create a sync link.

  1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. Find the cluster that you want to manage and click the cluster ID.

  2. In the left-side navigation pane, choose Data Ingestion > Data Sources.

  3. In the upper-left corner, click Create Data Source.

  4. On the Create Data Source page, configure the following parameters:

    • Data Source Type: Select Kafka.

    • Data Source Name: The system automatically generates a name based on the data source type and the current time. You can modify the name as needed.

    • Data Source Description: A description of the data source, such as its use case or business scope.

    • Deployment Mode: Only Alibaba Cloud instances are supported.

    • Kafka Instance: The ID of the Kafka instance. To find it, log on to the ApsaraMQ for Kafka console and go to the Clusters page.

    • Kafka Topic: The name of the topic that you created in Kafka. To find it, log on to the ApsaraMQ for Kafka console and go to the Topics page of the target instance.

    • Message Data Format: The format of messages in Kafka. Only JSON is supported.

  5. After you configure the parameters, click Create.

Step 2: Create a sync link

  1. In the left-side navigation pane, click Simple Log Service/Kafka Data Synchronization.

  2. In the upper-left corner, click Create Synchronization Job, and then click the Kafka Data Source tab.

  3. On the Create Synchronization Job page, configure the parameters in the Source and Destination Settings, Destination Database and Table Settings, and Synchronization Settings sections.

    • Parameters in the Source and Destination Settings section:

      • Job Name: The name of the sync link. The system automatically generates a name based on the data source type and the current time. You can modify the name as needed.

      • Data Source: Select an existing Kafka data source or create a new one.

      • Data Source Format: Only JSON is supported.

      • Destination Type: Select Data Warehouse - ADB Storage.

      • AnalyticDB for MySQL Account: The database account of the AnalyticDB for MySQL cluster.

      • AnalyticDB for MySQL Password: The password of the database account of the AnalyticDB for MySQL cluster.

    • Parameters in the Destination Database and Table Settings section:

      • Database Name: The name of the database in the AnalyticDB for MySQL cluster.

      • Table Name: The name of the table in the AnalyticDB for MySQL cluster.

      • Sample Data: The system automatically retrieves the latest data from the Kafka topic to use as sample data.

        Note

        The data in the Kafka topic must be in JSON format. If the topic contains data in other formats, an error occurs during data synchronization.

      • Parsed JSON Layers: The number of nested layers to parse in the JSON data. Valid values:

        • 0: No parsing is performed.

        • 1 (default): One layer is parsed.

        • 2: Two layers are parsed.

        • 3: Three layers are parsed.

        • 4: Four layers are parsed.

        For more information about the JSON parsing policy, see Use APS to synchronize Kafka data (Recommended).

      • Schema Field Mapping: The system displays the schema of the sample data after JSON parsing. You can modify the names and data types of the destination fields, and add or delete fields as needed.

    • Parameters in the Synchronization Settings section:

      • Start offset: The point in time from which the task starts consuming Kafka data. The task starts with the first record whose timestamp is greater than or equal to the specified time.

      • Dirty Data Processing Mode: A synchronization error occurs if a source value from Kafka cannot be converted to the data type of the corresponding destination field. For example, if the source value is abc and the destination field is of the INT type, the value cannot be converted and an error occurs. The following dirty data processing modes are supported:

        • Stop Synchronization (default): The synchronization task stops. You must modify the data type of the destination field or change the dirty data processing mode, and then restart the task.

        • Process as NULL: APS writes NULL to the destination table for each field that contains dirty data. For example, if a Kafka record has three fields (col1, col2, and col3) and col2 contains dirty data, col2 is written as NULL, while col1 and col3 are written as is.

      • Job Resource Group: The job resource group in which the task runs.

      • ACUs for Incremental Synchronization: The number of ACUs from the job resource group that the task uses. The minimum value is 2, and the maximum value is the number of available compute resources in the job resource group. We recommend that you increase the number of ACUs to improve ingestion performance and task stability.

        Note

        A data synchronization task continuously occupies elastic resources in the specified job resource group, so these resources are deducted from the pool available to other tasks. For example, if a job resource group has 48 ACUs and you create a synchronization task that uses 8 ACUs, only 40 ACUs remain available for other tasks in that resource group.

      • Add to Whitelist: Select this option to add the CIDR block of the Kafka vSwitch to the whitelist of the AnalyticDB for MySQL cluster. This enables network connectivity for data synchronization.

  4. After you configure the parameters, click Submit.
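The Start offset, Parsed JSON Layers, and Dirty Data Processing Mode settings determine which records a task reads and how each record becomes a row. The following Python sketch models that behavior on in-memory records to make the settings concrete. It is not APS code: the `Record` type, the "." separator for nested column names, the `value` column used when parsing is disabled, the `CASTS` conversion table, `DirtyDataError`, and the `"stop"`/`"null"` mode strings are all hypothetical, and APS's actual parsing and conversion rules may differ.

```python
import json
from dataclasses import dataclass


@dataclass
class Record:
    """Hypothetical stand-in for a Kafka record whose value is a decoded JSON object."""
    offset: int
    timestamp_ms: int
    value: dict


class DirtyDataError(ValueError):
    """Raised in "stop" mode when a value cannot be converted (hypothetical name)."""


# Hypothetical conversion table; APS's real type rules may differ.
CASTS = {"INT": int, "DOUBLE": float, "VARCHAR": str}


def records_from_start(records, start_ms):
    """Start offset: begin at the first record (in offset order) whose
    timestamp >= start_ms, then keep every later record."""
    started = False
    for rec in sorted(records, key=lambda r: r.offset):
        started = started or rec.timestamp_ms >= start_ms
        if started:
            yield rec


def _flatten(obj, remaining, prefix=""):
    """Expand nested objects while `remaining` > 0; deeper objects and arrays stay JSON text."""
    out = {}
    for key, val in obj.items():
        name = prefix + key
        if isinstance(val, dict) and remaining > 0:
            out.update(_flatten(val, remaining - 1, name + "."))  # assumed "." separator
        elif isinstance(val, (dict, list)):
            out[name] = json.dumps(val)
        else:
            out[name] = val
    return out


def parse_layers(message, layers):
    """Parsed JSON Layers: 0 keeps the whole message as text in a hypothetical
    "value" column; N >= 1 turns top-level keys into columns and expands
    N - 1 further levels."""
    if layers == 0:
        return {"value": json.dumps(message)}
    return _flatten(message, layers - 1)


def to_row(fields, schema, mode):
    """Map parsed fields onto the destination schema, applying the dirty data mode."""
    row = {}
    for column, col_type in schema.items():
        raw = fields.get(column)
        if raw is None:
            row[column] = None  # field absent in this record
            continue
        try:
            row[column] = CASTS[col_type](raw)
        except (TypeError, ValueError):
            if mode == "stop":  # Stop Synchronization
                raise DirtyDataError(f"cannot convert {raw!r} in {column} to {col_type}")
            row[column] = None  # any other mode: Process as NULL for this field only


    return row


def sync(records, start_ms, layers, schema, mode="stop"):
    """Apply the three settings in order: start offset, JSON parsing, type mapping."""
    return [to_row(parse_layers(r.value, layers), schema, mode)
            for r in records_from_start(records, start_ms)]
```

For example, given three records whose col2 values are "5", "abc", and "7", a start time that skips the first record, and an INT destination type for col2, the sketch in "null" mode writes col2 as NULL for the "abc" record while keeping col1 and col3, mirroring the col1/col2/col3 example for Process as NULL; in "stop" mode the same input raises `DirtyDataError`.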

Step 3: Start the data synchronization task

  1. On the Simple Log Service/Kafka Data Synchronization page, find your data synchronization task and click Start in the Actions column.

  2. Click Search in the upper-left corner. The task has started successfully when its status changes to Running.