Kafka自定义连接器可以从Kafka集群中提取数据或将Kafka数据推送到其他系统,本文介绍Kafka自定义连接器中的基本概念和连接配置。
基本概念
Connectors
Connectors位于逻辑抽象层,用于指定数据源及数据目标。负责将数据从源服务中复制到Kafka的Topic或将数据从Topic复制到目标服务。
Tasks
Tasks是无状态的逻辑执行单元,每个Connector实例会协同管理多个Tasks用于进行数据传输。
Workers
Workers是用于承载Connectors实例和Tasks线程运行的进程,支持两种运行模式:单机模式(Standalone mode)和分布式模式(Distributed mode),一个Worker进程中可以运行多个Connector实例线程和Task线程。
- Standalone mode:只有一个独立的Worker,所有工作都在此Worker中进行,不具备容错性。
- Distributed mode:具备可扩展性和自动容错能力,可以启动多个Worker,这些Worker采用同一个group.id,构成Worker集群,通过自动协调(类似Kafka的消费组中的协调均衡机制),在多个Worker之间调度执行Connectors和Tasks。如果增加Worker、关闭Worker或某个Worker进程意外失败,其他Workers将会检测到这种变化进行自动协调(Rebalance),重新分配Connectors以及Tasks。当Workers处于Distributed mode模式时,需要配置如下配置项:
- plugin.path:Kafka Connect框架层的配置项,主要在Kafka Connect启动时用于寻址Connectors可执行内容,这种可执行内容有如下两种形式。可配置多个路径,不同路径之间用逗号分隔。例如:
/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
其中,前一个目录包含Plugin及其第三方依赖所有类文件的uber JAR。后一个目录包含Plugin所需的所有JAR以及第三方依赖。
- group.id(默认connect-cluster):Connect Cluster Group使用的唯一名称,不能和Consumer Group ID(消费者组)冲突。
- config.storage.topic:用于存储Connector和任务配置的Topic,这是一个含有单个分区、多个副本的Topic,您需要手动创建此Topic,以确保是单个分区(自动创建可能会有多个分区)。
- offset.storage.topic(默认connect-offsets):用于存储offsets的Topic,此Topic应该配置多个分区和副本。
- status.storage.topic(默认connect-status):用于存储状态的Topic,此Topic可以有多个分区和副本。
- plugin.path:Kafka Connect框架层的配置项,主要在Kafka Connect启动时用于寻址Connectors可执行内容,这种可执行内容有如下两种形式。可配置多个路径,不同路径之间用逗号分隔。例如:
Converters
数据格式转换组件,用于在Kafka与外部服务之间进行消息数据的序列化以及反序列化操作,使得数据格式以及结构满足业务需求,Workers以及Connectors均可配置Converters,Connectors的Converter配置项可覆盖Workers的Converter配置项,一般包含Avro、Protobuf、String、JSON、JSON Schema以及ByteArray等转换格式。
连接配置
重要 控制台和ZIP文件同时指定连接配置时,控制台配置文件将覆盖ZIP配置文件内容。
参数(必填) | 说明 | 示例 |
---|---|---|
name | Connector的名称。一般命名为不包含ISO控制符的字符串。 | mongo-sink |
connector.class | Connector类的名称或者别名。必须是org.apache.kafka.connect.connector.Connector 的子类。 | com.mongodb.kafka.connect.MongoSinkConnector |
task.max | 最大任务数量。取值范围为[1,Kafka中Topic的最大分区数]。 | 1 |
topics | 当Kafka参数配置参数为Sink Connect时,该参数指定数据源Topic,不同Topic之间以半角逗号(,)进行分隔。 | sourceA,sourceB |
其他选填参数,请参见Kafka Connect Configs。
以JdbcSinkConnector为例,配置如下:
name=testConnector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=connect-test