Kafka自定义连接器可以从Kafka集群中提取数据或将Kafka数据推送到其他系统,本文介绍Kafka自定义连接器中的基本概念和连接配置。

基本概念

Connectors

Connectors位于逻辑抽象层,用于指定数据源及数据目标。负责将数据从源服务中复制到Kafka的Topic或将数据从Topic复制到目标服务。

Tasks

Tasks是无状态的逻辑执行单元,每个Connector实例会协同管理多个Tasks用于进行数据传输。

Workers

Workers是用于承载Connectors实例和Tasks线程运行的进程,支持两种运行模式:单机模式(Standalone mode)和分布式模式(Distributed mode),一个Worker进程中可以运行多个Connector实例线程和Task线程。
  • Standalone mode:只有一个独立的Worker,所有工作都在此Worker中进行,不具备容错性。
  • Distributed mode:具备可扩展性和自动容错能力,可以启动多个Worker,这些Worker采用同一个group.id,构成Worker集群,通过自动协调(类似Kafka的消费组中的协调均衡机制),在多个Worker之间调度执行Connectors和Tasks。如果增加Worker、关闭Worker或某个Worker进程意外失败,其他Workers将会检测到这种变化进行自动协调(Rebalance),重新分配Connectors以及Tasks。
    当Workers处于Distributed mode模式时,需要配置如下配置项:
    • plugin.path:Kafka Connect框架层的配置项,主要在Kafka Connect启动时用于寻址Connectors可执行内容,这种可执行内容有如下两种形式。可配置多个路径,不同路径之间用逗号分隔。例如:
      /usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors

      其中,前一个目录包含Plugin及其第三方依赖所有类文件的uber JAR。后一个目录包含Plugin所需的所有JAR以及第三方依赖。

    • group.id(默认connect-cluster):Connect Cluster Group使用的唯一名称,不能和Consumer Group ID(消费者组)冲突。
    • config.storage.topic:用于存储Connector和任务配置的Topic,这是一个含有单个分区、多个副本的Topic,您需要手动创建此Topic,以确保是单个分区(自动创建可能会有多个分区)。
    • offset.storage.topic(默认connect-offsets):用于存储offsets的Topic,此Topic应该配置多个分区和副本。
    • status.storage.topic(默认connect-status):用于存储状态的Topic,此Topic可以有多个分区和副本。

Converters

数据格式转换组件,用于在Kafka与外部服务之间进行消息数据的序列化以及反序列化操作,使得数据格式以及结构满足业务需求,Workers以及Connectors均可配置Converters,Connectors的Converter配置项可覆盖Workers的Converter配置项,一般包含Avro、Protobuf、String、JSON、JSON Schema以及ByteArray等转换格式。

连接配置

重要 控制台和ZIP文件同时指定连接配置时,控制台配置文件将覆盖ZIP配置文件内容。
参数(必填)说明示例
nameConnector的名称。一般命名为不包含ISO控制符的字符串。mongo-sink
connector.classConnector类的名称或者别名。必须是org.apache.kafka.connect.connector.Connector的子类。com.mongodb.kafka.connect.MongoSinkConnector
task.max最大任务数量。取值范围为[1,Kafka中Topic的最大分区数]。1
topicsKafka参数配置参数为Sink Connect时,该参数指定数据源Topic,不同Topic之间以半角逗号(,)进行分隔。sourceA,sourceB

其他选填参数,请参见Kafka Connect Configs

以JdbcSinkConnector为例,配置如下:
name=testConnector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=connect-test