Kafka插件基于Kafka SDK实时读取Kafka数据。

新建Kafka输入

  1. 登录DataWorks控制台,单击相应工作空间后的进入数据开发
  2. 鼠标悬停至新建,单击数据集成 > 实时同步

    您也可以找到相应的业务流程,右键单击数据集成,选择新建 > 实时同步

  3. 新建节点对话框中,输入节点名称,并选择目标文件夹,单击提交
  4. 在实时同步节点的编辑页面,鼠标单击输入 > Kafka并拖拽至编辑面板。
  5. 单击Kafka节点,填写节点配置对话框中的参数。Kafka
    参数 描述
    server Kafka的broker server地址,格式为ip:port
    topic Kafka的topic名称,是Kafka处理资源的消息源(feeds of messages)的不同分类。

    每条发布至Kafka集群的消息都有一个类别,该类别被称为topic,一个topic是对一组消息的归纳。

    keyType Kafka的Key的类型。
    valueType Kafka的value的类型。
    启动位点 需要开始同步数据的起始时间。
    配置参数 创建Kafka数据消费客户端KafkaConsumer可以指定扩展参数,例如bootstrap.serversauto.commit.interval.mssession.timeout.ms等,您可以基于KafkaConfig控制KafkaConsumer消费数据的行为。
    启动时间点位 选择启动时间点位的日期和时间。
    时区 选择相应的时区。
    输出字段 您可以自定义Kafka数据对外输出的字段名。
  6. 单击工具栏中的保存