实时业务场景很复杂,乱序延迟通常不固定(比如高峰期延迟30秒,低峰期延迟5秒),可以实现WatermarkGenerator接口,自定义水印生成逻辑,比如基于周期生成、基于数据条数生成,灵活适配特殊场景。
一、核心思路(适配动态乱序延迟场景)
针对 “高峰期延迟 30 秒、低峰期延迟 5 秒” 的动态乱序场景,自定义水印生成器的核心逻辑:
区分业务时段:通过系统时间 / 数据量判断当前是高峰 / 低峰期;
动态调整延迟:高峰期用 30 秒延迟,低峰期用 5 秒延迟;
周期性生成水印:按 Flink 默认的 200ms 周期,基于当前最大事件时间 - 动态延迟生成水印;
兼容空闲流:可选搭配
withIdleness避免水印卡住。
二、完整可运行代码(动态延迟水印生成器)
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* 自定义水印生成器:动态调整乱序延迟(高峰30秒/低峰5秒)
*/
public class DynamicWatermarkGeneratorDemo {
// 业务数据类(订单示例)
public static class OrderEvent {
private String orderId;
private Long eventTime; // 事件时间(毫秒)
private Double amount;
// 构造器
public OrderEvent(String orderId, Long eventTime, Double amount) {
this.orderId = orderId;
this.eventTime = eventTime;
this.amount = amount;
}
// getter
public Long getEventTime() { return eventTime; }
}
/**
* 自定义水印生成器:动态调整乱序延迟
*/
public static class DynamicWatermarkGenerator implements WatermarkGenerator<OrderEvent> {
// 记录当前数据流的最大事件时间
private long maxEventTime = Long.MIN_VALUE;
// 高峰时段(比如9:00-12:00)
private final int PEAK_START_HOUR = 9;
private final int PEAK_END_HOUR = 12;
// 延迟配置(毫秒)
private final long PEAK_DELAY = 30 * 1000; // 高峰期30秒
private final long NORMAL_DELAY = 5 * 1000; // 低峰期5秒
/**
* 每来一条数据触发:更新最大事件时间
* @param event 业务数据
* @param eventTimestamp 数据的事件时间(已通过TimestampAssigner提取)
* @param output 水印输出器
*/
@Override
public void onEvent(OrderEvent event, long eventTimestamp, WatermarkOutput output) {
// 更新最大事件时间(保证单调递增)
maxEventTime = Math.max(maxEventTime, eventTimestamp);
}
/**
* 周期性触发(默认200ms):生成水印
* @param output 水印输出器
*/
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 1. 判断当前时段(高峰/低峰)
long currentDelay = getCurrentDelay();
// 2. 计算水印:最大事件时间 - 动态延迟
long watermarkTs = maxEventTime - currentDelay;
// 3. 发射水印(水印时间戳必须单调递增)
output.emitWatermark(new Watermark(watermarkTs));
// 调试日志(可选)
System.out.printf("当前时间:%s,最大事件时间:%d,动态延迟:%dms,水印时间:%d%n",
LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME),
maxEventTime, currentDelay, watermarkTs);
}
/**
* 核心逻辑:根据当前时段动态返回延迟时间
*/
private long getCurrentDelay() {
int currentHour = LocalDateTime.now().getHour();
// 判断是否在高峰时段
if (currentHour >= PEAK_START_HOUR && currentHour < PEAK_END_HOUR) {
return PEAK_DELAY;
} else {
return NORMAL_DELAY;
}
}
}
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置水印生成周期(默认200ms,可自定义)
env.getConfig().setAutoWatermarkInterval(200);
// 2. 模拟数据源(生成高峰/低峰订单数据)
DataStream<OrderEvent> orderStream = env.addSource(new SourceFunction<OrderEvent>() {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<OrderEvent> ctx) throws Exception {
long baseTs = System.currentTimeMillis();
int count = 0;
while (isRunning) {
// 生成订单数据(事件时间模拟乱序:±随机延迟)
long eventTime = baseTs - (long) (Math.random() * 40 * 1000); // 最大乱序40秒
OrderEvent event = new OrderEvent("ORDER_" + count, eventTime, 100.0);
ctx.collect(event);
count++;
Thread.sleep(100); // 每秒生成10条数据
}
}
@Override
public void cancel() {
isRunning = false;
}
});
// 3. 定义水印策略(自定义生成器 + 空闲流处理)
WatermarkStrategy<OrderEvent> dynamicWatermarkStrategy = WatermarkStrategy
// 绑定自定义水印生成器
.<OrderEvent>forGenerator(context -> new DynamicWatermarkGenerator())
// 配置空闲流超时(10秒无数据则标记为空闲)
.withIdleness(Duration.ofSeconds(10))
// 指定事件时间字段(从业务数据提取)
.withTimestampAssigner((event, timestamp) -> event.getEventTime());
// 4. 应用水印策略
DataStream<OrderEvent> streamWithWatermark = orderStream.assignTimestampsAndWatermarks(dynamicWatermarkStrategy);
// 5. 窗口计算(验证水印效果)
streamWithWatermark
.keyBy(event -> event.getOrderId().substring(0, 5)) // 按前缀分组
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(1))) // 1分钟滚动窗口
.sum("amount") // 统计订单金额
.print("窗口结果");
// 6. 执行任务
env.execute("Dynamic Watermark Generator Demo");
}
}
三、核心代码解释
1. 自定义水印生成器(DynamicWatermarkGenerator)
onEvent方法:每接收一条数据,更新maxEventTime(保证最大事件时间单调递增,核心前提);onPeriodicEmit方法:Flink 默认每 200ms 调用一次,核心逻辑:getCurrentDelay():判断当前时段(9-12 点为高峰),返回 30 秒 / 5 秒延迟;水印计算:
maxEventTime - 动态延迟,保证水印能覆盖对应时段的乱序数据;emitWatermark:发射水印(水印时间戳必须单调递增,否则 Flink 会忽略)。
2. 关键配置
setAutoWatermarkInterval(200):设置水印生成周期(默认 200ms,可根据业务调整);withIdleness(Duration.ofSeconds(10)):解决多并行度下空闲流卡住水印的问题;withTimestampAssigner:提取业务数据的eventTime作为事件时间(忽略 Flink 默认的timestamp)。
3. 动态延迟的扩展思路
如果你的 “高峰 / 低峰” 判断逻辑更复杂(比如基于数据量、QPS),可修改getCurrentDelay()方法:
// 示例:基于数据量判断高峰(每秒超过100条为高峰)
private long getCurrentDelay() {
long currentQps = calculateCurrentQps(); // 自定义方法:计算当前QPS
return currentQps > 100 ? PEAK_DELAY : NORMAL_DELAY;
}四、运行验证
环境依赖:Flink 1.12+(
WatermarkStrategy是 1.12 + 的推荐 API);日志观察:控制台会打印水印生成日志,可看到:
9-12 点:水印延迟 30 秒(
动态延迟:30000ms);其他时段:水印延迟 5 秒(
动态延迟:5000ms);
窗口效果:高峰期能容纳 30 秒内的乱序数据,低峰期仅容纳 5 秒,兼顾准确性和实时性。
五、生产环境注意事项
单调递增保障:
maxEventTime必须保证单调递增(否则水印会倒退,Flink 直接忽略);延迟阈值兜底:建议设置最大延迟上限(比如即使高峰,也不超过 60 秒),避免水印过度滞后;
监控告警:对水印滞后时间做监控(比如水印比最大事件时间滞后超过 1 分钟则告警);
结合侧输出流:配置
sideOutputLateData,捕获超延迟的极端数据,避免丢失。
总结
自定义水印生成器的核心是实现
WatermarkGenerator接口,重写onEvent(更新最大事件时间)和onPeriodicEmit(生成水印);动态延迟的关键是在
onPeriodicEmit中根据业务规则(时段 / 数据量)调整延迟值;生产环境需搭配
withIdleness和侧输出流,保证水印推进和数据不丢失。
评论