Kafka插件基于Kafka SDK实时读取Kafka数据。

操作步骤

  1. 进入数据开发页面。
    1. 登录DataWorks控制台
    2. 在左侧导航栏,单击工作空间列表
    3. 选择工作空间所在地域后,单击相应工作空间后的进入数据开发
  2. 鼠标悬停至新建图标,单击数据集成 > 实时同步
    您也可以找到相应的业务流程,右键单击数据集成,选择新建 > 实时同步
  3. 新建节点对话框中,输入节点名称,并选择目标文件夹
    注意 节点名称必须是大小写字母、中文、数字、下划线(_)以及小数点(.),且不能超过128个字符。
  4. 单击提交
  5. 在实时同步节点的编辑页面,鼠标单击输入 > Kafka并拖拽至编辑面板。
  6. 单击Kafka节点,在节点配置对话框中,配置各项参数。
    Kafka
    参数 描述
    server Kafka的Broker Server地址,格式为ip:port
    topic Kafka的Topic名称,是Kafka处理资源的消息源的不同分类。
    每条发布至Kafka集群的消息都有一个类别,该类别被称为Topic,一个Topic是对一组消息的归纳。
    说明 一个Kafka输入仅支持一个Topic。
    keyType Kafka的Key的类型。
    valueType Kafka的Value的类型。
    启动位点 需要开始同步数据的起始时间。
    配置参数 创建Kafka数据消费客户端KafkaConsumer可以指定扩展参数,例如bootstrap.serversauto.commit.interval.mssession.timeout.ms等,您可以基于KafkaConfig控制KafkaConsumer消费数据的行为。
    输出字段 您可以自定义Kafka数据对外输出的字段名:
    • 单击添加更多字段,输入字段名,并选择类型,即可新增自定义字段。
      说明 Kafka的输出字段默认为JSON格式,您无需设置取值方式

      如果整个字段取Kafka消息的值,可以输入"__value__"

      您可以通过.(获取子字段)和[](获取数组元素)两种语法,获取复杂JSON格式的内容。Kafka的数据示例如下。
      {
            "a": {
            "a1": "hello"
            },
            "b": "world",
            "c":[
                  "xxxxxxx",
                  "yyyyyyy"
                  ],
            "d":[
                  {
                        "AA":"this",
                        "BB":"is_data"
                  },
                  {
                        "AA":"that",
                        "BB":"is_also_data"
                  }
              ]
      }
      不同情况下,输出字段的取值为:
      • 如果使用整个JSON,输出字段为__value__
      • 如果同步a1的数据"hello",输出字段为a.a1
      • 如果同步b的数据"world,输出字段为b
      • 如果同步c的数据"yyyyyyy",输出字段为c[1]
      • 如果同步AA的数据"this",输出字段为d[0].AA
    • 鼠标悬停至相应字段,单击显示的删除图标,即可删除该字段。
  7. 单击工具栏中的保存图标。