本文介绍自定义连接器中Workers的概念及配置。

背景信息

Workers是运行连接器逻辑的Java虚拟机 (JVM) 进程。每个Worker创建一组并行线程中的Tasks,并完成复制数据的工作。

Workers中的Tasks不存储状态,可以随时启动、停止或重新启动。SAE将提供弹性和可扩展的数据管道,通过CPU或者Memory水位阈值判断并在指定范围内自动进行弹性扩缩,满足Workers的弹性诉求。

Worker

Workers配置总览

Workers配置参数与开源Kafka Connect的配置参数兼容,配置全集请参见Confluent Kafka Connect配置

默认Workers配置

云消息队列 Kafka 版提供了对Confluent Kafka Connect进行半托管的一站式平台,提供如下默认配置:
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=true

offset.flush.interval.ms=60000
request.timeout.ms=40000
task.shutdown.graceful.timeout.ms=10000

plugin.path=/opt/kafka/connect/plugins
rest.advertised.port=8083
topic.creation.enable=false
listeners=http://:8083

可自定义的Workers配置

创建任务时允许您自定义以下参数值,这些自定义配置将覆盖云消息队列 Kafka 版提供的默认配置。

  • 必填参数配置项(控制台会预设必填配置项):
    配置项说明示例
    bootstrap.serversKafka实例的接入点。用于与Kafka实例相连接。alikafka-post-cn-7mz301t5****.alikafka.aliyuncs.com:9092
    offset.storage.topic存储offsets信息的Topic名称。topic_offset
    config.storage.topic存储配置信息的Topic名称。topic_config
    status.storage.topic存储状态信息的Topic名称。topic_status
    group.id标识此Worker所属的Connect集群。test
    控制台预设默认配置值:
    group.id=connect-eb-cluster-35345
    offset.storage.topic=connect-eb-offset-35345
    config.storage.topic=connect-eb-config-35345
    status.storage.topic=connect-eb-status-35345
    consumer.group.id=connector-eb-cluster-mongo-sink
    bootstrap.servers=alikafka-pre-cn-zpr3156gn006-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zpr3156gn006-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zpr3156gn006-3-vpc.alikafka.aliyuncs.com:9092
  • 选填参数配置项:
    key.converter
    key.converter.schemas.enable
    value.converter
    value.converter.schemas.enable
    exactly.once.source.support
    heartbeat.interval.ms
    rebalance.timeout.ms
    session.timeout.ms
    client.dns.lookup
    connections.max.idle.ms
    connector.client.config.override.policy
    receive.buffer.bytes
    request.timeout.ms
    send.buffer.bytes
    worker.sync.timeout.ms
    worker.unsync.backoff.ms
    access.control.allow.methods
    access.control.allow.origin
    admin.listeners
    client.id
    config.providers
    connect.protocol
    header.converter
    metadata.max.age.ms
    offset.flush.interval.ms
    offset.flush.timeout.ms
    reconnect.backoff.max.ms
    reconnect.backoff.ms
    retry.backoff.ms
    scheduled.rebalance.max.delay.ms
    task.shutdown.graceful.timeout.ms
    topic.tracking.allow.reset
    topic.tracking.enable

不可自定义的Workers配置

以下配置项不支持自定义设置。

  • 使用云消息队列 Kafka 版提供默认值的配置项:
    plugin.path
    rest.advertised.port
    topic.creation.enable
    listeners
  • 不会传输至Kafka Connect的配置项。
    sasl.*
    ssl.*
    security.*
    rest.advertised.host.name
    rest.advertised.listener
    rest.extension.classes
    client.*
    inter.worker.*
    metrics.*
    metrics.context.*
    response.http.headers.config
    socket.*