本文说明如何创建FC Sink Connector将数据从消息队列Kafka版实例的数据源Topic导出至函数计算的函数。

前提条件

在创建FC Sink Connector前,请确保您已完成以下操作:
  1. 消息队列Kafka版实例开启Connector。更多信息,请参见开启Connector
  2. 消息队列Kafka版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic

    本文以名称为fc-test-input的Topic为例。

  3. 在函数计算创建函数。更多信息,请参见使用控制台创建函数

    本文以服务名称为guide-hello_world、函数名称为hello_world、运行环境为Python的事件函数为例。该示例函数的代码如下:

    # -*- coding: utf-8 -*-
    import logging
    
    # To enable the initializer feature
    # please implement the initializer function as below:
    # def initializer(context):
    #   logger = logging.getLogger()
    #   logger.info('initializing')
    
    def handler(event, context):
      logger = logging.getLogger()
      logger.info('hello world:' + bytes.decode(event))
      return 'hello world:' + bytes.decode(event)

操作流程

使用FC Sink Connector将数据从消息队列Kafka版实例的数据源Topic导出至函数计算的函数的操作流程如下:

  1. 授予消息队列Kafka版访问函数计算的权限
    1. 创建自定义权限策略
    2. 创建RAM角色
    3. 添加权限
  2. 创建FC Sink Connector依赖的Topic和Consumer Group

    如果您不需要自定义Topic和Consumer Group的名称,您可以直接跳过该步骤,在下一步骤选择自动创建。

    注意 部分FC Sink Connector依赖的Topic的存储引擎必须为Local存储,大版本为0.10.2的消息队列Kafka版实例不支持手动创建Local存储的Topic,只支持自动创建。
    1. 可选:创建FC Sink Connector依赖的Topic
    2. 可选:创建FC Sink Connector依赖的Consumer Group
  3. 创建并部署FC Sink Connector
    1. 创建FC Sink Connector
    2. 部署FC Sink Connector
  4. 结果验证
    1. 发送消息
    2. 查看函数日志

创建自定义权限策略

创建访问函数计算的自定义权限策略。

  1. 登录访问控制控制台
  2. 在左侧导航栏,选择权限管理 > 权限策略管理
  3. 权限策略管理页面,单击创建权限策略
  4. 新建自定义权限策略页面,创建自定义权限策略。
    1. 策略名称文本框,输入KafkaConnectorFcAccess
    2. 配置模式区域,选择脚本配置
    3. 策略内容区域,输入自定义权限策略脚本。
      访问函数计算的自定义权限策略脚本示例如下:
      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "fc:InvokeFunction",
                      "fc:GetFunction"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              }
          ]
      }
    4. 单击确定

创建RAM角色

由于RAM角色不支持直接选择消息队列Kafka版作为受信服务,您在创建RAM角色时,需要选择任意支持的服务作为受信服务。RAM角色创建后,手工修改信任策略。

  1. 在左侧导航栏,单击RAM角色管理
  2. RAM角色管理页面,单击创建RAM角色
  3. 创建RAM角色面板,创建RAM角色。
    1. 当前可信实体类型区域,选择阿里云服务,单击下一步
    2. 角色类型区域,选择普通服务角色,在角色名称文本框,输入AliyunKafkaConnectorRole,从选择受信服务列表,选择函数计算,然后单击完成
  4. RAM角色管理页面,找到AliyunKafkaConnectorRole,单击AliyunKafkaConnectorRole
  5. AliyunKafkaConnectorRole页面,单击信任策略管理页签,单击修改信任策略
  6. 修改信任策略对话框,将脚本中fc替换为alikafka,单击确定
    AliyunKafkaConnectorRole

添加权限

为创建的RAM角色授予访问函数计算的权限。

  1. 在左侧导航栏,单击RAM角色管理
  2. RAM角色管理页面,找到AliyunKafkaConnectorRole,在其右侧操作列,单击添加权限
  3. 添加权限对话框,添加KafkaConnectorFcAccess权限。
    1. 选择权限面板,选择自定义权限策略
    2. 权限策略名称列表,找到KafkaConnectorFcAccess,单击KafkaConnectorFcAccess
    3. 单击确定
    4. 单击完成

创建FC Sink Connector依赖的Topic

您可以在消息队列Kafka版控制台手动创建FC Sink Connector依赖的5个Topic。

  1. 登录消息队列Kafka版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击Topic管理
  4. Topic管理页面,选择实例,单击创建Topic
  5. 创建Topic对话框,设置Topic属性,然后单击创建
    Topic 描述
    任务位点Topic 用于存储消费位点的Topic。
    • Topic名称:建议以connect-offset开头。
    • 分区数:Topic的分区数量必须大于1。
    • 存储引擎:Topic的存储引擎必须为Local存储。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    任务配置Topic 用于存储任务配置的Topic。
    • Topic名称:建议以connect-config开头。
    • 分区数:Topic的分区数量必须为1。
    • 存储引擎:Topic的存储引擎必须为Local存储。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    任务状态Topic 用于存储任务状态的Topic。
    • Topic名称:建议以connect-status开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎必须为Local存储。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    死信队列Topic 用于存储Connect框架的异常数据的Topic。该Topic可以和异常数据Topic为同一个Topic,以节省Topic资源。
    • Topic名称:建议以connect-error开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
    异常数据Topic 用于存储Sink的异常数据的Topic。该Topic可以和死信队列Topic为同一个Topic,以节省Topic资源。
    • Topic名称:建议以connect-error开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎可以为Local存储或云存储。

创建FC Sink Connector依赖的Consumer Group

您可以在消息队列Kafka版控制台手动创建FC Sink Connector依赖的2个Consumer Group。

  1. 在左侧导航栏,单击Consumer Group管理
  2. Consumer Group管理页面,选择实例,单击创建Consumer Group
  3. 创建Consumer Group对话框,设置Topic属性,然后单击创建
    Consumer Group 描述
    Connector任务消费组 Connector的数据同步任务使用的Consumer Group。该Consumer Group的名称必须为connect-任务名称
    Connector消费组 Connector使用的Consumer Group。该Consumer Group的名称建议以connect-cluster开头。

创建FC Sink Connector

授予消息队列Kafka版访问函数计算的权限后,创建用于将数据从消息队列Kafka版同步至函数计算的FC Sink Connector。

  1. 登录消息队列Kafka版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击Connector
  4. Connector页面,选择实例,单击创建Connector
  5. 创建Connector面板,完成以下操作。
    1. 基础信息下方的Connector名称文本框,输入Connector名称,从转储路径列表,选择消息队列Kafka版,从转储到列表,选择函数计算,然后单击下一步
      参数 描述 示例值
      Connector名称 Connector的名称。取值:
      • 可以包含数字、小写英文字母和短划线(-),但不能以短划线(-)开头,长度限制为48个字符。
      • 同一个消息队列Kafka版实例内保持唯一。

      Connector的数据同步任务必须使用名称为connect-任务名称的Consumer Group。如果您未手动创建该Consumer Group,系统将为您自动创建。

      kafka-fc-sink
      任务类型 Connector的数据同步任务类型。本文以数据从消息队列Kafka版同步至函数计算为例。更多任务类型,请参见Connector类型 KAFKA2FC
    2. 源实例配置下方的数据源Topic文本框,输入数据源Topic的名称,从消费初始位置列表,选择消费初始位置,在创建资源,选择自动创建或者选择手动创建并输入手动创建的Topic的名称,然后单击下一步
      参数 描述 示例值
      VPC ID 数据同步任务所在的VPC。默认为消息队列Kafka版实例所在的VPC,您无需填写。 vpc-bp1xpdnd3l***
      VSwitch ID 数据同步任务所在的交换机。用户交换机必须与消息队列Kafka版实例处于同一VPC。默认为部署消息队列Kafka版实例时填写的交换机。 vsw-bp1d2jgg81***
      数据源Topic 需要同步数据的Topic。 fc-test-input
      消费初始位置 开始消费的位置。取值:
      • latest:从最新位点开始消费。
      • earliest:从最初位点开始消费。
      latest
      Connector消费组 Connector使用的Consumer Group。该Consumer Group的名称建议以connect-cluster开头。 connect-cluster-kafka-fc-sink
      任务位点Topic 用于存储消费位点的Topic。
      • Topic名称:建议以connect-offset开头。
      • 分区数:Topic的分区数量必须大于1。
      • 存储引擎:Topic的存储引擎必须为Local存储。
      • cleanup.policy:Topic的日志清理策略必须为compact。
      connect-offset-kafka-fc-sink
      任务配置Topic 用于存储任务配置的Topic。
      • Topic名称:建议以connect-config开头。
      • 分区数:Topic的分区数量必须为1。
      • 存储引擎:Topic的存储引擎必须为Local存储。
      • cleanup.policy:Topic的日志清理策略必须为compact。
      connect-config-kafka-fc-sink
      任务状态Topic 用于存储任务状态的Topic。
      • Topic名称:建议以connect-status开头。
      • 分区数:Topic的分区数量建议为6。
      • 存储引擎:Topic的存储引擎必须为Local存储。
      • cleanup.policy:Topic的日志清理策略必须为compact。
      connect-status-kafka-fc-sink
      死信队列Topic 用于存储Connect框架的异常数据的Topic。该Topic可以和异常数据Topic为同一个Topic,以节省Topic资源。
      • Topic名称:建议以connect-error开头。
      • 分区数:Topic的分区数量建议为6。
      • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
      connect-error-kafka-fc-sink
      异常数据Topic 用于存储Sink的异常数据的Topic。该Topic可以和死信队列Topic为同一个Topic,以节省Topic资源。
      • Topic名称:建议以connect-error开头。
      • 分区数:Topic的分区数量建议为6。
      • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
      connect-error-kafka-fc-sink
    3. 目标实例配置下方,输入函数计算服务的属性,设置发送模式和发送批大小,然后单击下一步
      参数 描述 示例值
      服务地域 函数计算服务的地域。 cn-hangzhou
      服务接入点 函数计算服务的接入点。在函数计算控制台的概览页的常用信息区域获取。
      • 内网Endpoint:低延迟,推荐。适用于消息队列Kafka版实例和函数计算处于同一地域场景。
      • 公网Endpoint:高延迟,不推荐。适用于消息队列Kafka版实例和函数计算处于不同地域的场景。如需使用公网Endpoint,您需要为Connector开启公网访问。更多信息,请参见为Connector开启公网访问
      http://188***.cn-hangzhou.fc.aliyuncs.com
      服务账号 函数计算服务的阿里云账号ID。在函数计算控制台的概览页的常用信息区域获取。 188***
      授权角色名 消息队列Kafka版的RAM角色的名称。更多信息,请参见创建RAM角色 AliyunKafkaConnectorRole
      服务名 函数计算服务的名称。 guide-hello_world
      服务方法名 函数计算服务的函数名称。 hello_world
      服务版本 函数计算服务的版本。 LATEST
      发送模式 消息发送模式。取值:
      • 异步:推荐。
      • 同步:不推荐。同步发送模式下,如果函数计算的处理时间较长,消息队列Kafka版的处理时间也会较长。当同一批次消息的处理时间超过5分钟时,会触发消息队列Kafka版客户端Rebalance。
      异步
      发送批次大小 批量发送的消息条数。默认为20。Connector根据发送批次大小和请求大小限制(同步请求大小限制为6 MB,异步请求大小限制为128 KB)将多条消息聚合后发送。例如,发送模式为异步,发送批次大小为20,如果要发送18条消息,其中有17条消息的总大小为127 KB,有1条消息的大小为200 KB,Connector会将总大小不超过128 KB的17条消息聚合后发送,将大小超过128 KB的1条消息单独发送。
      说明 如果您在发送消息时将key设置为null,则请求中不包含key。如果将value设置为null,则请求中不包含value。
      • 如果批量发送的多条消息的大小不超过请求大小限制,则请求中包含消息内容。请求示例如下:
        [
            {
                "key":"this is the message's key2",
                "offset":8,
                "overflowFlag":false,
                "partition":4,
                "timestamp":1603785325438,
                "topic":"Test",
                "value":"this is the message's value2",
                "valueSize":28
            },
            {
                "key":"this is the message's key9",
                "offset":9,
                "overflowFlag":false,
                "partition":4,
                "timestamp":1603785325440,
                "topic":"Test",
                "value":"this is the message's value9",
                "valueSize":28
            },
            {
                "key":"this is the message's key12",
                "offset":10,
                "overflowFlag":false,
                "partition":4,
                "timestamp":1603785325442,
                "topic":"Test",
                "value":"this is the message's value12",
                "valueSize":29
            },
            {
                "key":"this is the message's key38",
                "offset":11,
                "overflowFlag":false,
                "partition":4,
                "timestamp":1603785325464,
                "topic":"Test",
                "value":"this is the message's value38",
                "valueSize":29
            }
        ]
      • 如果发送的单条消息的大小超过请求大小限制,则请求中不包含消息内容。请求示例如下:
        [
            {
                "key":"123",
                "offset":4,
                "overflowFlag":true,
                "partition":0,
                "timestamp":1603779578478,
                "topic":"Test",
                "value":"1",
                "valueSize":272687
            }
        ]
        说明 如需获取消息内容,您需要根据位点主动拉取消息。
      50
      重试次数 消息发送失败后的重试次数。默认为2。取值范围为1~3。部分导致消息发送失败的错误不支持重试。错误的错误码与是否支持重试的对应关系如下:
      • 4XX:除429支持重试外,其余错误码不支持重试。
      • 5XX:支持重试。
      说明
      • 关于错误码的更多信息,请参见错误码
      • Connector调用InvokeFunction向函数计算发送消息。关于InvokeFunction的更多信息,请参见API 定义
      2
      失败处理 消息发送失败后的错误处理。默认为log。取值:
      • log:继续对出现错误的Topic的分区的订阅,并打印错误日志。出现错误后,您可以通过Connector日志查看错误,并根据错误的错误码查找解决方案,以进行自助排查。
        说明
      • fail:停止对出现错误的Topic的分区的订阅,并打印错误日志。出现错误后,您可以通过Connector日志查看错误,并根据错误的错误码查找解决方案,以进行自助排查。
        说明
        • 如何查看Connector日志,请参见查看Connector日志
        • 如何根据错误码查找解决方案,请参见错误码
        • 如需恢复对出现错误的Topic的分区的订阅,您需要提交提交工单联系消息队列Kafka版技术人员。
      log
  6. 预览/创建下方,确认Connector的配置,然后单击提交
    提交完成后,刷新Connector页面以显示创建的Connector。

部署FC Sink Connector

创建FC Sink Connector后,您需要部署该FC Sink Connector以使其将数据从消息队列Kafka版同步至函数计算。

Connector页面,找到创建的FC Sink Connector,在其右侧操作列,单击部署
部署完成后,Connector状态显示运行中。

发送消息

部署FC Sink Connector后,您可以向消息队列Kafka版的数据源Topic发送消息,测试数据能否被同步至函数计算。

  1. 在左侧导航栏,单击Topic管理
  2. Topic管理页面,选择实例,找到fc-test-input,在其右侧操作列,单击发送消息
  3. 发送消息对话框,发送测试消息。
    1. 分区文本框,输入0
    2. Message Key文本框,输入1
    3. Message Value文本框,输入1
    4. 单击发送

查看函数日志

消息队列Kafka版的数据源Topic发送消息后,查看函数日志,验证是否收到消息。更多信息,请参见配置并查看函数日志

日志中显示发送的测试消息。

fc LOG