本文介绍自定义连接器中Workers的概念及配置。
背景信息
Workers是运行连接器逻辑的Java虚拟机 (JVM) 进程。每个Worker创建一组并行线程中的Tasks,并完成复制数据的工作。
Workers中的Tasks不存储状态,可以随时启动、停止或重新启动。SAE将提供弹性和可扩展的数据管道,通过CPU或者Memory水位阈值判断并在指定范围内自动进行弹性扩缩,满足Workers的弹性诉求。
Workers配置总览
Workers配置参数与开源Kafka Connect的配置参数兼容,配置全集请参见Confluent Kafka Connect配置。
默认Workers配置
云消息队列 Kafka 版提供了对Confluent Kafka Connect进行半托管的一站式平台,提供如下默认配置:
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=true
offset.flush.interval.ms=60000
request.timeout.ms=40000
task.shutdown.graceful.timeout.ms=10000
plugin.path=/opt/kafka/connect/plugins
rest.advertised.port=8083
topic.creation.enable=false
listeners=http://:8083
可自定义的Workers配置
创建任务时允许您自定义以下参数值,这些自定义配置将覆盖云消息队列 Kafka 版提供的默认配置。
- 必填参数配置项(控制台会预设必填配置项):
配置项 说明 示例 bootstrap.servers Kafka实例的接入点。用于与Kafka实例相连接。 alikafka-post-cn-7mz301t5****.alikafka.aliyuncs.com:9092 offset.storage.topic 存储offsets信息的Topic名称。 topic_offset config.storage.topic 存储配置信息的Topic名称。 topic_config status.storage.topic 存储状态信息的Topic名称。 topic_status group.id 标识此Worker所属的Connect集群。 test 控制台预设默认配置值:group.id=connect-eb-cluster-35345 offset.storage.topic=connect-eb-offset-35345 config.storage.topic=connect-eb-config-35345 status.storage.topic=connect-eb-status-35345 consumer.group.id=connector-eb-cluster-mongo-sink bootstrap.servers=alikafka-pre-cn-zpr3156gn006-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zpr3156gn006-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zpr3156gn006-3-vpc.alikafka.aliyuncs.com:9092
- 选填参数配置项:
key.converter key.converter.schemas.enable value.converter value.converter.schemas.enable exactly.once.source.support heartbeat.interval.ms rebalance.timeout.ms session.timeout.ms client.dns.lookup connections.max.idle.ms connector.client.config.override.policy receive.buffer.bytes request.timeout.ms send.buffer.bytes worker.sync.timeout.ms worker.unsync.backoff.ms access.control.allow.methods access.control.allow.origin admin.listeners client.id config.providers connect.protocol header.converter metadata.max.age.ms offset.flush.interval.ms offset.flush.timeout.ms reconnect.backoff.max.ms reconnect.backoff.ms retry.backoff.ms scheduled.rebalance.max.delay.ms task.shutdown.graceful.timeout.ms topic.tracking.allow.reset topic.tracking.enable
不可自定义的Workers配置
以下配置项不支持自定义设置。
- 使用云消息队列 Kafka 版提供默认值的配置项:
plugin.path rest.advertised.port topic.creation.enable listeners
- 不会传输至Kafka Connect的配置项。
sasl.* ssl.* security.* rest.advertised.host.name rest.advertised.listener rest.extension.classes client.* inter.worker.* metrics.* metrics.context.* response.http.headers.config socket.*