本文说明如何创建FC Sink Connector,您可以通过FC Sink Connector将数据从消息队列Kafka版实例的数据源Topic导出至函数计算的函数。
前提条件
- 消息队列Kafka版
- 为消息队列Kafka版实例开启Connector。更多信息,请参见开启Connector。
- 为消息队列Kafka版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic。
本文以名称为fc-test-input的Topic为例。
- 函数计算
- 在函数计算创建函数。更多信息,请参见使用控制台创建函数。
重要 函数类型必须为事件函数。
本文以服务名称为guide-hello_world、函数名称为hello_world、运行环境为Python的事件函数为例。该示例函数的代码如下:
# -*- coding: utf-8 -*- import logging # To enable the initializer feature # please implement the initializer function as below: # def initializer(context): # logger = logging.getLogger() # logger.info('initializing') def handler(event, context): logger = logging.getLogger() logger.info('hello world:' + bytes.decode(event)) return 'hello world:' + bytes.decode(event)
- 在函数计算创建函数。更多信息,请参见使用控制台创建函数。
- 可选:事件总线EventBridge
说明 仅在您创建的Connector任务所属实例的地域为华东1(杭州)或西南1(成都)时,需要完成该操作。
注意事项
- 仅支持在同地域内,将数据从消息队列Kafka版实例的数据源Topic导出至函数计算。Connector的限制说明,请参见使用限制。
- 如果Connector所属实例的地域为华东1(杭州)或西南1(成都),该功能会部署至事件总线EventBridge。
- 事件总线EventBridge目前免费供您使用。更多信息,请参见计费说明。
- 创建Connector时,事件总线EventBridge会为您自动创建服务关联角色AliyunServiceRoleForEventBridgeSourceKafka和AliyunServiceRoleForEventBridgeConnectVPC。
- 如果未创建服务关联角色,事件总线EventBridge会为您自动创建对应的服务关联角色,以便允许事件总线EventBridge使用此角色访问消息队列Kafka版和专有网络VPC。
- 如果已创建服务关联角色,事件总线EventBridge不会重复创建。
- 部署到事件总线EventBridge的任务暂时不支持查看任务运行日志。Connector任务执行完成后,您可以在订阅数据源Topic的Group中,通过消费情况查看任务进度。具体操作,请参见查看消费状态。
操作流程
使用FC Sink Connector将数据从消息队列Kafka版实例的数据源Topic导出至函数计算的函数的操作流程如下:
- 可选:使FC Sink Connector跨地域访问函数计算
重要 如果您不需要使FC Sink Connector跨地域访问函数计算,您可以直接跳过该步骤。
- 可选:使FC Sink Connector跨账号访问函数计算
重要 如果您不需要使FC Sink Connector跨账号访问函数计算,您可以直接跳过该步骤。
- 可选:创建FC Sink Connector依赖的Topic和Group重要
- 如果您不需要自定义Topic和Group的名称,您可以直接跳过该步骤。
- 部分FC Sink Connector依赖的Topic的存储引擎必须为Local存储,大版本为0.10.2版本的消息队列Kafka版实例不支持手动创建Local存储的Topic,只支持自动创建。
- 创建并部署FC Sink Connector
- 结果验证
为FC Sink Connector开启公网访问
如需使FC Sink Connector跨地域访问其他阿里云服务,您需要为FC Sink Connector开启公网访问。具体操作,请参见为Connector开启公网访问。
创建自定义权限策略
在目标账号下创建访问函数计算的自定义权限策略。
- 登录访问控制控制台。
- 在左侧导航栏,选择 。
- 在权限策略页面,单击创建权限策略。
- 在新建自定义权限策略页面,创建自定义权限策略。
创建RAM角色
在目标账号下创建RAM角色。由于RAM角色不支持直接选择消息队列Kafka版作为受信服务,您在创建RAM角色时,需要选择任意支持的服务作为受信服务。RAM角色创建后,手动修改信任策略。
添加权限
在目标账号下为创建的RAM角色授予访问函数计算的权限。
- 在左侧导航栏,选择 。
- 在角色页面,找到AliyunKafkaConnectorRole,在其右侧操作列,单击添加权限。
- 在添加权限面板,添加KafkaConnectorFcAccess权限。
- 在选择权限区域,选择自定义策略。
- 在权限策略名称列表,找到KafkaConnectorFcAccess,单击KafkaConnectorFcAccess。
- 单击确定。
- 单击完成。
创建FC Sink Connector依赖的Topic
您可以在消息队列Kafka版控制台手动创建FC Sink Connector依赖的5个Topic,包括:任务位点Topic、任务配置Topic、任务状态Topic、死信队列Topic以及异常数据Topic。每个Topic所需要满足的分区数与存储引擎会有差异,具体信息,请参见配置源服务参数列表。
创建FC Sink Connector依赖的Group
您可以在消息队列Kafka版控制台手动创建FC Sink Connector数据同步任务使用的Group。该Group的名称必须为connect-任务名称,具体信息,请参见配置源服务参数列表。
创建并部署FC Sink Connector
创建并部署用于将数据从消息队列Kafka版同步至函数计算的FC Sink Connector。
发送测试消息
部署FC Sink Connector后,您可以向消息队列Kafka版的数据源Topic发送消息,测试数据能否被同步至函数计算。
- 在Connector 任务列表页面,找到目标Connector,在其右侧操作列,单击测试。
- 在发送消息面板,发送测试消息。
- 发送方式选择控制台。
- 在消息 Key文本框中输入消息的Key值,例如demo。
- 在消息内容文本框输入测试的消息内容,例如 {"key": "test"}。
- 设置发送到指定分区,选择是否指定分区。
- 单击是,在分区 ID文本框中输入分区的ID,例如0。如果您需查询分区的ID,请参见查看分区状态。
- 单击否,不指定分区。
- 发送方式选择Docker,执行运行 Docker 容器生产示例消息区域的Docker命令,发送消息。
- 发送方式选择SDK,根据您的业务需求,选择需要的语言或者框架的SDK以及接入方式,通过SDK发送消息。
- 发送方式选择控制台。
查看函数日志
向消息队列Kafka版的数据源Topic发送消息后,查看函数日志,验证是否收到消息。更多信息,请参见配置日志。
日志中显示发送的测试消息。
