文档

Connector安装及使用

更新时间:
一键部署

本文向您介绍如何使用流数据服务Confluent的Connector相关功能。

说明
  • Connectors自定义安装功能仅支持阿里云流数据服务Confluent 7.x及以上版本。

  • 阿里云流数据服务6.x版本安装Connectors请联系我们进行后台安装或升级至7.x及以上版本。

  • 目前标准版不支持Connectors自定义安装,专业版支持OpenSourceConnectors安装,企业版支持OpenSourceConnectors,Commercial Connectors和Premium Connectors的安装,详情请参见Connector分类

Connector安装过程

  1. 登录集群管控页,在集群详情页中的“Connectors”菜单栏下选择添加Connector。

  1. 查询要安装的connector名称,勾选并确认安装重启connector。

  1. 安装成功后登录Control Center即可看到部署完成的connector。

Connector使用案例

案例一、redis-enterprise-kafka

connector配置

  1. 配置方式分为两种:在Control Center中自行配置,或者上传connector config file。

  • Control Center配置

  • connector config file配置

{
 "name": "RedisEnterpriseSinkConnectorConnector_0",
 "config": {
 "name": "RedisEnterpriseSinkConnectorConnector_0",
 "connector.class": "com.redis.kafka.connect.RedisEnterpriseSinkConnector",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.storage.StringConverter",
 "topics": "redis-connect-topic",
 "redis.uri": "redis://r-xxxxxxxxxxxx.redis.rds.aliyuncs.com:6379",
 "redis.type": "STRING",
 "principal.service.name": "****",
 "principal.service.password": "************"
 }
  1. 如果配置成功无误,Control Center中会显示处于running状态的connector实例。

connector功能测试

  1. 通过client或者SDK方式向配置的confluent topic中发送数据。

  1. connector会自动启动job,可以看到redis-connect-topic中的数据已写入配置的Redis实例中。

案例二、kafka-connect-s3

kafka-connect-s3可以兼容OSS协议,可以通过S3SinkConnector相关配置选项建立connector,将kafka集群中Topic数据导出至OSS的bucket中。

connector配置

  1. 配置文件内容如下所示,该配置将集群中Topic名为test101下的消息导出到OSS的cn-beijing区中bucket为lm-xxxx-test的topic_test目录下。

{
 "name": "oss_test",
 "config": {
 "name": "oss_test",
 "connector.class": "io.confluent.connect.s3.S3SinkConnector",
 "tasks.max": "1",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.storage.StringConverter",
 "topics": "test101",
 "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
 "flush.size": "1",
 "schema.compatibility": "NONE",
 //bucketName必须是OSS上某个bucket下面的一个目录
 "s3.bucket.name": "topic_test",
 "s3.region": "cn-beijing",
 //可以写入OSS文件的账户AK
 "aws.access.key.id": "your_access_key_id",
 "aws.secret.access.key": "******************************",
 "storage.class": "io.confluent.connect.s3.storage.S3Storage",
 "topics.dir": "",
 //此处包含bucket相关信息
 "store.url": "https://lm-xxxx-test.oss-cn-beijing-internal.aliyuncs.com",
 "principal.service.name": "****",
 "principal.service.password": "********"
 }

案例三、 debezium-connector-mysql

connector配置

  1. 配置文件内容如下所示,配置项目需要添加broker认证SASL_SSL相关配置,详情请参官方配置文档

{
 "name": "MySqlConnectorConnector_0",
 "config": {
 "connector.class": "io.debezium.connector.mysql.MySqlConnector",
 "key.converter": "org.apache.kafka.connect.json.JsonConverter",
 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
 "database.hostname": "rm-2zee1q3dxz1t628dp.mysql.rds.aliyuncs.com",
 "database.user": "db_user",
 "database.password": "db_password",
 "database.server.name": "db_name",
 "database.history.kafka.bootstrap.servers": "rb-a9702122dc8fe248-internal.csp.aliyuncs.com:9095",
 "database.history.kafka.topic": "test1",
 "database.include.list": "",
 "principal.service.name": "principal_user",
 "principal.service.password": "principal_user_password",
 //以下为自定义项目,需要手动添加,否则连接不上broker
 "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='your_user' password='your_password';",
 "database.history.producer.sasl.mechanism": "PLAIN",
 "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='your_user' password='your_password';",
 "database.history.producer.security.protocol": "SASL_SSL",
 "database.history.consumer.security.protocol": "SASL_SSL",
 "database.history.consumer.sasl.mechanism": "PLAIN"
 }

connector常见问题

  1. 配置项目中要求MySQL的用户拥有RELOAD或者FLUSH_TABLES权限,否则出现如下错误。

  2. image
  3. connector把MySQL数据库表数据发送至Topic过程中需要自动创建Topic,此时需要在Control Center中设置auto.create.topics.enable配置项为true,或者手动创建相应名字的Topic,否则报错如下。

  4. imageimage

Connector Rest API

重要

强制更新connector可能导致正在运行的job结束,且会导致Control Center中connect信息异常,可通过Restful API手动删除connector进行解决,更多API使用详情请参见API文档

1. 查看connector实例信息

(1)API格式

GET /connectors

(2)查询方式

可以通过Postman或者命令行的方式进行查看。

  • Postman

image
  • 命令行

curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors' --header 'Authorization: Basic xxx'

(3)使用场景

用于查看已经部署的connector信息详情。

2. 查看connector状态信息

(1)API格式

GET /connectors/(string:name)/tasks/(int:taskid)/status

(2)查询方式

可以通过Postman或者命令行的方式进行查看。

  • Postman

image
  • 命令行

curl --location --request GET 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector/status' --header 'Authorization: Basic xxx'

(3)使用场景

用于查询connector的部署状态,当connector部署failed后,可通过该API进行详情查询,诊断问题。

3. 删除connector

(1)API格式

DELETE /connectors/(string:name)/

(2)查询方式

可以通过Postman或者命令行的方式进行查看。

  • Postman

image
  • 命令行

curl --location --request DELETE 'https://connect-xxxxxxxxxx-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector'  --header 'Authorization: Basic xxx'

(3)使用场景

用于强制更新部署connector后,Control Center中出现获取connector信息失败时,可通过该API对connector进行强制删除,Control Center可恢复正常运行。

  • 本页导读 (0)
文档反馈