Configure Kafka input components

Last updated: 2026-06-23 10:12:40

Kafka input components read data from Kafka data sources into the storage system connected to the big data platform for data integration and secondary processing.

Prerequisites

Make sure that you have completed the following operations:

Procedure

  1. In the top navigation bar of the Dataphin homepage, choose Develop > Data Integration.

  2. In the top navigation bar of the Integration page, select a project. In Dev-Prod mode, you must also select an environment.

  3. In the left-side navigation pane, click Batch Pipeline. In the Batch Pipeline list, click the offline pipeline that you want to develop to open its configuration page.

  4. Click Component Library in the upper-right corner of the page to open the Component Library panel.

  5. In the left-side navigation pane of the Component Library panel, select Inputs. Find the KAFKA component in the input component list on the right and drag it to the canvas.

  6. Click the icon in the KAFKA input component card to open the KAFKA Input Configuration dialog box.

  7. In the KAFKA Input Configuration dialog box, configure the parameters as described in the following table.

    Parameter

    Description

    Step Name

    The name of the Kafka input component. Dataphin automatically generates a step name, which you can modify. The name must meet the following requirements:

    • It can contain only Chinese characters, letters, underscores (_), and digits.

    • It cannot exceed 64 characters in length.

    Datasource

    Lists all Kafka data sources in the current Dataphin instance, both those for which you have read-through permission and those for which you do not. Click the copy icon to copy the data source name.

    • For data sources for which you do not have read-through permission, you can click Request next to the data source to request read-through permission. For more information, see Request data source permissions.

    • If you do not have a Kafka data source, click Create Data Source to create one. For more information, see Create a Kafka data source.

    Topic

    Select the Kafka topic from which you want to read data.

    Key Type

    The type of the Kafka key, which determines the key.deserializer configuration when initializing the Kafka Consumer. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, and KAFKA AVRO (available when schema.registry is configured for the data source).

    Value Type

    The type of the Kafka value, which determines the value.deserializer configuration when initializing the Kafka Consumer. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, and KAFKA AVRO (available when schema.registry is configured for the data source).
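As a rough illustration of how Key Type and Value Type could translate into consumer properties, the sketch below maps each option to a deserializer class name. The class names are the standard Apache Kafka and Confluent deserializers; the mapping itself, the function name consumer_deserializer_props, and the has_schema_registry flag are assumptions made for illustration, not Dataphin's documented behavior.

```python
# Hypothetical mapping from the Key Type / Value Type options to deserializer
# class names. How Dataphin resolves these internally is an assumption.
KAFKA_PKG = "org.apache.kafka.common.serialization"

DESERIALIZERS = {
    "BYTEARRAY": f"{KAFKA_PKG}.ByteArrayDeserializer",
    "DOUBLE": f"{KAFKA_PKG}.DoubleDeserializer",
    "FLOAT": f"{KAFKA_PKG}.FloatDeserializer",
    "INTEGER": f"{KAFKA_PKG}.IntegerDeserializer",
    "LONG": f"{KAFKA_PKG}.LongDeserializer",
    "SHORT": f"{KAFKA_PKG}.ShortDeserializer",
    "STRING": f"{KAFKA_PKG}.StringDeserializer",
    "KAFKA AVRO": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
}


def consumer_deserializer_props(key_type, value_type, has_schema_registry=False):
    """Build the key.deserializer / value.deserializer consumer properties."""
    for type_name in (key_type, value_type):
        if type_name not in DESERIALIZERS:
            raise ValueError(f"unsupported type: {type_name}")
        # KAFKA AVRO is only offered when a schema registry is configured.
        if type_name == "KAFKA AVRO" and not has_schema_registry:
            raise ValueError("KAFKA AVRO requires a schema registry")
    return {
        "key.deserializer": DESERIALIZERS[key_type],
        "value.deserializer": DESERIALIZERS[value_type],
    }
```

For example, consumer_deserializer_props("STRING", "LONG") yields the StringDeserializer for keys and the LongDeserializer for values.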

    Consumer Group ID

    The group.id used when the Kafka Consumer is initialized.

    To consume data from the correct offset, set this parameter to a value unique to the data synchronization node. If left empty, a random string starting with datax_ is generated as the group.id each time synchronization runs.
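The fallback described above can be sketched as follows. Only the datax_ prefix comes from this page; the function name resolve_group_id and the use of a UUID for the random part are assumptions.

```python
import uuid


def resolve_group_id(configured):
    """Return the configured group.id, or a random datax_ ID if none is set."""
    if configured and configured.strip():
        return configured.strip()
    # Assumption: the random part is a UUID; the page only specifies the prefix.
    return "datax_" + uuid.uuid4().hex
```

Because a generated ID differs on every run, committed offsets from a previous run are not reused, which is why a fixed, node-specific value is recommended.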

    Start Time

    The left boundary of the time range for reading data. Specify the value in yyyyMMddHHmmss format. Use this parameter with scheduling parameters. For example, if the scheduling parameter is beginDateTime=${20220101000000}, set Start Time to ${beginDateTime}.

    End Time

    The right boundary of the time range for reading data. Specify the value in yyyyMMddHHmmss format. Use this parameter with scheduling parameters. For example, if the scheduling parameter is endDateTime=${20220101000000}, set End Time to ${endDateTime}.
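To make the yyyyMMddHHmmss format concrete, the sketch below converts Start Time and End Time values into epoch milliseconds, the unit Kafka uses for record timestamps. The function names and the choice of UTC as the time zone are assumptions; this page does not state which time zone Dataphin applies.

```python
from datetime import datetime, timezone


def to_epoch_millis(value, tz=timezone.utc):
    """Parse a yyyyMMddHHmmss string into epoch milliseconds."""
    if len(value) != 14 or not value.isdigit():
        raise ValueError(f"expected 14 digits (yyyyMMddHHmmss), got {value!r}")
    # Assumption: the value is interpreted in UTC unless tz says otherwise.
    dt = datetime.strptime(value, "%Y%m%d%H%M%S").replace(tzinfo=tz)
    return int(dt.timestamp() * 1000)


def time_range(start_time, end_time):
    """Return (start_ms, end_ms) and reject an empty or inverted range."""
    start_ms, end_ms = to_epoch_millis(start_time), to_epoch_millis(end_time)
    if start_ms >= end_ms:
        raise ValueError("Start Time must be earlier than End Time")
    return start_ms, end_ms
```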

    Synchronization End Strategy

    The strategy that determines when the synchronization task ends:

    • When No New Data Is Read For 1 Minute: If the consumer does not retrieve any data from Kafka for 1 minute (usually because all data in the topic has been read, or possibly due to network or Kafka cluster availability issues), the task stops immediately. Otherwise, it continues to retry until data is read again.

    • When The Specified End Offset Is Reached: If the business time or offset of the Kafka record read by the data integration task meets the end offset configuration above, the task ends. Otherwise, it continues to retry reading Kafka records indefinitely.
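The two strategies can be pictured as different exit conditions on the same read loop. The following is a simplified sketch, not Dataphin's implementation: the strategy names IDLE_1_MIN and END_OFFSET, the injected poll and clock callables, and treating the end offset as exclusive are all assumptions, and only the offset (not the business time) is modeled.

```python
import time


def run_sync(poll, strategy, end_offset=None, idle_limit_s=60.0, clock=time.monotonic):
    """Read batches from poll() until the chosen end strategy is satisfied.

    poll() returns a list of (offset, value) tuples, possibly empty.
    """
    records = []
    last_data_at = clock()
    while True:
        batch = poll()
        if batch:
            last_data_at = clock()
            for offset, value in batch:
                # END_OFFSET: stop as soon as a record reaches the end offset.
                if strategy == "END_OFFSET" and offset >= end_offset:
                    return records
                records.append((offset, value))
        elif strategy == "IDLE_1_MIN" and clock() - last_data_at >= idle_limit_s:
            # IDLE_1_MIN: stop after a full idle window with no new data.
            return records
        # With END_OFFSET and no matching record, the loop keeps polling.
```

Note that with the end-offset strategy the loop never exits if no record reaches the end offset, which mirrors the indefinite retry described above.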

    Advanced Configuration

    Configure the offset reset strategy, single read size, single read time, and read timeout. If schema registry is configured for the topic, also configure keySchema and valueSchema here. Empty by default. Sample format:

    {
     "namespace": "example.avro",
     "type": "record",
     "name": "User",
     "fields": [
         {"name": "name", "type": "string"},
         {"name": "favorite_number",  "type": ["int", "null"]},
         {"name": "favorite_color", "type": ["string", "null"]}
     ]
    }

    Output Fields

    By default, six fields are displayed: __key__, __value__, __partition__, __headers__, __offset__, and __timestamp__. You can manually add output fields:

    • Click Batch Add to configure in bulk using JSON or TEXT format.

      • JSON format example

        [
            {
                "index": 0,
                "name": "__key__",
                "type": "STRING"
            },
            {
                "index": 1,
                "name": "__value__",
                "type": "STRING"
            },
            {
                "index": 2,
                "name": "__partition__",
                "type": "INTEGER"
            },
            {
                "index": 3,
                "name": "__headers__",
                "type": "STRING"
            },
            {
                "index": 4,
                "name": "__offset__",
                "type": "LONG"
            },
            {
                "index": 5,
                "name": "__timestamp__",
                "type": "LONG"
            }
        ]
        Note

        index indicates the column number of the specified object, name indicates the field name after import, and type indicates the field type after import. For example, "index":3,"name":"user_id","type":"String" means importing the 4th column from the file, with the field name user_id and field type String.

      • TEXT format example

        0,__key__,STRING
        1,__value__,STRING
        2,__partition__,INTEGER
        3,__headers__,STRING
        4,__offset__,LONG
        5,__timestamp__,LONG
        Note
        • The row delimiter is used to separate the information of each field. The default is a line feed (\n). Line feed (\n), semicolon (;), and period (.) are supported.

        • The column delimiter is used to separate field names and field types. The default is a comma (,).

    • Click New Output Field, and fill in Source Index, Column, and select Type as prompted.
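The TEXT and JSON formats used by Batch Add carry the same three values per field. As an illustration, the sketch below parses TEXT input into the JSON structure, using the row and column delimiters described in the note. The function name parse_text_fields is hypothetical, and how Dataphin itself validates input is not covered here.

```python
def parse_text_fields(text, row_delim="\n", col_delim=","):
    """Convert TEXT-format field definitions into JSON-format dictionaries."""
    fields = []
    for row in text.split(row_delim):
        row = row.strip()
        if not row:
            continue  # skip blank rows, such as a trailing delimiter
        parts = [part.strip() for part in row.split(col_delim)]
        if len(parts) != 3:
            raise ValueError(f"expected index, name, type but got: {row!r}")
        index, name, field_type = parts
        fields.append({"index": int(index), "name": name, "type": field_type})
    return fields
```

Passing row_delim=";" handles input written on a single line, for example 0,__key__,STRING;1,__value__,STRING.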

    You can also configure source table fields as custom strings beyond the six defaults. In this case, the Kafka record value is parsed as JSON, and the configured string is used as a JSON path to extract the field value. For example:

    If { "data": { "name": "bob", "age": 35 } } is the Kafka record value and the source table field is configured as data.name, "bob" is extracted and written to the corresponding target table field. Supported field types include Java types and datax mapping types.
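The extraction in this example can be sketched as a walk over the parsed JSON value. The function name extract_field, the dot-separated path handling, and returning None for a missing key are assumptions; the page does not specify how missing paths are treated.

```python
import json


def extract_field(record_value, path):
    """Parse a Kafka record value as JSON and follow a dotted path into it."""
    node = json.loads(record_value)
    for key in path.split("."):
        # Assumption: a missing key or non-object node yields None.
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


value = '{ "data": { "name": "bob", "age": 35 } }'
name = extract_field(value, "data.name")  # "bob"
```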

    You can also perform the following operations on added fields:

    • Click the edit icon in the Actions column to edit an existing field.

    • Click the delete icon in the Actions column to delete an existing field.

  8. Click OK to complete the Kafka input component configuration.
