Flink自定义触发器开发指南

Apache Flink的流处理中,窗口触发器(Window Trigger)是决定何时对窗口中的数据进行计算和输出结果的关键机制。Flink 提供了多种内置的触发器来满足常见的场景需求,但在复杂的业务逻辑下,使用 自定义触发器(Custom Trigger)可以实现更灵活、更贴近业务的行为。

窗口触发器概述

触发器的作用

  1. 监控进入窗口的数据元素。

  2. 根据时间,计数或其他业务条件判断是否应触发窗口。

  3. 返回以下操作之一:

    • CONTINUE:继续收集数据。

    • FIRE:触发窗口计算并保留数据;

    • FIRE_AND_PURGE:触发窗口计算并清除数据;

    • PURGE:仅清除数据而不触发计算。

内置触发器类型

Flink 提供了一些常用的内置触发器,适用于基本的时间或计数驱动场景:

触发器类型

说明

EventTimeTrigger

当事件时间水印超过窗口结束时间时触发,默认用于事件时间窗口。

ProcessingTimeTrigger

当处理时间到达窗口结束时间时触发,默认用于处理时间窗口。

CountTrigger

当窗口内元素数量达到指定阈值时触发。

PurgingTrigger

包装其他触发器,使其在触发时自动清除窗口内容。

重要

如果为某个窗口设置了某个触发器,它会完全替换默认触发器 ,而不是叠加。例如,如果为事件时间窗口设置CountTrigger,则不会再根据事件时间触发。

虽然内置触发器能满足一些基本需求,但在实际应用中,我们常常需要:

  • 组合多个触发条件 (如“5条记录或1分钟超时”);

  • 基于特定事件触发 (如用户登出、订单完成);

  • 控制窗口生命周期 (早发、延迟、多次触发等);

  • 避免默认行为带来的副作用 (如防止迟到数据再次触发窗口);

这些复杂需求无法通过简单的内置触发器实现,因此引入自定义触发器成为必要选择。

创建自定义触发器

  1. 继承抽象类Trigger<T, W>

    • T是窗口中数据的类型。

    • W是窗口类型(如TimeWindowTimeWindow子类)

    重写以下核心方法:

    public abstract class Trigger<T, W extends Window> implements Serializable {
        public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
        public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx);
        public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx);
        public void clear(W window, TriggerContext ctx);
        public boolean canMerge();
        public void onMerge(W window, OnMergeContext mergeContext);
    }
  2. 方法说明与操作

    方法

    作用

    操作

    onElement()

    每个新元素进入窗口时调用

    用于判断是否满足触发条件(如计数、特殊事件)。

    onProcessingTime()

    处理时间定时器触发时调用

    基于处理时间的逻辑(较少使用)。

    onEventTime()

    事件时间定时器触发时调用

    常用于窗口关闭或超时处理。

    clear()

    清除窗口状态

    必须在此方法中清理所有状态变量,防止内存泄漏。

    canMerge()/onMerge()

    合并窗口时调用(如会话窗口)

    如果使用合并窗口(如会话窗口),必须正确更新定时器。

  3. 使用TriggerContext管理状态与定时器

    • 状态管理:通过ctx.getPartitionedState(StateDescriptor)获取窗口相关的状态(如计数器);

    • 定时器管理 :通过ctx.registerEventTimeTimer(timestamp)注册定时器;

    • 状态清理 :在clear()中使用state.clear()清除状态;

    • 定时器删除 :通常无需手动删除定时器,Flink 会在窗口关闭时自动清理。

应用场景与代码示例

场景示例:在一个滚动事件时间窗口(如每小时)中,若某用户活动频繁,则在其进入窗口的第5条记录时立即触发一次窗口计算,同时仍保证在窗口结束时再触发一次最终结果。

代码示例:数据达到五条需要进行一次窗口计算,且只额外触发一次。

public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {

    // 用于记录每个 key 和 window 中已经接收的元素数量
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // 用于记录是否已经触发过计算
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);

    // 当有一个新元素进入窗口时调用
    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        // 获取当前 key + window 对应的状态值
        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        int count = countState.value() == null ? 0 : countState.value();
        boolean flag = flagState.value() == null ? false : flagState.value();

        // 每来一条数据,计数加一
        count += 1;
        countState.update(count); // 更新状态
        
        // 如果计数 = 5,立即触发窗口计算
        if (count >= 5 && !flag) {
            flagState.update(true); // 更新状态,保证只触发一次
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext triggerContext) throws Exception {
        // 处理时间定时器不处理 
        return TriggerResult.CONTINUE;
    }

    // 当注册的事件时间定时器被触发时调用(例如窗口结束时间)
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE_AND_PURGE; // 触发并清除窗口
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
        // 清理窗口状态
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}

触发器使用:必须配合 Windowed Stream 操作。

DataStream<UserEvent> source = ...; // 已有 source 流

source.keyBy(keySelector)  // 如 .keyBy(value -> value.userId) 按用户分组
      .window(TumblingEventTimeWindows.of(Time.seconds(60))) // 设置滚动窗口大小为60秒
      .trigger(new CustomCountTrigger()) // 自定义触发器:5条数据或超时触发
      .process(new ProcessWindowFunction<UserEvent, String, KeyedType, TimeWindow>() {

          @Override
          public void process(KeyedType key, Context context, Iterable<UserEvent> elements, Collector<String> out) {
              int count = 0;

              // 遍历窗口内所有元素,筛选 action == 1 的进行计数
              for (UserEvent event : elements) {
                  if (event.action == 1) {
                      count++;
                  }
              }

              // 输出结果
              out.collect("Key: " + key + ", 访问次数为:" + count);
          }
      })
      .print();

结果查验:如果一个用户在一分钟内有8次访问记录,输出的结果应该为2条。

Key: 101, 访问次数为:5       // count >= 5,提前触发计算,不清空状态。
Key: 101, 访问次数为:8      //  窗口关闭,触发计算,清空状态。

扩展延伸与总结

常见场景扩展

场景一

如果想多次告警,触发窗口内的计算和事件,应该如何处理?

  1. 去除flagStateDesc标志位,即可在多次触发计算和事件。

  2. 如果也可以添加计数标记来达到触发一定次数后再结束。(如告警事件)

    ### 去除标志位后,如有八条数据则会产生5条记录。  
    Key: 101, 访问次数为:5       // count >= 5,提前触发计算,不清空状态。
    Key: 101, 访问次数为:6       // 满足count >= 5,提前触发计算
    Key: 101, 访问次数为:7      // 满足count >= 5,提前触发计算
    Key: 101, 访问次数为:8      // 满足count >= 5,提前触发计算
    Key: 101, 访问次数为:8      //  窗口关闭,触发计算,清空状态。
  3. 触发器只决定何时触发计算,获取真正的对象信息的内容在.process当中,可以通过数量和条件状态等,输出不同的collect。

场景二

窗口的关闭是需要水平线推动的,如果在很长一段时间内没有数据产生,如何保证能按时触发关窗计算呢?

方案

是否依赖watermark

是否保证准时触发

是否适合乱序

操作可行性

ProcessingTime 窗口

若不要求事件时间

使用 withIdleness 方法

否(取决于watermark interval)

适合简单场景,如某分区数据源空闲

自定义 WatermarkGenerator

是(依赖定期更新)

标准做法

Trigger 注册定时器

是(可选)

是(容错)

否(强制关窗需要设置合适时间)

增强可靠性

外部维护心跳器

Kafka需额外维护,任务编排不需要

  • 方案一:改用 ProcessingTime 窗口

    如果不需要事件时间语义(比如不关心事件发生的时间),可以直接使用处理时间窗口:

    .keyBy(keySelector)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) // 使用处理时间
    .process(new MyProcessWindowFunction())
  • 方案二:使用 withIdleness 方法

    FlinkWatermarkStrategy 提供了一个便捷方法 withIdleness,用于检测数据源是否处于空闲状态,并在空闲超过指定时间后将其标记为空闲,从而避免其阻塞整体 Watermark 的生成。

    // 空闲的数据源将不再参与 Watermark 的最小值计算,从而不会阻碍其他活跃数据源的 Watermark 推进。
    WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));  //表示如果一个数据源或分区在 1 分钟内没有任何事件,则被标记为空闲
  • 方案三:自定义 WatermarkGenerator

    如果坚持使用事件时间,就需要确保即使没有新数据到达,也能继续推进水位线 。可以实现一个自定义生成Watermark的机制。

    • onEvent()方法中记录最近一次接收到事件的时间。

    • onPeriodicEmit()方法中检查当前时间与最后一次接收到事件的时间之间的间隔。

    • 如果间隔超过了设定的阈值,则认为该数据源已空闲,跳过生成 Watermark 或直接生成特定的Watermark。

    public class IdleAwareWatermarkGenerator implements WatermarkGenerator<MyEvent> {
        private long lastEventTimestamp = Long.MIN_VALUE;
        private final long maxIdleTimeMs; // 最大空闲时间
    
        public IdleAwareWatermarkGenerator(long maxIdleTimeMs) {
            this.maxIdleTimeMs = maxIdleTimeMs;
        }
    
        @Override
        public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
            lastEventTimestamp = Math.max(lastEventTimestamp, eventTimestamp);
        }
    
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            long currentTime = System.currentTimeMillis();
            if (lastEventTimestamp == Long.MIN_VALUE || currentTime - lastEventTimestamp > maxIdleTimeMs) {
                // 如果长时间没有事件,则不发出新的 Watermark
                return;
            }
            output.emitWatermark(new Watermark(lastEventTimestamp));
        }
    }
    WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
        .forGenerator((ctx) -> new IdleAwareWatermarkGenerator(60_000)) // 设置最大空闲时间为 60 秒
        .withTimestampAssigner((event, timestamp) -> event.getEventTime());
  • 方案三:强制注册定时器

    @Override
    public TriggerResult onElement(Event event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp()); // 基于事件时间
        ctx.registerProcessingTimeTimer(window.maxTimestamp() + 1000); // 容错:即使没事件也触发
        return TriggerResult.CONTINUE;
    }
    
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE; // 强制触发并清空
    }
  • 方案四:外部维护心跳器

    可以通过Kafka或者任务编排,定时向数据下游发送心跳数据,触发关窗操作。

核心要点总结

操作要点

要点说明

理解窗口生命周期

FIRE不会清除窗口内容,只有FIRE_AND_PURGE才会真正关闭窗口。合理使用可实现多次触发或最终一次性触发。

合理使用状态与定时器

使用ValueState来跟踪计数、标志位等信息,并在clear()中释放资源。

覆盖所有方法

至少实现onElement,onEventTime,onProcessingTime,clear四个方法,确保完整性。

支持窗口合并(如会话窗口)

对于会话窗口等合并型窗口,实现canMerge()onMerge()方法,保持定时器一致性。

避免重复触发问题

控制触发次数,尤其是在FIRE后可能继续接收数据的情况下。

完整代码示例

CustomCountTrigger

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {

    // 用于记录每个 key 和 window 中已经接收的元素数量
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // 用于记录是否已经触发过计算
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);

    // 当有一个新元素进入窗口时调用
    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // 获取当前 key + window 对应的状态值
        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        int count = countState.value() == null ? 0 : countState.value();
        boolean flag = flagState.value() == null ? false : flagState.value();

        // 每来一条数据,计数加一
        count += 1;
        countState.update(count); // 更新状态

        // 如果计数 = 5,立即触发窗口计算
        if (count >= 5 && !flag) {
            flagState.update(true); // 更新状态,保证只触发一次
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext triggerContext) throws Exception {
        // 时间定时器不处理
        return TriggerResult.CONTINUE;
    }

    // 当注册的事件时间定时器被触发时调用(例如窗口结束时间)
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE_AND_PURGE; // 触发并清除窗口
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
        // 清理窗口状态
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}

KafkaTriggerTest

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.ZoneId;
import java.time.format.DateTimeFormatter;


public class KafkaTriggerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("<BootstrapServers>")
                .setTopics("trigger")
                .setGroupId("trigger")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        // 数据样例:101,alie,1,2025-6-10 10:02:00
        DataStream<UserEvent> userEventStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(new MapFunction<String, UserEvent>() {
                    @Override
                    public UserEvent map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new UserEvent(
                                Integer.parseInt(fields[0]),
                                fields[1],
                                fields[2],
                                fields[3]
                        );
                    }
                });

        WatermarkStrategy<UserEvent> watermarkStrategy = WatermarkStrategy
                .<UserEvent>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(2))
                .withTimestampAssigner((event, timestamp) -> {
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                    return java.time.LocalDateTime.parse(event.getEvent_time(), formatter).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                });

        DataStream<UserEvent> timestampedStream = userEventStream.assignTimestampsAndWatermarks(watermarkStrategy);

        timestampedStream
                .keyBy(UserEvent::getUser_id)
                .window(TumblingEventTimeWindows.of(Time.seconds(60))) // 设置滚动窗口大小为60秒
                .trigger(new CustomCountTrigger()) // 自定义触发器:5条数据或超时触发
                .process(new ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>() {

                    @Override
                             public void process(Integer userId, ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>.Context context, Iterable<UserEvent> userEvents, Collector<String> collector) throws Exception {
                                 int count = 0;
                                 for (UserEvent event : userEvents) {
                                     if (event.getEvent_type().equals("1"))
                                         count++;
                                 }
                                 collector.collect("User ID: " + userId + " | Count: " + count + " | Window: " + context.window());
                             }
                         }).print();
        env.execute("Kafka Partitioner Data Stream");
    }
}

UserEvent

public class UserEvent {
      private  int  user_id;
      private  String username;
      private  String event_type;
      private  String event_time;

      public UserEvent(int user_id, String username, String event_type, String event_time) {
              this.user_id = user_id;
              this.username = username;
              this.event_type = event_type;
              this.event_time = event_time;
      }
      public String toString() {
              return "user_id:" + user_id + " username:" + username + " event_type:" + event_type + " event_time:" + event_time;
      }

    public int getUser_id() {
        return user_id;
    }

    public void setUser_id(int user_id) {
        this.user_id = user_id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getEvent_type() {
        return event_type;
    }

    public void setEvent_type(String event_type) {
        this.event_type = event_type;
    }

    public String getEvent_time() {
        return event_time;
    }

    public void setEvent_time(String event_time) {
        this.event_time = event_time;
    }
}