文档

Connector使用案例

更新时间:

本文向您介绍如何使用云消息队列 Confluent 版的Connector相关功能。

redis-enterprise-kafka

Connector配置

配置方式分为两种,在Control Center中自行配置,或者上传connector config file。

  • Control Center配置image

  • connector config file配置

    {
     "name": "RedisEnterpriseSinkConnectorConnector_0",
     "config": {
     "name": "RedisEnterpriseSinkConnectorConnector_0",
     "connector.class": "com.redis.kafka.connect.RedisEnterpriseSinkConnector",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
     "topics": "redis-connect-topic",
     "redis.uri": "redis://r-xxxxxxxxxxxx.redis.rds.aliyuncs.com:6379",
     "redis.type": "STRING",
     "principal.service.name": "****",
     "principal.service.password": "************"
     }
  1. 如果配置成功无误,Control Center中会显示处于running状态的connector实例。imageimage

connector功能测试

  1. 通过Client或者SDK方式向配置的Confluent Topic中发送数据。image

  2. connector会自动启动job,可以看到redis-connect-topic中的数据已写入配置的Redis实例中。image

kafka-connect-s3

kafka-connect-s3兼容OSS协议,可以通过S3SinkConnector相关配置选项建立Connector,将Kafka集群中Topic数据导出至OSS的bucket中。

connector配置

  1. 配置文件内容如下所示,该配置将集群中Topic名为test101下的消息导出到OSS的cn-beijing区中bucket为lm-xxxx-test的topic_test目录下。

    {
     "name": "oss_test",
     "config": {
     "name": "oss_test",
     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
     "tasks.max": "1",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
     "topics": "test101",
     "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
     "flush.size": "1",
     "schema.compatibility": "NONE",
     //bucketName必须是OSS上某个bucket下面的一个目录
     "s3.bucket.name": "topic_test",
     "s3.region": "cn-beijing",
     //可以写入OSS文件的账户AK
     "aws.access.key.id": "your_access_key_id",
     "aws.secret.access.key": "******************************",
     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
     "topics.dir": "",
     //此处包含bucket相关信息
     "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com",
     "principal.service.name": "****",
     "principal.service.password": "********"
     }

debezium-connector-mysql

connector配置

  1. 配置文件内容如下所示,配置项目需要添加broker认证SASL_SSL相关配置,详情请参见官方配置文档

    {
     "name": "oss_test",
     "config": {
     "name": "oss_test",
     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
     "tasks.max": "1",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
     "topics": "test101",
     "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
     "flush.size": "1",
     "schema.compatibility": "NONE",
     //bucketName必须是OSS上某个bucket下面的一个目录
     "s3.bucket.name": "topic_test",
     "s3.region": "cn-beijing",
     //可以写入OSS文件的账户AK
     "aws.access.key.id": "your_access_key_id",
     "aws.secret.access.key": "******************************",
     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
     "topics.dir": "",
     //此处包含bucket相关信息
     "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com",
     "principal.service.name": "****",
     "principal.service.password": "********"
     }

Connector常见问题

  1. 配置项目中要求MySQL的用户拥有RELOAD或FLUSH_TABLES权限,否则会出现如下错误。image

  2. connector把MySQL数据库表数据发送至Topic过程中需要自动创建Topic,此时需要在Control Center中设置auto.create.topics.enable配置项为true,或者手动创建相应名字的Topic,否则报错如下。imageimage

Connector Rest API

重要

强制更新Connector可能导致正在运行的job结束,且会导致Control Center中Connect信息异常,可通过Restful API手动删除Connector进行解决,更多API使用详情,请参见API文档

查看Connector实例信息

  • API格式:GET /connectors

  • 查询方式:可以通过Postman或者命令行的方式进行查看。

    • Postmanimage

    • 命令行

      curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors' --header 'Authorization: Basic xxx'
  • 使用场景:用于查看已经部署的connector信息详情。

查看Connector状态信息

  • API格式:GET /connectors/(string:name)/tasks/(int:taskid)/status

  • 查询方式:可以通过Postman或者命令行的方式进行查看。

    • Postmanimage

    • 命令行

      curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector/status' --header 'Authorization: Basic xxx'
  • 使用场景:用于查询connector的部署状态,当connector部署failed后,可通过该API进行详情查询,诊断问题。

删除connector

  • API格式:DELETE /connectors/(string:name)/

  • 查询方式:可以通过Postman或者命令行的方式进行查看。

    • Postmanimage

    • 命令行

      curl --location --request DELETE 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector'  --header 'Authorization: Basic xxx'
  • 使用场景:用于强制更新部署Connector后,Control Center中出现获取Connector信息失败时,可通过该API对connector进行强制删除,Control Center可恢复正常运行。