本文说明如何创建FC Sink Connector将数据从消息队列Kafka版实例的数据源Topic导出至函数计算的函数。
前提条件
在创建FC Sink Connector前,请确保您已完成以下操作:
- 为消息队列Kafka版实例开启Connector。更多信息,请参见开启Connector。
- 为消息队列Kafka版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic。
本文以名称为fc-test-input的Topic为例。
- 在函数计算创建函数。更多信息,请参见使用控制台创建函数。
本文以服务名称为guide-hello_world、函数名称为hello_world、运行环境为Python的事件函数为例。该示例函数的代码如下:
# -*- coding: utf-8 -*-
import logging
# To enable the initializer feature
# please implement the initializer function as below:
# def initializer(context):
# logger = logging.getLogger()
# logger.info('initializing')
def handler(event, context):
logger = logging.getLogger()
logger.info('hello world:' + bytes.decode(event))
return 'hello world:' + bytes.decode(event)
为FC Sink Connector开启公网访问
如需使FC Sink Connector跨地域访问其他阿里云服务,您需要为FC Sink Connector开启公网访问。具体操作,请参见为Connector开启公网访问。
创建自定义权限策略
在目标账号下创建访问函数计算的自定义权限策略。
- 登录访问控制控制台。
- 在左侧导航栏,选择。
- 在权限策略管理页面,单击创建权限策略。
- 在新建自定义权限策略页面,创建自定义权限策略。
- 在策略名称文本框,输入KafkaConnectorFcAccess。
- 在配置模式区域,选择脚本配置。
- 在策略内容区域,输入自定义权限策略脚本。
访问函数计算的自定义权限策略脚本示例如下:
{
"Version": "1",
"Statement": [
{
"Action": [
"fc:InvokeFunction",
"fc:GetFunction"
],
"Resource": "*",
"Effect": "Allow"
}
]
}
- 单击确定。
创建RAM角色
在目标账号下创建RAM角色。由于RAM角色不支持直接选择消息队列Kafka版作为受信服务,您在创建RAM角色时,需要选择任意支持的服务作为受信服务。RAM角色创建后,手工修改信任策略。
- 在左侧导航栏,单击RAM角色管理。
- 在RAM角色管理页面,单击创建RAM角色。
- 在创建RAM角色面板,创建RAM角色。
- 在当前可信实体类型区域,选择阿里云服务,单击下一步。
- 在角色类型区域,选择普通服务角色,在角色名称文本框,输入AliyunKafkaConnectorRole,从选择受信服务列表,选择函数计算,然后单击完成。
- 在RAM角色管理页面,找到AliyunKafkaConnectorRole,单击AliyunKafkaConnectorRole。
- 在AliyunKafkaConnectorRole页面,单击信任策略管理页签,单击修改信任策略。
- 在修改信任策略对话框,将脚本中fc替换为alikafka,单击确定。
添加权限
在目标账号下为创建的RAM角色授予访问函数计算的权限。
- 在左侧导航栏,单击RAM角色管理。
- 在RAM角色管理页面,找到AliyunKafkaConnectorRole,在其右侧操作列,单击添加权限。
- 在添加权限对话框,添加KafkaConnectorFcAccess权限。
- 在选择权限面板,选择自定义策略。
- 在权限策略名称列表,找到KafkaConnectorFcAccess,单击KafkaConnectorFcAccess。
- 单击确定。
- 单击完成。
创建FC Sink Connector依赖的Topic
您可以在消息队列Kafka版控制台手动创建FC Sink Connector依赖的5个Topic。
- 登录消息队列Kafka版控制台。
- 在顶部菜单栏,选择地域。
- 在左侧导航栏,单击Topic管理。
- 在Topic管理页面,选择实例,单击创建Topic。
- 在创建Topic对话框,设置Topic属性,然后单击创建。
Topic |
描述 |
任务位点Topic |
用于存储消费位点的Topic。
- Topic名称:建议以connect-offset开头。
- 分区数:Topic的分区数量必须大于1。
- 存储引擎:Topic的存储引擎必须为Local存储。
- cleanup.policy:Topic的日志清理策略必须为compact。
|
任务配置Topic |
用于存储任务配置的Topic。
- Topic名称:建议以connect-config开头。
- 分区数:Topic的分区数量必须为1。
- 存储引擎:Topic的存储引擎必须为Local存储。
- cleanup.policy:Topic的日志清理策略必须为compact。
|
任务状态Topic |
用于存储任务状态的Topic。
- Topic名称:建议以connect-status开头。
- 分区数:Topic的分区数量建议为6。
- 存储引擎:Topic的存储引擎必须为Local存储。
- cleanup.policy:Topic的日志清理策略必须为compact。
|
死信队列Topic |
用于存储Connect框架的异常数据的Topic。该Topic可以和异常数据Topic为同一个Topic,以节省Topic资源。
- Topic名称:建议以connect-error开头。
- 分区数:Topic的分区数量建议为6。
- 存储引擎:Topic的存储引擎可以为Local存储或云存储。
|
异常数据Topic |
用于存储Sink的异常数据的Topic。该Topic可以和死信队列Topic为同一个Topic,以节省Topic资源。
- Topic名称:建议以connect-error开头。
- 分区数:Topic的分区数量建议为6。
- 存储引擎:Topic的存储引擎可以为Local存储或云存储。
|
创建FC Sink Connector依赖的Consumer Group
您可以在消息队列Kafka版控制台手动创建FC Sink Connector依赖的2个Consumer Group。
- 在左侧导航栏,单击Consumer Group管理。
- 在Consumer Group管理页面,选择实例,单击创建Consumer Group。
- 在创建Consumer Group对话框,设置Topic属性,然后单击创建。
Consumer Group |
描述 |
Connector任务消费组 |
Connector的数据同步任务使用的Consumer Group。该Consumer Group的名称必须为connect-任务名称。
|
Connector消费组 |
Connector使用的Consumer Group。该Consumer Group的名称建议以connect-cluster开头。 |
创建并部署FC Sink Connector
创建并部署用于将数据从消息队列Kafka版同步至函数计算的FC Sink Connector。
- 登录消息队列Kafka版控制台。
- 在顶部菜单栏,选择地域。
- 在左侧导航栏,单击Connector。
- 在Connector页面,选择实例,单击创建Connector。
- 在创建Connector面板,完成以下操作。
- 在基础信息下方的Connector名称文本框,输入Connector名称,从转储路径列表,选择消息队列Kafka版,从转储到列表,选择函数计算,单击下一步。
注意 消息队列Kafka版会为您自动选中
授权创建服务关联角色。
- 如果未创建服务关联角色,消息队列Kafka版会为您自动创建一个服务关联角色,以便您使用FC Sink Connector。
- 如果已创建服务关联角色,消息队列Kafka版不会重复创建。
关于该服务关联角色的更多信息,请参见
服务关联角色。
参数 |
描述 |
示例值 |
Connector名称 |
Connector的名称。命名规则:
- 可以包含数字、小写英文字母和短划线(-),但不能以短划线(-)开头,长度限制为48个字符。
- 同一个消息队列Kafka版实例内保持唯一。
Connector的数据同步任务必须使用名称为connect-任务名称的Consumer Group。如果您未手动创建该Consumer Group,系统将为您自动创建。
|
kafka-fc-sink |
任务类型 |
Connector的数据同步任务类型。本文以数据从消息队列Kafka版同步至函数计算为例。更多任务类型,请参见Connector类型。
|
KAFKA2FC |
- 在源实例配置下方的数据源Topic文本框,输入数据源Topic的名称,从消费初始位置列表,选择消费初始位置,在创建资源,选择自动创建或者选择手动创建并输入手动创建的Topic的名称,然后单击下一步。
参数 |
描述 |
示例值 |
VPC ID |
数据同步任务所在的VPC。默认为消息队列Kafka版实例所在的VPC,您无需填写。
|
vpc-bp1xpdnd3l*** |
VSwitch ID |
数据同步任务所在的交换机。该交换机必须与消息队列Kafka版实例处于同一VPC。默认为部署消息队列Kafka版实例时填写的交换机。
|
vsw-bp1d2jgg81*** |
数据源Topic |
需要同步数据的Topic。 |
fc-test-input |
消费初始位置 |
开始消费的位置。取值:
- latest:从最新位点开始消费。
- earliest:从最初位点开始消费。
|
latest |
消费线程并发数 |
数据源Topic的消费线程并发数。默认值为3。取值:
|
3 |
Connector消费组 |
Connector使用的Consumer Group。该Consumer Group的名称建议以connect-cluster开头。 |
connect-cluster-kafka-fc-sink |
任务位点Topic |
用于存储消费位点的Topic。
- Topic名称:建议以connect-offset开头。
- 分区数:Topic的分区数量必须大于1。
- 存储引擎:Topic的存储引擎必须为Local存储。
- cleanup.policy:Topic的日志清理策略必须为compact。
|
connect-offset-kafka-fc-sink |
任务配置Topic |
用于存储任务配置的Topic。
- Topic名称:建议以connect-config开头。
- 分区数:Topic的分区数量必须为1。
- 存储引擎:Topic的存储引擎必须为Local存储。
- cleanup.policy:Topic的日志清理策略必须为compact。
|
connect-config-kafka-fc-sink |
任务状态Topic |
用于存储任务状态的Topic。
- Topic名称:建议以connect-status开头。
- 分区数:Topic的分区数量建议为6。
- 存储引擎:Topic的存储引擎必须为Local存储。
- cleanup.policy:Topic的日志清理策略必须为compact。
|
connect-status-kafka-fc-sink |
死信队列Topic |
用于存储Connect框架的异常数据的Topic。该Topic可以和异常数据Topic为同一个Topic,以节省Topic资源。
- Topic名称:建议以connect-error开头。
- 分区数:Topic的分区数量建议为6。
- 存储引擎:Topic的存储引擎可以为Local存储或云存储。
|
connect-error-kafka-fc-sink |
异常数据Topic |
用于存储Sink的异常数据的Topic。该Topic可以和死信队列Topic为同一个Topic,以节省Topic资源。
- Topic名称:建议以connect-error开头。
- 分区数:Topic的分区数量建议为6。
- 存储引擎:Topic的存储引擎可以为Local存储或云存储。
|
connect-error-kafka-fc-sink |
- 在目标实例配置下方,设置函数计算服务相关参数,设置发送模式和发送批大小,然后单击下一步。
参数 |
描述 |
示例值 |
是否跨账号/地域 |
FC Sink Connector是否跨账号/地域向函数计算服务同步数据。默认为否。取值:
- 否:同地域同账号模式。
- 是:跨地域同账号模式、同地域跨账号模式或跨地域跨账号模式。
|
否 |
服务地域 |
函数计算服务的地域。默认为FC Sink Connector所在地域。如需跨地域,您需要为Connector开启公网访问,然后选择目标地域。更多信息,请参见#d9e101。
|
cn-hangzhou |
服务接入点 |
函数计算服务的接入点。在函数计算控制台的概览页的常用信息区域获取。
- 内网Endpoint:低延迟,推荐。适用于消息队列Kafka版实例和函数计算处于同一地域场景。
- 公网Endpoint:高延迟,不推荐。适用于消息队列Kafka版实例和函数计算处于不同地域的场景。如需使用公网Endpoint,您需要为Connector开启公网访问。更多信息,请参见#d9e101。
|
http://188***.cn-hangzhou.fc.aliyuncs.com |
服务账号 |
函数计算服务的阿里云账号ID。在函数计算控制台的概览页的常用信息区域获取。
|
188*** |
授权角色名 |
消息队列Kafka版访问函数计算服务的RAM角色。
|
AliyunKafkaConnectorRole |
服务名 |
函数计算服务的名称。 |
guide-hello_world |
函数名 |
函数计算服务的函数名称。 |
hello_world |
服务版本或别名 |
函数计算服务的版本或别名。 |
LATEST |
发送模式 |
消息发送模式。取值:
- 异步:推荐。
- 同步:不推荐。同步发送模式下,如果函数计算的处理时间较长,消息队列Kafka版的处理时间也会较长。当同一批次消息的处理时间超过5分钟时,会触发消息队列Kafka版客户端Rebalance。
|
异步 |
发送批大小 |
批量发送的消息条数。默认为20。Connector根据发送批次大小和请求大小限制(同步请求大小限制为6 MB,异步请求大小限制为128 KB)将多条消息聚合后发送。例如,发送模式为异步,发送批次大小为20,如果要发送18条消息,其中有17条消息的总大小为127
KB,有1条消息的大小为200 KB,Connector会将总大小不超过128 KB的17条消息聚合后发送,将大小超过128 KB的1条消息单独发送。
说明 如果您在发送消息时将key设置为null,则请求中不包含key。如果将value设置为null,则请求中不包含value。
- 如果批量发送的多条消息的大小不超过请求大小限制,则请求中包含消息内容。请求示例如下:
[
{
"key":"this is the message's key2",
"offset":8,
"overflowFlag":false,
"partition":4,
"timestamp":1603785325438,
"topic":"Test",
"value":"this is the message's value2",
"valueSize":28
},
{
"key":"this is the message's key9",
"offset":9,
"overflowFlag":false,
"partition":4,
"timestamp":1603785325440,
"topic":"Test",
"value":"this is the message's value9",
"valueSize":28
},
{
"key":"this is the message's key12",
"offset":10,
"overflowFlag":false,
"partition":4,
"timestamp":1603785325442,
"topic":"Test",
"value":"this is the message's value12",
"valueSize":29
},
{
"key":"this is the message's key38",
"offset":11,
"overflowFlag":false,
"partition":4,
"timestamp":1603785325464,
"topic":"Test",
"value":"this is the message's value38",
"valueSize":29
}
]
- 如果发送的单条消息的大小超过请求大小限制,则请求中不包含消息内容。请求示例如下:
[
{
"key":"123",
"offset":4,
"overflowFlag":true,
"partition":0,
"timestamp":1603779578478,
"topic":"Test",
"value":"1",
"valueSize":272687
}
]
说明 如需获取消息内容,您需要根据位点主动拉取消息。
|
50 |
重试次数 |
消息发送失败后的重试次数。默认为2。取值范围为1~3。部分导致消息发送失败的错误不支持重试。错误码与是否支持重试的对应关系如下:
- 4XX:除429支持重试外,其余错误码不支持重试。
- 5XX:支持重试。
说明
- 关于错误码的更多信息,请参见错误码。
- Connector调用InvokeFunction向函数计算发送消息。关于InvokeFunction的更多信息,请参见API 定义。
|
2 |
失败处理 |
消息发送失败后的错误处理。默认为log。取值:
- log:继续对出现错误的Topic的分区的订阅,并打印错误日志。出现错误后,您可以通过Connector日志查看错误,并根据错误的错误码查找解决方案,以进行自助排查。
- fail:停止对出现错误的Topic的分区的订阅,并打印错误日志。出现错误后,您可以通过Connector日志查看错误,并根据错误的错误码查找解决方案,以进行自助排查。
说明
- 如何查看Connector日志,请参见查看Connector日志。
- 如何根据错误码查找解决方案,请参见错误码。
- 如需恢复对出现错误的Topic的分区的订阅,您需要提交提交工单联系消息队列Kafka版技术人员。
|
log |
- 在预览/提交下方,确认Connector的配置,然后单击提交。
- 在创建Connector面板,单击部署。
发送测试消息
部署FC Sink Connector后,您可以向消息队列Kafka版的数据源Topic发送消息,测试数据能否被同步至函数计算。
- 在Connector页面,找到目标Connector,在其右侧操作列,单击测试。
- 在Topic管理页面,选择实例,找到fc-test-input,在其右侧操作列,选择
> 发送消息。
- 在发送消息面板,发送测试消息。
- 在分区文本框,输入0。
- 在Message Key文本框,输入1。
- 在Message Value文本框,输入1。
- 单击发送。
查看函数日志
向消息队列Kafka版的数据源Topic发送消息后,查看函数日志,验证是否收到消息。更多信息,请参见配置并查看函数日志。
日志中显示发送的测试消息。
在文档使用中是否遇到以下问题
更多建议
匿名提交