本文说明如何创建FC Sink Connector将数据从消息队列Kafka版导出至函数计算。

前提条件

在创建FC Sink Connector前,请确保您已完成以下操作:
  1. 根据网络类型购买并部署消息队列Kafka版实例。详情请参见VPC接入公网+VPC接入
  2. 消息队列Kafka版实例开启Connector。详情请参见开启Connector
  3. 在函数计算服务创建服务名称为guide-hello_world、函数名称为hello_world、运行环境为python的事件函数。创建完成后,将函数代码改成如下示例:
    # -*- coding: utf-8 -*-
    import logging
    
    # To enable the initializer feature
    # please implement the initializer function as below:
    # def initializer(context):
    #   logger = logging.getLogger()
    #   logger.info('initializing')
    
    def handler(event, context):
      logger = logging.getLogger()
      logger.info('hello world:' + bytes.decode(event))
      return 'hello world:' + bytes.decode(event)
    详情请参见使用控制台创建函数

操作流程

使用FC Sink Connector将数据从消息队列Kafka版的Topic导出至函数计算的操作流程如下:

  1. 授予消息队列Kafka版访问函数计算的权限
    1. 创建自定义权限策略
    2. 创建RAM角色
    3. 添加权限
  2. 创建FC Sink Connector所需的消息队列Kafka版资源
    1. 创建Consumer Group
    2. 创建Topic
  3. 创建FC Sink Connector

    创建FC Sink Connector

  4. 结果验证
    1. 发送消息
    2. 查看函数日志

创建自定义权限策略

创建访问函数计算的自定义权限策略。

  1. 登录访问控制控制台
  2. 在左侧导航栏,选择权限管理 > 权限策略管理
  3. 权限策略管理页面,单击创建权限策略
  4. 新建自定义权限策略页面,创建自定义权限策略。
    1. 策略名称文本框,输入KafkaConnectorFcAccess
    2. 配置模式区域,选择脚本配置
    3. 策略内容区域,输入自定义权限策略脚本。
      访问函数计算的自定义权限策略脚本示例如下:
      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "fc:InvokeFunction",
                      "fc:GetFunction"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              }
          ]
      }
    4. 单击确定

创建RAM角色

由于RAM角色不支持直接选择消息队列Kafka版作为受信服务,您在创建RAM角色时,需要选择任意支持的服务作为受信服务。RAM角色创建后,手工修改信任策略。

  1. 在左侧导航栏,单击RAM角色管理
  2. RAM角色管理页面,单击创建RAM角色
  3. 创建RAM角色向导页面,创建RAM角色。
    1. 当前可信实体类型区域,选择阿里云服务,单击下一步
    2. 角色类型区域,选择普通服务角色,在角色名称文本框,输入AliyunKafkaConnectorRole,从选择受信服务列表,选择函数计算,然后单击完成
  4. RAM角色管理页面,找到AliyunKafkaConnectorRole,单击AliyunKafkaConnectorRole
  5. AliyunKafkaConnectorRole页面,单击信任策略管理页签,单击修改信任策略
  6. 修改信任策略对话框,将脚本中fc替换为alikafka,单击确定
    AliyunKafkaConnectorRole

添加权限

为创建的RAM角色授予访问函数计算的权限。

  1. 在左侧导航栏,单击RAM角色管理
  2. RAM角色管理页面,找到AliyunKafkaConnectorRole,在其右侧操作列,单击添加权限
  3. 添加权限对话框,添加KafkaConnectorFcAccess权限。
    1. 选择权限区域,选择自定义权限策略
    2. 权限策略名称列表,找到KafkaConnectorFcAccess,单击KafkaConnectorFcAccess
    3. 单击确定
    4. 单击完成

创建Consumer Group

授予消息队列Kafka版的RAM角色访问函数计算的权限后,创建FC Sink Connector数据同步任务所需的Consumer Group。

  1. 登录消息队列Kafka版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击Consumer Group管理
  4. Consumer Group管理页面,选择实例,单击创建Consumer Group
  5. 创建Consumer Group对话框,创建任务消费组。
    1. Consumer Group文本框,输入connect-kafka-fc-sink
    2. 备注文本框,输入connect-kafka-fc-sink
    3. 单击创建
    cg
  6. 重复步骤4和步骤5,创建Connector消费组。
    表 1. Consumer Group参数说明
    Consumer Group类型 参数 描述 示例值
    Connector消费组 Consumer Group Consumer Group的名称。建议以connect-cluster开头。 connect-cluster-kafka-fc-sink
    Consumer Group管理页面显示创建的Consumer Group。connect-cluster-kafka-fc-sink-result

创建Topic

创建FC Sink Connector数据同步任务所需的Topic。

  1. 在左侧导航栏,单击Topic管理
  2. Topic管理页面,单击创建Topic
  3. 创建Topic对话框,创建数据源Topic。
    1. Topic文本框,输入fc-test-input
    2. 备注文本框,输入fc-test-input
    3. 单击创建
    input topic
  4. 重复步骤2和步骤3,创建以下Topic:
    Topic类型 参数 描述 示例值
    任务位点Topic Topic Topic的名称。建议以connect-offset开头。 connect-offset-fc-sink
    分区数 Topic的分区数量。分区数量必须大于1。 12
    存储引擎 Topic的存储引擎。存储引擎必须为Local存储。 Local存储
    cleanup.policy Topic的日志清理策略。日志清理策略必须为compact。 compact
    任务配置Topic Topic Topic的名称。建议以connect-config开头。 connect-config-fc-sink
    分区数 Topic的分区数量。分区数量必须为1。 1
    存储引擎 Topic的存储引擎。存储引擎必须为Local存储。 Local存储
    cleanup.policy Topic的日志清理策略。日志清理策略必须为compact。 compact
    任务状态Topic Topic Topic的名称。建议以connect-status开头。 connect-status-fc-sink
    分区数 Topic的分区数量。分区数量建议为6。 6
    存储引擎 Topic的存储引擎。存储引擎必须为Local存储。 Local存储
    cleanup.policy Topic的日志清理策略。日志清理策略必须为compact。 compact
    死信队列Topic Topic Topic的名称。 fc_dead_letter_error
    分区数 Topic的分区数量。分区数量建议为6。 6
    存储引擎 Topic的存储引擎。存储引擎可以为Local存储或云存储。 云存储
    异常数据Topic Topic Topic的名称。 fc_runtime_error
    分区数 Topic的分区数量。分区数量建议为6。 6
    存储引擎 Topic的存储引擎。存储引擎可以为Local存储或云存储。 云存储
    说明 用于存储异常数据的死信队列Topic和异常数据Topic可以使用同一个Topic。
    Topic管理页面显示创建的Topic。fc_topic_result

创建FC Sink Connector

创建FC Sink Connector数据同步任务所需的Consumer Group和Topic后,创建FC Sink Connector将数据从消息队列Kafka版的Topic导出至函数计算。

  1. 在左侧导航栏,单击Connector
  2. Connector页面,单击创建Connector
  3. 创建Connector页面,填写Connector信息,然后单击预检查并创建
    信息类型 参数 描述 示例值
    任务信息 任务名称 数据同步任务的名称。实例内保持唯一。Connector的同步任务会使用名称为connect-任务名称的Consumer Group,因此您需要在创建Connector前,创建名称为connect-任务名称的Consumer Group的格式。 kafka-fc-sink
    通用信息 任务类型 数据同步任务的类型。消息队列Kafka版支持的任务类型,请参见Connector类型 KAFKA2FC
    用户VPC 数据同步任务所在的VPC。默认为消息队列Kafka版实例所在的VPC,您无需填写。 vpc-bp1xpdnd3l***
    用户交换机 数据同步任务所在的交换机。用户交换机必须与消息队列Kafka版实例处于同一VPC。默认为部署消息队列Kafka版实例时填写的交换机。 vsw-bp1d2jgg81***
    源实例信息 数据源Topic 需要同步数据的Topic。 fc-test-input
    消费初始位置 开始消费的位置。取值:
    • latest:从最新位点开始消费。
    • earliest:从最初位点开始消费。
    latest
    Connector消费组 用于同步数据的Consumer Group。 connect-cluster-kafka-fc-sink
    任务位点Topic 用于存储消费位点的Topic。 connect-offset-fc-sink
    任务配置Topic 用于存储任务配置的Topic。 connect-config-fc-sink
    任务状态Topic 用于存储任务状态的Topic。 connect-status-fc-sink
    死信队列Topic 用于存储Connect框架的异常数据的Topic。 fc_dead_letter_error
    异常数据Topic 用于存储Sink的异常数据的Topic。 fc_runtime_error
    目标实例信息 服务地域 函数计算服务的地域。 cn-hangzhou
    服务接入点 函数计算服务的接入点。在函数计算控制台的概览页的常用信息区域获取。
    • 内网Endpoint:低延迟,推荐。适用于消息队列Kafka版实例和函数计算处于同一地域场景。
    • 公网Endpoint:高延迟,不推荐。适用于消息队列Kafka版实例和函数计算处于不同地域的场景。如需使用公网Endpoint,您需要为Connector开启公网访问。详情请参见为Connector开启公网访问
    http://188***.cn-hangzhou.fc.aliyuncs.com
    服务账号 函数计算服务的主账号ID。在函数计算控制台的概览页的常用信息区域获取。 188***
    授权角色名 消息队列Kafka版的RAM角色的名称。详情请参见创建RAM角色 AliyunKafkaConnectorRole
    服务名 函数计算服务的名称。 guide-hello_world
    服务方法名 函数计算服务的函数名称。 hello_world
    服务版本 函数计算服务的版本。 LATEST
    发送模式 数据发送模式。取值:
    • 异步:推荐。
    • 同步:不推荐。同步发送模式下,如果函数计算的处理时间较长,消息队列Kafka版的处理时间也会较长。当同一批次数据的处理时间超过5分钟时,会触发消息队列Kafka版客户端Rebalance。
    异步
    发送批次大小 批量发送消息的大小。默认为20。同步任务会根据该值结合同步异步请求的大小的限制(同步请求6 M,异步128 K)将数据合并发送,如果单条数据大小超过请求大小上限,数据内容将不会包含在请求中,您可以通过offset主动拉取消息队列Kafka版数据。 50
    Connector页面显示创建的FC Sink Connector。

发送消息

创建FC Sink Connector将消息队列Kafka版与函数计算连接后,向消息队列Kafka版的数据源Topic发送测试消息。

  1. 在左侧导航栏,单击Topic管理
  2. Topic管理页面,找到fc-test-input,在其右侧操作列,单击发送消息
  3. 发送消息对话框,发送测试消息。
    1. 分区文本框,输入0
    2. Message Key文本框,输入1
    3. Message Value文本框,输入1
    4. 单击发送

查看函数日志

消息队列Kafka版的数据源Topic发送消息后,查看函数日志,验证是否收到消息。详情请参见配置并查看函数日志

日志中显示发送的测试消息。

fc LOG

后续步骤