本文介绍如何创建FC Sink Connector,您可以通过FC Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至函数计算的函数。

前提条件

详细步骤,请参见创建前提

注意事项

  • 目标函数类型必须为事件函数,不能选择HTTP函数。
  • 同一地域的Kafka消息投递到函数计算通过内网进行数据传输,不会产生公网流量费用。

步骤一:创建目标服务资源

在函数计算创建函数。更多信息,请参见通过控制台创建函数应用

本文以服务名称为guide-fc-sink-service、函数名称为sink-fc、运行环境为Python的事件函数为例。该示例函数的代码如下:

# -*- coding: utf-8 -*-
import logging
import json

def handler(event, context):
  logger = logging.getLogger()
  evt = json.loads(event)
  logger.info(evt)
  return "success"

步骤二:创建FC Sink Connector并启动

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
  2. 在左侧导航栏,选择Connector生态集成 > 消息流出(Sink)
  3. 消息流出(Sink)页面,单击创建任务
  4. 消息流出创建面板,配置以下参数,单击确定
    1. 基础信息区域,设置任务名称,将流出类型选择为函数计算FC
    2. 资源配置区域,设置以下参数。
      表 1. 源(云消息队列 Kafka 版
      参数说明示例
      地域源Kafka实例所在的地域。华东1(杭州)
      kafka实例数据源所在的Kafka实例ID。alikafka_post-cn-9hdsbdhd****
      Topic数据源所在的Kafka实例Topic。guide-sink-topic
      Group ID数据源所在的Kafka实例中的Group ID。
      • 快速创建:自动创建以GID_EVENTBRIDGE_xxx命名的Group ID。
      • 使用已有:选择已创建的Group,请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发。
      使用已有
      并发配额(消费者数)消费Topic数据的并发线程数,线程和Topic分区的对应关系如下:
      • Topic分区数=并发消费数:一个线程消费一个Topic分区。建议使用。
      • Topic分区数>并发消费数:多个并发消费会均摊所有分区消费。
      • Topic分区数<并发消费数:一个线程消费一个Topic分区,多出的消费数无效。
      2
      消费位点
      • 最新位点:从最新位点开始消费。
      • 最早位点:从最初位点开始消费。
      最新位点
      网络配置有跨境传输数据需求时选择自建公网,其他情况可选择默认网络默认网络
      表 2. 目标(函数计算)
      参数说明示例
      服务已创建的函数计算的服务名。guide-fc-sink-service
      函数已创建的函数计算中的函数。sink-fc
      执行方式
      • 异步:以异步方式请求函数计算,Payload限制为128 KB。
      • 同步:以同步方式请求函数计算,Payload限制为32 MB,同步请求超时时间为5 min。
      同步
      批量推送批量推送可帮您批量聚合多个事件,当批量推送条数批量推送间隔两个条件达到任意一个时即会触发批量推送。例如:您设置的推送条数为100条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等待15 s后再推送。
      • 开启:开启批量推送功能。
      • 关闭:关闭批量推动功能。默认每次仅投递给FC一条消息。
      开启
      批量推送条数一次调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求。取值范围为[1,10000]。仅当批量推送参数设置为开启时需要配置此参数。100
      批量推送间隔调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算。取值范围为[0,15],单位:秒。取值为0表示没有等待时间,直接投递。仅当批量推送参数设置为开启时需要配置此参数。10
    完成上述配置后,在消息流出(Sink)页面,找到刚创建的FC Sink Connector任务,单击其右侧操作列的启动。当状态栏由启动中变为运行中时,Connector创建成功。

步骤三:测试FC Sink Connector

  1. 消息流出(Sink)页面,在FC Sink Connector任务的事件源列单击源Topic。
    发送消息
  2. 在Topic详情页面,单击体验发送消息
  3. 快速体验消息收发面板,按照下图配置消息内容,然后单击确定
    发送消息
  4. 消息流出(Sink)页面,在FC Sink Connector任务的事件目标列单击目标函数。
    源
  5. 单击调用日志页签,然后单击调用请求列表页签,查看调用日志详细信息。
    日志