【Flink】Flink 中的时间和窗口之水位线(Watermark)

news/2024/7/7 19:50:11

1. 时间语义

这里先介绍一下什么是时间语义时间语义在Flink中是一种很重要的概念,下面介绍的水位线就是基于时间语义来讲的。
在这里插入图片描述

在Flink中我们提到的时间语义一般指的是事件时间处理时间

  • 处理时间(Processing Time),一般指执行处理操作的系统时间,也就是Flink的窗口算子对该数据的操作时间。
  • 事件时间(Event Time), 一般指每个事件在对应设备上发生的时间,也就是数据的生成的时间。

Flink中之所以会出现这两种时间语义,是因为Flink的分布式系统会有网络传输延迟以及时钟飘逸,处理时间相对于事件时间会有所滞后,并且数据在网络以及Flink中的传输是是乱序的。Flink的1.12版本之前默认使用的是处理时间,之后已经将事件时间作为默认的时间语义。

对于 先产生的数据先被处理,这就要求需要保证数据的到达顺序,但是因为网络传输延迟不确定性,是无法保证数据的到达顺序,在这种情况下就不能简单使用数据自带的时间戳当做时钟,而是需要另外的标志来表达事件时间的进展,在Flink中这就是事件时间水位线

2. 水位线

Flink中的水位线(Watermark)是一种用来表示事件时间进展的机制,它可以帮助Flink系统及时处理延迟数据,并保证处理结果的正确性。具体来说,水位线是一个时间戳,表示对于所有时间戳小于该水位线的事件已经全部到达,可以进行相应的计算处理。

2.1 事件时间和窗口

在实际应用中,一般都会采用事件时间语义,而水位线就是基于事件时间提出的概念。和水位线相关的还有一个窗口概念,事件时间和窗口也是有很大的关系。

这里举一个例子,我们有一个班车系统,班车上装的都是带有生产时间的商品,如果有一辆车的开车时间是8点,我们需要这辆车到九点就开始发车。商品一个个到达车上,车上的商品时间从8点开始依次增长。当到达车上的商品生产时间是九点的时候,这时候车门关闭开始发车。
在这里插入图片描述
在这个过程中,我们说的班车就相当于窗口,定义了一个8点到九点的窗口。并且我们会定义一个逻辑时钟,这个逻辑时钟是基于事件时间来的,不会自动流逝。时钟的进展就是靠着新的数据上的事件时间时间戳来推动的。这样就不需要依赖系统的时间,无论什么时候进行统计处理,得到的结果都是正确的。

2.2 什么是水位线

在事件时间语义下,我们可以不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。这样每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数据的时间戳来驱动的。

但是在分布式系统中,会存在一些问题,因为数据本身在处理转换过程中会发生变化,如果遇到窗口聚合的操作,呢么下游的数据就会变少,时间进度的控制就不精细了。另外,数据向下游任务传递时,一般只能传输给一个子任务(除广播外),这样其他的并行子任务的时钟就无法推进了。

所以时钟也需要以数据的形式传递出去,告诉下游任务当前时间的进展;而且时钟传递过程也不会因为运算而停滞。解决这个问题的想法就是在数据流种加入一个时间标记,记录当前的事件时间并且广播到下游,当下游收到这个标记就可以更新自己的时钟了。这种用来衡量事件时间进展的标记在Flink中就被称作水位线。
在这里插入图片描述
如图 6-5 所示,每个事件产生的数据,都包含了一个时间戳,我们直接用一个整数表示。这里没有指定单位,可以理解为秒或者毫秒(方便起见,下面讲述统一认为是秒)。当产生于2 秒的数据到来之后,当前的事件时间就是 2 秒;在后面插入一个时间戳也为 2 秒的水位线,随着数据一起向下游流动。而当 5 秒产生的数据到来之后,同样在后面插入一个水位线,时间戳也为 5,当前的时钟就推进到了 5 秒。这样,如果出现下游有多个并行子任务的情形,我们只要将水位线广播出去,就可以通知到所有下游任务当前的时间进度了。

水位线就像它的名字一样,是数据流中的一部分,随着数据一起流动,在不同任务之间传输。不过水位线也存在有序和乱序区分:

2.2.1 有序流中的水位线

在理想状态下,数据按照顺序排好队进入数据流,并且处理的过程遵循先来后到的原则,这样每个数据中提取的时间戳就会保持从小到大的增长,而水位线也会不断增长、事件时钟也不断向前推进。

但是在实际应用中,数据量非常大,还有可能好多数据的事件时间戳都是相同的,每一个都插入水位线是做了很多无用功的。所以为了提高效率一般每隔一段时间生成一个水位线,这个水位线的时间戳就是当前最新数据的时间戳,就如下图所示:

在这里插入图片描述
这里注意水位线插入的周期本身也是时间概念,并且这个周期时间是指处理时间也就是系统时间,而不是事件时间,如果使用事件时间就陷入了死循环了。

2.2.2 乱序流中的水位线

有序流的处理非常简单,但是在分布式系统中,会因为数据在节点的传输以及网络传输的延迟不确定性导致顺序发生改变,这就是乱序数据。

这里所说的乱序是指数据的先后顺序不一致,主要就是基于数据的产生时间而言的。如图 6-7 所示,一个 7 秒时产生的数据,生成时间自然要比 9 秒的数据早;但是经过数据缓存和传输之后,处理任务可能先收到了 9 秒的数据,之后 7 秒的数据才姗姗来迟。这时如果我们希望插入水位线,来指示当前的事件时间进展,又该怎么做呢?

在这里插入图片描述解决思路也很简单:我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线,如图 6-8 所示。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。
在这里插入图片描述
考虑到大量数据同时到来的处理效率,我们同样可以周期性的生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线,如图 6-9 所示。
在这里插入图片描述但是又如何处理迟到的数据?其实也很简单,那就是等待几秒,比如上图当收到9秒的数据,等待2秒也就是7秒,这时候7秒的数据到来,数据就到齐了。
在这里插入图片描述
下图是我们使用周期性的方式生成等待2秒的水位线
在这里插入图片描述
另外需要注意的是,这里一个窗口所收集的数据,并不是之前所有已经到达的数据。因为数据属于哪个窗口,是由数据本身的时间戳决定的,一个窗口只会收集真正属于它的那些数据。也就是说,上图中尽管水位线 W(20)之前有时间戳为 22 的数据到来,10~20 秒的窗口中也不会收集这个数据,进行计算依然可以得到正确的结果。

2.2.3 水位线的特性

  • 水位线是插入到数据流中的一种标记,可以认为是一种特殊的数据
  • 水位线主要内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线Watermark(t) 表示在当前流中事件时间已经达到了时间戳t,这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’<= t的数据

水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

2.3 如何生成水位线

水位线是保证窗口处理结果的正确性的,如果不能正确处理所有乱序数据,可以尝试调大延迟时间,但是在实际应用中不是随便调整的。

2.3.1 生成水位线的总体原则

只能尽量保证水位线的正确,如果想对结果正确性要求很高,那就需要设置等待时间长一点,但是等待的时间越长,漏掉数据的概率越低。但是这样做的代价就是处理的实时性降低了,所以我们希望能处理的更快,更实时,就必须将水位线设置的低一些。而对于漏掉的数据,Flink提供了窗口处理迟到数据。

Flink中的水位线其实就是流处理中对延迟和结果正确性的一个权衡机制,而且把控制权交给了程序员,我们可以在代码中定义水位线的生成策略。

2.3.2 水位线的生成策略(Watermark Strategies)

FlinkDataStream API中,提供了用于生成水位线的方法.assignTimestampsAndWatermarks(),主要是用来为流中的数据分配时间戳,并生成水位线来指示事件时间,

// 为数据流中的元素分配时间戳并生成水位线以表示事件时间进度。给定的 WatermarkStrategy 用于创建 TimestampAssigner 和 WatermarkGenerator。
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)

具体使用时直接用DataSream调用该方法即可,与普通transform方法完全一样

DataStream<String> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.STRING_TYPE_INFO);
        DataStream<String> withTimestampsAndWatermarks = stringDataStreamSource.assignTimestampsAndWatermarks(<watermark strategy>);

.assignTimestampsAndWatermarks()方法需要传入一个WatermarkStrategy作为参数,这个参数就是水位线生成策略WatermarkStrategy中包含了一个时间戳分配器(TimestampAssigner)和一个水位线生成器(WatermarkGenerator)

@Public
public interface WatermarkStrategy<T>
        extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
@Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
@Override
    default TimestampAssigner<T> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        return new RecordTimestampAssigner<>();
    }
}
  • TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
  • WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator接口中,主要又有两个方法:onEvent()onPeriodicEmit()
  • onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件,时间戳以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作。
  • onPeriodicEmit:周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms
@Public
public interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

也可以通过设置周期时间
env.getConfig().setAutoWatermarkInterval(60 * 1000L);

2.3.3 Flink内置水位线生成器

Flink提供了两种内置的水位线生成器(WatermarkGenerator),分别对应着有序流和乱序流的场景。

2.3.3.1 有序流

对于有序流主要特点就是时间戳单调增长,所以永远不会出现迟到的数据问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStategy.forMonotonousTimestamps()方法就可以实现,简单说就是拿当前最大时间戳作为水位线就可以了。

static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
        return (ctx) -> new AscendingTimestampsWatermarks<>();
    }

源码可以看到有序流返回的水位线生成器策略是AscendingTimestampsWatermarks对象:
示例代码

ArrayList<Event> list = new ArrayList<>();
        list.add(new Event("ming","www.baidu1.com",1200L));
        list.add(new Event("xiaohu","www.baidu5.com",1267L));
        list.add(new Event("ming","www.baidu7.com",4200L));
        list.add(new Event("xiaohu","www.baidu8.com",5500L));

        DataStream<Event> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
        // 使用forMonotonousTimestamps
        DataStream<Event> watermarks = stringDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                })
        );

上面的代码中我们调用的.withTimestampAssigner()方法,将数据中的timestamp字段提取出来作为时间戳分配给数据元素;然后使用内置有序流水位线生成器构造出了生成策略,这样提取出来的数据时间戳就是我们处理计算时间的事件时间

要注意的是这里的时间戳和水位线的单位必须都是毫秒。

2.3.3.2 乱序流

由于乱序流中需要等待迟到的数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成的水位线的时间戳就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。

static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
        return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
    }

不过这个方法需要传一个maxOutOfOrderness参数,表示最大乱序程度, 它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,呢么设置对应时间长度的延迟就可以等到所有的乱序数据了。
示例代码

DataStream<Event> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
DataStream<Event> watermarks = stringDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                })
        );

上面的代码提取了timestamp字段作为时间戳,并且以5毫秒的延迟时间创建了处理乱序流的水位线生成器。

查看有序流的水位线生成器和乱序流的水位线生成器的源码可以发现,其实有序流的水位线生成器本质上和乱序流是一样的,相当于延迟为0秒的乱序流水位线生成器。

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0));
    }
}

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    private long maxTimestamp;
    private final long outOfOrdernessMillis;
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

这里还要注意一下,乱序流中生成的水位线真正的时间戳,其实是当前最大时间戳 - 延迟时间 - 1,这里的单位是毫秒。为什么要减去1毫秒是以为Flink中的窗口都是左开右闭的(8,7] 表示7点到8点的数据。

2.3.4 自定义水位线策略

WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了;不同的点在于WatermarkGenerator的实现,整体来说,有两种生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)

创建自定义水位线离不开WatermarkGeneratoronEvent() onPeriodicEmit(),前者是每个事件到来时调用,后者由框架周期性的调用。周期性调用的方法中onPeriodicEmit()发出水位线就是周期性生成水位线,由事件触发的 方法中onEvent()发出的水位线就是断点式生成的水位线。两种方式的不同就集中体现在这两个方法上。

2.3.4.1 周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
示例代码:

public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. 从集合中读取数据
        ArrayList<Event> list = new ArrayList<>();
        list.add(new Event("ming","www.baidu1.com",1200L));
        list.add(new Event("xiaohu","www.baidu5.com",1267L));
        list.add(new Event("ming","www.baidu7.com",4200L));
        list.add(new Event("xiaohu","www.baidu8.com",5500L));

        DataStream<Event> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
        DataStream<Event> watermarks = stringDataStreamSource.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
            @Override
            public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                };
            }

            @Override
            public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Event>() {
                    private Long delayTime = 5000L; // 延迟时间

                    private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
                    @Override
                    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
                        // 每来一条数据就调用一次
                        maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        // 发射水位线,默认 200ms 调用一次 可以使用 env.getConfig().setAutoWatermarkInterval(60 * 1000L); 调整周期时间
                        output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
                    }
                };
            }
        });
    }

2.3.4.2 断点式水位线生成器(Punctuated Generator)

断点式生成器会不停的检测onEvent()事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过onPeriodicEmit()发出水位线。
示例代码

public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. 从集合中读取数据
        ArrayList<Event> list = new ArrayList<>();
        list.add(new Event("ming","www.baidu1.com",1200L));
        list.add(new Event("xiaohu","www.baidu5.com",1267L));
        list.add(new Event("ming","www.baidu7.com",4200L));
        list.add(new Event("xiaohu","www.baidu8.com",5500L));

        DataStream<Event> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
        DataStream<Event> watermarks = stringDataStreamSource.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
            @Override
            public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                };
            }

            @Override
            public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Event>() {
                    @Override
                    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
                        // 只有在遇到特定的 itemId 时,才发出水位线
                        if (event.getUser().equals("Mary")) {
                            output.emitWatermark(new Watermark(event.getTimestamp() - 1));
                        }
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
                    }
                };
            }
        });
    }

我们在 onEvent()中判断当前事件的 user 字段,只有遇到“Mary”这个特殊的值时,才调用output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。

2.3.5 在自定义数据源中发送水位线

在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。
示例代码

env.addSource(new SourceFunction<Event>() {
            private boolean running = true;
            @Override
            public void run(SourceContext<Event> ctx) throws Exception {
                Random random = new Random();
                String[] userArr = {"Mary", "Bob", "Alice"};
                String[] urlArr = {"./home", "./cart", "./prod?id=1"};
                while (running) {
                    long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
                    String username = userArr[random.nextInt(userArr.length)];
                    String url = urlArr[random.nextInt(urlArr.length)];
                    Event event = new Event(username, url, currTs);
                    // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段
                    ctx.collectWithTimestamp(event, event.timestamp);
                    // 发送水位线
                    ctx.emitWatermark(new Watermark(event.timestamp - 1L));
                    Thread.sleep(1000L);
                }
            }
            @Override
            public void cancel() {
                this.running = false;
            }
        });

ctx.collectWithTimestamp(event, event.timestamp); 这里发出的时间戳,其实就是我们经常使用的内置水位线生成器的方法中的recordTimestamp

return new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                };

在自定义水位线中生成水位线相比 assignTimestampsAndWatermarks 方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全由我们自定义。所以非常适合用来编写 Flink 的测试程序,测试 Flink 的各种各样的特性。

2.4 水位线的传递

在每一个分区中都有一个分区水位线。
在这里插入图片描述
如图 6-12 所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。具体过程如下:

  1. 上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线”(Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。
  2. 当有一个新的水位线(第一分区的 4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的 3,于是当前任务时钟就推进到了 3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。
  3. 再次收到新的水位线(第二分区的 7)后,执行同样的处理流程。首先将第二个分区时钟更新为 7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
  4. 同样道理,当又一次收到新的水位线(第三分区的 6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的 4,所以当前任务的时钟推进到 4,并发出时间戳为 4 的水位线,广播到下游各个分区任务。

水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。

2.5 水位线的总结

水位线在事件时间的世界里面,承担了时钟的角色。也就是说在事件时间的流中,水位线是唯一的时间尺度。如果想要知道现在几点,就要看水位线的大小。后面讲到的窗口的闭合,以及定时器的触发都要通过判断水位线的大小来决定是否触发。

水位线是一种特殊的事件,由程序员通过编程插入的数据流里面,然后跟随数据流向下游流动。水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒。
所以这里涉及到一个问题,就是不同的算子看到的水位线的大小可能是不一样的。因为下游的算子可能并未接收到来自上游算子的水位线,导致下游算子的时钟要落后于上游算子的时钟。比如 map->reduce 这样的操作,如果在 map 中编写了非常耗时间的代码,将会阻塞水位线的向下传播,因为水位线也是数据流中的一个事件,位于水位线前面的数据如果没有处理完毕,那么水位线不可能弯道超车绕过前面的数据向下游传播,也就是说会被前面的数据阻塞。这样就会影响到下游算子的聚合计算,因为下游算子中无论由窗口聚合还是定时器的操作,都需要水位线才能触发执行。这也就告诉了我们,在编写 Flink 程序时,一定要谨慎的编写每一个算子的计算逻辑,尽量避免大量计算或者是大量的 IO 操作,这样才不会阻塞水位线的向下传递。

在数据流开始之前,Flink 会插入一个大小是负无穷大(在 Java 中是-Long.MAX_VALUE)的水位线,而在数据流结束时,Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线,保证所有的窗口闭合以及所有的定时器都被触发。

对于离线数据集,Flink 也会将其作为流读入,也就是一条数据一条数据的读取。在这种情况下,Flink 对于离线数据集,只会插入两次水位线,也就是在最开始处插入负无穷大的水位线,在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线,就可以保证计算的正确,无需在数据流的中间插入水位线了。

水位线的重要性在于它的逻辑时钟特性,而逻辑时钟这个概念可以说是分布式系统里面最为重要的概念之一了,理解透彻了对理解各种分布式系统非常有帮助。


http://lihuaxi.xjx100.cn/news/1280199.html

相关文章

StarRocks案例7:使用shell批量broker load导入hdfs数据

文章目录 一. 问题描述二. 解决方案 一. 问题描述 近期需要进行补录数据&#xff0c;需要将hive的历史数据迁移到StarRocks&#xff0c;因为需要补录的数据较多&#xff0c;hive和StarRocks均使用的是分区表&#xff0c;两边的日期格式也不同&#xff0c;hive这边是 yyyymmdd格…

arcgis api的popup.open鼠标左键不管用,右键才有效问题解决

场景&#xff1a;用arcgis api for js 打开view的弹窗事件&#xff0c;用View.popup.open(&#xff09;方法可以打卡&#xff0c;但是操作的时候发现左键点击地图时候没反应&#xff0c;右键才可以。后来查了api&#xff0c;需要进行如下设置&#xff0c;让左右中间滚轮都有反…

一文教你Mysql如何性能优化

Mysql性能优化 Mysql性能优化 性能优化维度 数据库优化思路 应急调优的思路&#xff1a; 针对突然的业务办理卡顿&#xff0c;无法进行正常的业务处理&#xff01;需要立马解决的场景&#xff01; show processlist&#xff08;查看连接session状态&#xff09; explain(分…

uniapp导航栏点击切换特效 下边框跟随动态变化

前言 本人几率代码方便以后使用&#xff0c;导航栏下边框跟随特效 <template><view class"video"><view class"topbar"><scroll-view scroll-x"true"class"scroll-view"><view class"rel">…

Three.js教程:gui.js库(可视化改变三维场景)

推荐&#xff1a;将 NSDT场景编辑器 加入你的3D工具链 其他系列工具&#xff1a; NSDT简石数字孪生 gui.js库(可视化改变三维场景) gui.js库(可视化改变三维场景) dat.gui.js说白了就是一个前端js库&#xff0c;对HTML、CSS和JavaScript进行了封装&#xff0c;学习开发的时候…

【vue2】封装文字过长自动省略部分并且鼠标悬浮显示全部

技术&#xff1a;Ant design vue1.7.8 UI框架、vue2.X 需求:实现文字过长自动省略部分&#xff0c;鼠标悬浮显示全部 效果图&#xff1a; 图一&#xff1a; 图二&#xff1a; 1.封装组件代码&#xff1a; src/components/Ellipsis/index.js 文件下代码 import Ellipsis f…

dsl语句查询elasticsearch集群节点分布和资源使用情况

查询语句如下&#xff08;本文是直接在kibana里面执行的哦&#xff09; GET _cat/nodes?v执行结果 这样就可以很直观的看到&#xff0c;es部署在了哪些节点上&#xff0c;以及各节点资源分布使用

【PostgreSQL-16新特性之普通用户的保留连接个数(reserved_connections)】

PostgreSQL数据库为了保证在高并发&#xff0c;高连接数情况下某些用户能够正常连接到数据库里&#xff0c;设立了几个用户连接的保留个数。 本文介绍了PostgreSQL16版本前为超级用户保留的连接数&#xff08;superuser_reserved_connections&#xff09;以及PostgreSQL16版本…