本文介绍如何创建RabbitMQ Connector,将云消息队列 RabbitMQ 版的数据同步至云消息队列 Kafka 版。
前提条件
开通对象存储OSS服务并创建存储空间(Bucket)。更多信息,请参见控制台创建存储空间。
开通Serverless应用引擎服务。更多信息,请参见准备工作。
购买并部署云消息队列 Kafka 版实例。更多信息,请参见购买和部署实例。
步骤一:创建RabbitMQ资源
登录云消息队列 RabbitMQ 版控制台,创建RabbitMQ实例。操作步骤,请参见创建实例。
单击已创建的实例,在实例详情页面创建以下资源。
步骤二:创建Connector
下载RabbitMQ Connector文件,上传至提前创建好的OSS Bucket。更多信息,请参见控制台上传文件。
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,选择
。在任务列表页面,单击创建任务。
在创建任务面板。设置任务名称,配置以下配置项。
任务创建
在Source(源)配置向导,选择数据提供方为Apache Kafka Connect,单击下一步。
在连接器配置配置向导,设置以下配置项,然后单击下一步。
配置项
参数
说明
Kafka Connect插件
Bucket存储桶
选择OSS Bucket。
文件
选择上传的.ZIP文件。
Kafka资源信息
Kafka 参数配置
选择Source Connect。
Kafka实例
选择前提条件中创建的实例。
专有网络VPC
默认选择部署Kafka实例时选择的VPC ID且不可更改。
交换机
默认选择部署Kafka实例时选择的vSwitch ID且不可更改。
安全组
选择安全组。
Kafka Connect配置信息
解析当前ZIP包下的properties文件
选择新建properties文件。选择.ZIP文件中包含的SourceConnector对应的.properties文件。路径为/etc/source-xxx.properties。在输入框中更新下列字段的取值。
connector.class:运行的Connector的包名称,无需修改。
tasks.max:Task的最大数量。
rabbitmq.host:填写RabbitMQ实例VPC接入点地址。可在RabbitMQ实例详情页面的接入点信息区域查看。
rabbitmq.username:填写步骤一:创建RabbitMQ资源中创建的RabbitMQ实例静态用户名。
rabbitmq.password:填写步骤一:创建RabbitMQ资源中创建的RabbitMQ实例静态用户名密码。
rabbitmq.virtual.host:填写步骤一:创建RabbitMQ资源中创建的Vhost。
kafka.topic:目标Kafka Topic,请在投递数据前,提前创建好目标Topic。
rabbitmq.queue:填写步骤一:创建RabbitMQ资源中创建的Queue。
示例代码如下:
connector.class=com.ibm.eventstreams.connect.rabbitmqsource.RabbitMQSourceConnector name=rabbitmq-source-connector # RabbitMQ实例VPC接入点信息。 rabbitmq.host=xxx # RabbitMQ实例静态用户名密码。 rabbitmq.password=xxx # RabbitMQ实例静态用户名。 rabbitmq.username=xxx # RabbitMQ实例Vhost。 rabbitmq.virtual.host=xxx # 目标Kafka Topic。 kafka.topic=xxx # RabbitMQ实例队列。 rabbitmq.queue=xxx tasks.max=4
在实例配置配置向导,设置以下参数,然后单击下一步。
配置项
参数
说明
Worker规格
Worker规格
选择合适的Worker规格。
最小Worker数
设置最小Worker数量。
最大Worker数
设置最大Worker数量。此数值不得超过Task的最大数量。
横向扩缩容阈值 %
当利用率大于或小于设置的CPU和Memory数值时,触发自动扩容或缩容。仅当最小Worker数和最大Worker数值不相等时,需要配置此参数。
Kafka Connect Worker 配置
自动创建Kafka Connect Worker依赖资源
建议勾选此项,此时会在选择的Kafka实例中自动创建Kafka Connect运行所需的一些Internal Topic以及ConsumerGroup,并将这些必填配置自动填入配置框中,包括以下配置项:
Offset Topic:用于存储源数据偏移量,命名规则为
connect-eb-offset-<任务名称>
。Config Topic:用于存储Connectors以及Tasks的配置信息,命名规则为
connect-eb-config-<任务名称>
。Status Topic:用于存储Connectors以及Tasks状态信息,命名规则为
connect-eb-status-<任务名称>
。Kafka Connect Consumer Group:Kafka Connect Worker用于消费Internal Topics的消费组,命名规则为
connect-eb-cluster-<任务名称>
。Kafka Source Connector Consumer Group:只针对Sink Connector有效,用于消费源Topic中的数据,命名规则为
connector-eb-cluster-<任务名称>-<connector名称>
。
在运行配置区域,将日志投递方式设置为投递至SLS或者投递至Kafka,在角色授权卡片设置Connect依赖的角色配置,然后单击保存。
重要建议配置的角色包含AliyunSAEFullAccess权限,否则可能会导致任务运行失败。
任务属性
设置此任务的重试策略及死信队列。更多信息,请参见重试和死信。
等待任务状态变为运行中,此时Connector已经在正常工作中。
步骤三:测试Connector
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在左侧导航栏,单击Queue列表,然后单击目标Queue右侧操作列的详情。
在Queue详情页面,单击被绑定信息页签的添加被绑定。
在添加被绑定面板,选择源Exchange为amq.direct,单击确定。
在被绑定信息页签,单击amq.direct Exchange右侧操作列的发送消息,向Kafka的目标Topic发送消息。更多信息,请参见发送消息。
登录云消息队列 Kafka 版控制台,在实例列表页面,单击目标实例。
在目标实例页面,单击目标Topic,然后单击消息查询,查看插入的消息数据。
常见报错
场景一:所有Tasks运行失败
错误信息:
All tasks under connector mongo-source failed, please check the error trace of the task.
解决方法:在消息流入任务详情页面,单击基础信息区域的诊断链接,即可跳转到Connector监控页面,可以看到Tasks运行失败的详细错误信息。
场景二:Kafka Connect退出
错误信息:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.
解决方法:由于状态获取可能会有延迟,建议您先尝试刷新页面。若刷新后仍然是失败状态,您可以按照以下步骤查看错误信息。
在消息流入任务详情页面的Worker信息区域,单击SAE应用后的实例名称,跳转到SAE应用详情页面。
在基本信息页面,单击实例部署信息页签。
在实例右侧操作列,单击Webshell登录Kafka Connect运行环境。
执行
vi /home/admin/connector-bootstrap.log
命令,查看Connector启动日志,查找其中是否包含错误信息。执行
vi /opt/kafka/logs/connect.log
命令,查看Connector运行日志,在其中查找ERROR或者WARN字段来查看是否有错误信息。
基于错误信息提示进行修复操作后,可以重新启动对应任务。
场景三:Connector参数校验失败
错误信息:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
解决方法:此时需要根据错误信息,找出具体哪个参数出错,更新对应参数即可。若基于上述错误信息无法定位具体的出错参数,可以参考上文场景二中的步骤登录Kafka Connect运行环境,执行以下命令,查询参数是否校验通过。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate
该指令会返回Connector参数中每个参数是否校验通过,若不通过,则errors属性非空,如下所示。
"value":{
"name":"snapshot.mode",
"value":null,
"recommended_values":[
"never",
"initial_only",
"when_needed",
"initial",
"schema_only",
"schema_only_recovery"
],
"errors":[
"Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
],
"visible":true
}