实时业务场景很复杂,乱序延迟通常不固定(比如高峰期延迟30秒,低峰期延迟5秒),可以实现WatermarkGenerator接口,自定义水印生成逻辑,比如基于周期生成、基于数据条数生成,灵活适配特殊场景。

一、核心思路(适配动态乱序延迟场景)

针对 “高峰期延迟 30 秒、低峰期延迟 5 秒” 的动态乱序场景,自定义水印生成器的核心逻辑:

  1. 区分业务时段:通过系统时间 / 数据量判断当前是高峰 / 低峰期;

  2. 动态调整延迟:高峰期用 30 秒延迟,低峰期用 5 秒延迟;

  3. 周期性生成水印:按 Flink 默认的 200ms 周期,基于当前最大事件时间 - 动态延迟生成水印;

  4. 兼容空闲流:可选搭配withIdleness避免水印卡住。

二、完整可运行代码(动态延迟水印生成器)

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 自定义水印生成器:动态调整乱序延迟(高峰30秒/低峰5秒)
 */
public class DynamicWatermarkGeneratorDemo {

    // 业务数据类(订单示例)
    public static class OrderEvent {
        private String orderId;
        private Long eventTime; // 事件时间(毫秒)
        private Double amount;

        // 构造器
        public OrderEvent(String orderId, Long eventTime, Double amount) {
            this.orderId = orderId;
            this.eventTime = eventTime;
            this.amount = amount;
        }

        // getter
        public Long getEventTime() { return eventTime; }
    }

    /**
     * 自定义水印生成器:动态调整乱序延迟
     */
    public static class DynamicWatermarkGenerator implements WatermarkGenerator<OrderEvent> {
        // 记录当前数据流的最大事件时间
        private long maxEventTime = Long.MIN_VALUE;
        // 高峰时段(比如9:00-12:00)
        private final int PEAK_START_HOUR = 9;
        private final int PEAK_END_HOUR = 12;
        // 延迟配置(毫秒)
        private final long PEAK_DELAY = 30 * 1000; // 高峰期30秒
        private final long NORMAL_DELAY = 5 * 1000; // 低峰期5秒

        /**
         * 每来一条数据触发:更新最大事件时间
         * @param event 业务数据
         * @param eventTimestamp 数据的事件时间(已通过TimestampAssigner提取)
         * @param output 水印输出器
         */
        @Override
        public void onEvent(OrderEvent event, long eventTimestamp, WatermarkOutput output) {
            // 更新最大事件时间(保证单调递增)
            maxEventTime = Math.max(maxEventTime, eventTimestamp);
        }

        /**
         * 周期性触发(默认200ms):生成水印
         * @param output 水印输出器
         */
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 1. 判断当前时段(高峰/低峰)
            long currentDelay = getCurrentDelay();
            // 2. 计算水印:最大事件时间 - 动态延迟
            long watermarkTs = maxEventTime - currentDelay;
            // 3. 发射水印(水印时间戳必须单调递增)
            output.emitWatermark(new Watermark(watermarkTs));

            // 调试日志(可选)
            System.out.printf("当前时间:%s,最大事件时间:%d,动态延迟:%dms,水印时间:%d%n",
                    LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME),
                    maxEventTime, currentDelay, watermarkTs);
        }

        /**
         * 核心逻辑:根据当前时段动态返回延迟时间
         */
        private long getCurrentDelay() {
            int currentHour = LocalDateTime.now().getHour();
            // 判断是否在高峰时段
            if (currentHour >= PEAK_START_HOUR && currentHour < PEAK_END_HOUR) {
                return PEAK_DELAY;
            } else {
                return NORMAL_DELAY;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置水印生成周期(默认200ms,可自定义)
        env.getConfig().setAutoWatermarkInterval(200);

        // 2. 模拟数据源(生成高峰/低峰订单数据)
        DataStream<OrderEvent> orderStream = env.addSource(new SourceFunction<OrderEvent>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<OrderEvent> ctx) throws Exception {
                long baseTs = System.currentTimeMillis();
                int count = 0;
                while (isRunning) {
                    // 生成订单数据(事件时间模拟乱序:±随机延迟)
                    long eventTime = baseTs - (long) (Math.random() * 40 * 1000); // 最大乱序40秒
                    OrderEvent event = new OrderEvent("ORDER_" + count, eventTime, 100.0);
                    ctx.collect(event);
                    count++;
                    Thread.sleep(100); // 每秒生成10条数据
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 3. 定义水印策略(自定义生成器 + 空闲流处理)
        WatermarkStrategy<OrderEvent> dynamicWatermarkStrategy = WatermarkStrategy
                // 绑定自定义水印生成器
                .<OrderEvent>forGenerator(context -> new DynamicWatermarkGenerator())
                // 配置空闲流超时(10秒无数据则标记为空闲)
                .withIdleness(Duration.ofSeconds(10))
                // 指定事件时间字段(从业务数据提取)
                .withTimestampAssigner((event, timestamp) -> event.getEventTime());

        // 4. 应用水印策略
        DataStream<OrderEvent> streamWithWatermark = orderStream.assignTimestampsAndWatermarks(dynamicWatermarkStrategy);

        // 5. 窗口计算(验证水印效果)
        streamWithWatermark
                .keyBy(event -> event.getOrderId().substring(0, 5)) // 按前缀分组
                .window(TumblingEventTimeWindows.of(Duration.ofMinutes(1))) // 1分钟滚动窗口
                .sum("amount") // 统计订单金额
                .print("窗口结果");

        // 6. 执行任务
        env.execute("Dynamic Watermark Generator Demo");
    }
}

三、核心代码解释

1. 自定义水印生成器(DynamicWatermarkGenerator

  • onEvent 方法:每接收一条数据,更新maxEventTime(保证最大事件时间单调递增,核心前提);

  • onPeriodicEmit 方法:Flink 默认每 200ms 调用一次,核心逻辑:

    • getCurrentDelay():判断当前时段(9-12 点为高峰),返回 30 秒 / 5 秒延迟;

    • 水印计算:maxEventTime - 动态延迟,保证水印能覆盖对应时段的乱序数据;

    • emitWatermark:发射水印(水印时间戳必须单调递增,否则 Flink 会忽略)。

2. 关键配置

  • setAutoWatermarkInterval(200):设置水印生成周期(默认 200ms,可根据业务调整);

  • withIdleness(Duration.ofSeconds(10)):解决多并行度下空闲流卡住水印的问题;

  • withTimestampAssigner:提取业务数据的eventTime作为事件时间(忽略 Flink 默认的timestamp)。

3. 动态延迟的扩展思路

如果你的 “高峰 / 低峰” 判断逻辑更复杂(比如基于数据量、QPS),可修改getCurrentDelay()方法:

// 示例:基于数据量判断高峰(每秒超过100条为高峰)
private long getCurrentDelay() {
    long currentQps = calculateCurrentQps(); // 自定义方法:计算当前QPS
    return currentQps > 100 ? PEAK_DELAY : NORMAL_DELAY;
}

四、运行验证

  1. 环境依赖:Flink 1.12+(WatermarkStrategy是 1.12 + 的推荐 API);

  2. 日志观察:控制台会打印水印生成日志,可看到:

    • 9-12 点:水印延迟 30 秒(动态延迟:30000ms);

    • 其他时段:水印延迟 5 秒(动态延迟:5000ms);

  3. 窗口效果:高峰期能容纳 30 秒内的乱序数据,低峰期仅容纳 5 秒,兼顾准确性和实时性。

五、生产环境注意事项

  1. 单调递增保障maxEventTime必须保证单调递增(否则水印会倒退,Flink 直接忽略);

  2. 延迟阈值兜底:建议设置最大延迟上限(比如即使高峰,也不超过 60 秒),避免水印过度滞后;

  3. 监控告警:对水印滞后时间做监控(比如水印比最大事件时间滞后超过 1 分钟则告警);

  4. 结合侧输出流:配置sideOutputLateData,捕获超延迟的极端数据,避免丢失。

总结

  1. 自定义水印生成器的核心是实现WatermarkGenerator接口,重写onEvent(更新最大事件时间)和onPeriodicEmit(生成水印);

  2. 动态延迟的关键是在onPeriodicEmit中根据业务规则(时段 / 数据量)调整延迟值;

  3. 生产环境需搭配withIdleness和侧输出流,保证水印推进和数据不丢失。