Configure Kafka input components
The Kafka input component reads data from a Kafka data source into the storage system connected to the big data platform for data integration and further processing.
Prerequisites
Make sure that the following requirements are met:
- A Kafka data source has been created. For more information, see Create a Kafka data source.
- The account used to configure the Kafka input component properties has read-through permission on the data source. If it does not, request the data source permission first. For more information, see Request, renew, and return data source permissions.
Procedure
- In the top navigation bar of the Dataphin homepage, choose Develop > Data Integration.
- In the top navigation bar of the Integration page, select a project. In Dev-Prod mode, also select an environment.
- In the left-side navigation pane, click Batch Pipeline. In the Batch Pipeline list, click the batch pipeline that you want to develop to open its configuration page.
- Click Component Library in the upper-right corner of the page to open the Component Library panel.
- In the left-side navigation pane of the Component Library panel, select Inputs. In the input component list on the right, find the KAFKA component and drag it to the canvas.
- Click the icon in the KAFKA input component card to open the KAFKA Input Configuration dialog box.
- In the KAFKA Input Configuration dialog box, configure the parameters as described in the following table.
Parameter
Description
Step Name
The name of the Kafka input component. Dataphin automatically generates a step name, which you can modify. The name must meet the following requirements:
- It can contain only Chinese characters, letters, underscores (_), and digits.
- It cannot exceed 64 characters in length.
Datasource
Lists all Kafka data sources in the current Dataphin instance, including those for which you have read-through permission and those for which you do not. Click the icon to copy the data source name.
- For a data source on which you do not have read-through permission, click Request next to the data source to request read-through permission. For more information, see Request data source permissions.
- If you do not have a Kafka data source, click Create Data Source to create one. For more information, see Create a Kafka data source.
Topic
Select the Kafka topic from which you want to read data.
Key Type
The type of the Kafka key, which determines the key.deserializer configuration when initializing the Kafka Consumer. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, and KAFKA AVRO (available when schema.registry is configured for the data source).
Value Type
The type of the Kafka value, which determines the value.deserializer configuration when initializing the Kafka Consumer. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, and KAFKA AVRO (available when schema.registry is configured for the data source).
Consumer Group ID
The group.id used when initializing the Kafka Consumer.
To consume data from the correct offset, set this parameter to a value that is unique to the data synchronization node. If this parameter is left empty, a random string prefixed with datax_ is generated as the group.id each time the synchronization runs.
Start Time
The left boundary of the time range for reading data, in yyyyMMddHHmmss format. Use this parameter together with scheduling parameters. For example, if the scheduling parameter is beginDateTime=${20220101000000}, set Start Time to ${beginDateTime}.
End Time
The right boundary of the time range for reading data, in yyyyMMddHHmmss format. Use this parameter together with scheduling parameters. For example, if the scheduling parameter is endDateTime=${20220101000000}, set End Time to ${endDateTime}.
Synchronization End Strategy
The strategy that determines when the synchronization task ends:
- When No New Data Is Read For 1 Minute: If the consumer retrieves no data from Kafka for 1 minute (usually because all data in the topic has been read, but possibly because of network issues or Kafka cluster unavailability), the task stops immediately. Before that limit is reached, the task keeps retrying until data is read again.
- When The Specified End Offset Is Reached: If the business time or offset of a Kafka record read by the data integration task meets the end position configured above, the task ends. Otherwise, the task keeps retrying to read Kafka records indefinitely.
Advanced Configuration
Configures the offset reset strategy, single read size, single read time, and read timeout. If a schema registry is configured for the topic, also configure keySchema and valueSchema here. This parameter is empty by default. Sample format:
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}
Output Fields
By default, six fields are displayed: __key__, __value__, __partition__, __headers__, __offset__, and __timestamp__. You can manually add output fields in either of the following ways:
- Click Batch Add to configure fields in bulk in JSON or TEXT format.
- JSON format example:
[
  {"index": 0, "name": "__key__", "type": "STRING"},
  {"index": 1, "name": "__value__", "type": "STRING"},
  {"index": 2, "name": "__partition__", "type": "INTEGER"},
  {"index": 3, "name": "__headers__", "type": "STRING"},
  {"index": 4, "name": "__offset__", "type": "LONG"},
  {"index": 5, "name": "__timestamp__", "type": "LONG"}
]
Note: index indicates the column number of the specified object, name indicates the field name after import, and type indicates the field type after import. For example, "index":3,"name":"user_id","type":"String" means that the 4th column is imported with the field name user_id and the field type String.
- TEXT format example:
0,__key__,STRING
1,__value__,STRING
2,__partition__,INTEGER
3,__headers__,STRING
4,__offset__,LONG
5,__timestamp__,LONG
Note:
- The row delimiter separates the information of individual fields. The default is a line feed (\n). Line feed (\n), semicolon (;), and period (.) are supported.
- The column delimiter separates the index, field name, and field type within a row. The default is a comma (,).
- Click New Output Field, enter the Source Index and Column, and select the Type as prompted.
You can also configure source table fields as custom strings other than the six default fields. In this case, the Kafka record value is parsed as JSON, and the configured string is used as a JSON path to extract the field value. For example, if the Kafka record value is { "data": { "name": "bob", "age": 35 } } and a source table field is configured as data.name, "bob" is extracted and written to the corresponding target table field. Supported field types include Java types and DataX mapping types.
You can also perform the following operations on added fields:
- In the Actions column, click the icon to edit an existing field.
- In the Actions column, click the icon to delete an existing field.
- Click OK to complete the Kafka input component configuration.
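The Step Name rules in the parameter table can be checked before you type a name into the dialog box. The following is a minimal sketch, assuming that "Chinese characters" means the CJK Unified Ideographs block (U+4E00 to U+9FFF), that "letters" means ASCII letters, and that length is counted in characters; Dataphin's own validation may differ, and is_valid_step_name is a hypothetical helper.

```python
import re

# Assumption: Chinese characters are approximated by U+4E00..U+9FFF and
# letters by ASCII A-Z/a-z; Dataphin's exact character set may differ.
STEP_NAME_PATTERN = re.compile(r"[\u4e00-\u9fffA-Za-z0-9_]+")
MAX_STEP_NAME_LENGTH = 64


def is_valid_step_name(name: str) -> bool:
    """Return True if name uses only allowed characters and has at most 64 characters."""
    if len(name) > MAX_STEP_NAME_LENGTH:
        return False
    # fullmatch with "+" also rejects the empty string.
    return STEP_NAME_PATTERN.fullmatch(name) is not None


print(is_valid_step_name("kafka_input_1"))  # True
print(is_valid_step_name("kafka-input"))    # False: the hyphen is not allowed
```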
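Key Type and Value Type determine the key.deserializer and value.deserializer of the Kafka Consumer. This sketch pairs each option with the Apache Kafka (and Confluent, for Avro) deserializer class it most plausibly corresponds to, and decodes raw bytes the way those deserializers do: fixed-width numbers in big-endian order and strings as UTF-8. The mapping itself is an assumption, because this page does not name the classes Dataphin uses; the decode helper is hypothetical and does not handle KAFKA AVRO, which needs a schema registry client.

```python
import struct

# Hypothetical mapping from Key Type / Value Type options to deserializer classes.
DESERIALIZER_CLASSES = {
    "BYTEARRAY": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "DOUBLE": "org.apache.kafka.common.serialization.DoubleDeserializer",
    "FLOAT": "org.apache.kafka.common.serialization.FloatDeserializer",
    "INTEGER": "org.apache.kafka.common.serialization.IntegerDeserializer",
    "LONG": "org.apache.kafka.common.serialization.LongDeserializer",
    "SHORT": "org.apache.kafka.common.serialization.ShortDeserializer",
    "STRING": "org.apache.kafka.common.serialization.StringDeserializer",
    "KAFKA AVRO": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
}

# struct formats for the fixed-width types; Kafka's serializers write big-endian bytes.
_NUMERIC_FORMATS = {"DOUBLE": ">d", "FLOAT": ">f", "INTEGER": ">i", "LONG": ">q", "SHORT": ">h"}


def decode(raw: bytes, type_name: str):
    """Decode raw key or value bytes as the matching Kafka deserializer would."""
    if type_name == "BYTEARRAY":
        return raw
    if type_name == "STRING":
        return raw.decode("utf-8")  # StringDeserializer defaults to UTF-8
    if type_name in _NUMERIC_FORMATS:
        (value,) = struct.unpack(_NUMERIC_FORMATS[type_name], raw)
        return value
    raise ValueError(f"{type_name} requires a schema registry client and is not handled here")


print(decode(b"\x00\x00\x00\x2a", "INTEGER"))  # 42
```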
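The Consumer Group ID fallback described in the table can be sketched as follows. The page states only that an empty value is replaced by a random string starting with datax_; the uuid4-based suffix and the resolve_group_id name are hypothetical. In plain Kafka, a group that has never committed offsets starts wherever its offset reset policy points, which is why a fresh random group on every run does not resume from the previous run's position.

```python
import uuid
from typing import Optional

# Prefix stated in the documentation; the random suffix format is an assumption.
GENERATED_PREFIX = "datax_"


def resolve_group_id(configured: Optional[str]) -> str:
    """Return the group.id the Kafka Consumer would be initialized with."""
    if configured and configured.strip():
        # A stable, node-unique value lets committed offsets carry over between runs.
        return configured
    # A new random group has no committed offsets from earlier runs.
    return GENERATED_PREFIX + uuid.uuid4().hex


print(resolve_group_id("dataphin_node_kafka_orders"))  # dataphin_node_kafka_orders
```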
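Start Time and End Time are yyyyMMddHHmmss strings, while Kafka record timestamps are epoch milliseconds. The sketch below converts the strings and checks whether a record falls in the window. It assumes a UTC+8 time zone and a half-open [start, end) window purely for illustration; this page specifies neither, and the helper names are hypothetical. In a real node, ${beginDateTime} and ${endDateTime} are substituted by the scheduler before values like these are used.

```python
from datetime import datetime, timedelta, timezone

# Assumptions for illustration only: UTC+8 and a half-open [start, end) window.
ASSUMED_TZ = timezone(timedelta(hours=8))
TIME_FORMAT = "%Y%m%d%H%M%S"  # Python equivalent of yyyyMMddHHmmss


def to_epoch_millis(value: str) -> int:
    """Convert a yyyyMMddHHmmss string to epoch milliseconds."""
    # strptime tolerates some shorter inputs, so require exactly 14 digits.
    if len(value) != 14 or not value.isdigit():
        raise ValueError(f"expected yyyyMMddHHmmss, got {value!r}")
    dt = datetime.strptime(value, TIME_FORMAT).replace(tzinfo=ASSUMED_TZ)
    return int(dt.timestamp()) * 1000


def in_window(record_ts_ms: int, start: str, end: str) -> bool:
    """Return True if a record timestamp lies in the assumed [start, end) window."""
    return to_epoch_millis(start) <= record_ts_ms < to_epoch_millis(end)


print(to_epoch_millis("20220101000000"))  # 1640966400000
```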
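The two Synchronization End Strategy options amount to two stop conditions. This hypothetical sketch expresses them as one decision function. The strategy identifiers, the ReaderState type, and the choice to compare only business time (not offsets) against the end position are assumptions for illustration, not Dataphin's implementation.

```python
from dataclasses import dataclass
from typing import Optional

IDLE_TIMEOUT_SECONDS = 60  # "When No New Data Is Read For 1 Minute"


@dataclass
class ReaderState:
    last_data_at: float               # monotonic time (seconds) of the last non-empty poll
    last_record_ts_ms: Optional[int]  # timestamp of the latest record read, if any


def should_stop(strategy: str, state: ReaderState, now: float, end_ms: int) -> bool:
    """Decide whether the synchronization task should end under the chosen strategy."""
    if strategy == "NO_NEW_DATA_1_MIN":
        # Stop once no data has arrived for a full minute; keep polling before that.
        return now - state.last_data_at >= IDLE_TIMEOUT_SECONDS
    if strategy == "END_OFFSET_REACHED":
        # Stop once a record at or past the end position has been read;
        # otherwise keep reading indefinitely.
        return state.last_record_ts_ms is not None and state.last_record_ts_ms >= end_ms
    raise ValueError(f"unknown strategy: {strategy}")
```

In a read loop, a caller would update ReaderState after each poll and check should_stop before polling again.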
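The TEXT format accepted by Batch Add maps row by row onto the JSON format: each row holds an index, a field name, and a type separated by the column delimiter. The following minimal sketch performs that conversion; parse_batch_text is a hypothetical helper, and Dataphin's own parser may treat whitespace or malformed rows differently. Note that with a naive split like this one, a period row delimiter cannot be combined with field names that contain periods, such as the JSON-path name data.name.

```python
# Row delimiters listed as supported in the Batch Add note.
SUPPORTED_ROW_DELIMITERS = {"\n", ";", "."}


def parse_batch_text(text: str, row_delim: str = "\n", col_delim: str = ",") -> list:
    """Convert Batch Add TEXT rows into the equivalent JSON-format field list."""
    if row_delim not in SUPPORTED_ROW_DELIMITERS:
        raise ValueError(f"unsupported row delimiter: {row_delim!r}")
    fields = []
    for row in text.split(row_delim):
        row = row.strip()
        if not row:
            continue  # skip blank rows, for example after a trailing delimiter
        # Raises ValueError unless the row has exactly three columns.
        index, name, field_type = (part.strip() for part in row.split(col_delim))
        fields.append({"index": int(index), "name": name, "type": field_type})
    return fields


print(parse_batch_text("0,__key__,STRING\n1,__value__,STRING")[1]["name"])  # __value__
```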
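The custom output fields described under Output Fields parse the Kafka record value as JSON and treat the configured string as a path into it. This minimal sketch handles only dot-separated object keys, as in the data.name example; whether Dataphin's JSON path also supports array indexes or other syntax is not stated here, and returning None for a missing path is an assumption. extract_field is a hypothetical helper.

```python
import json
from typing import Any


def extract_field(record_value: str, path: str) -> Any:
    """Parse a record value as JSON and follow a dot-separated path such as "data.name"."""
    node: Any = json.loads(record_value)
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None  # assumption: a missing path yields NULL rather than an error
        node = node[key]
    return node


value = '{ "data": { "name": "bob", "age": 35 } }'
print(extract_field(value, "data.name"))  # bob
print(extract_field(value, "data.age"))   # 35
```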