Flink的常用算子以及实例

news/2024/7/5 2:13:25

1.map

特性:接收一个数据,经过处理之后,就返回一个数据
在这里插入图片描述

1.1. 源码分析

  • 我们来看看map的源码
    在这里插入图片描述
    map需要接收一个MapFunction<T,R>的对象,其中泛型T表示传入的数据类型,R表示经过处理之后输出的数据类型
  • 我们继续往下点,看看MapFunction<T,R>的源码
    在这里插入图片描述
    这是一个接口,那么在代码中,我们就需要实现这个接口

1.2. 案例

那么我们现在要实现一个功能,就是从给一个文件中读取数据,返回每一行的字符串长度。

我们要读取的文件内容如下
在这里插入图片描述

代码贴在这里(为了让打击不看迷糊,导包什么的我就省略了)

public class TransformTest1_Base {
    public static void main(String[] args) throws Exception {
        // 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 将并行度设为1
        env.setParallelism(1);
        // 3. 读取文件夹
        DataStreamSource<String> inputDataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");
        // 4. 将文件夹每一行的数据都返回它的长度
        // 在这里我们用匿名内部类的方式创建了一个MapFunction对象
        SingleOutputStreamOperator<Integer> dataStream = inputDataStream.map(new MapFunction<String, Integer>() {

            // 5. 重写map方法,参数s是接收到的一个数据,我们只需要返回它的长度就行了。
            @Override
            public Integer map(String s) throws Exception {
                return s.length();
            }
        });
        // 6. 打印输出
        dataStream.print();
        // 7. 启动执行环境
        env.execute();
    }
}

显示
在这里插入图片描述

1.3. 总结

map的使用范围就是需要对的那个数据进行处理,并且每次返回一个数据的时候,map就比较方便了。

2. flatMap

  • 接收一个数据,可以返回多条数据

2.1. 源码分析

在这里插入图片描述
我们发现,它需要传入一个FlatMapFunction的一个对象
在这里插入图片描述

我们继续点进去,看看FlatMapFunction的源码,可以发现,FlatMapFunction<T,R>也是一个接口,并且接口里面的方法的返回值是一个Collector,也就是多个值的集合。

2.2. 案例

我们还是读取那个文件,这次我们要做的处理是,将文件的每一行数据按照逗号隔开,给出代码:

public class TransformTest2_Base {
    public static void main(String[] args) throws Exception {
        // 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 设置并行度
        env.setParallelism(1);
        // 3. 读取文件夹
        DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");
        // 4. 用匿名内部类的方式重写FlatMapFuncction,将每行字符按","隔开
        SingleOutputStreamOperator<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                // 5. 分割一行字符,获得对应的字符串数组
                String[] split = s.split(",");
                for (String slt : split) {
                    // 6. 将这些数据返回
                    collector.collect(slt);
                }
            }
        });
        // 7. 打印输出处理后的数据
        flatMapStream.print();
        // 8. 启动执行环境
        env.execute();
    }
}

可以看到执行的结果
在这里插入图片描述

3. filter

听这个名字就知道是个过滤器,用来过滤数据。
在这里插入图片描述

3.1. 源码分析

我们看看filer的源码,继承子FilterFunction,可以看到,这次泛型就只有一个值了,因为filter只允许返回的数据<=原来的数据,所以只做过滤,并不能改变数据蕾西,没必要设置返回的类型
在这里插入图片描述
我们继续点进去,看看FilterFunction的源码
在这里插入图片描述
果不其然,也是一个接口,而里面的filter方法只有一个参数,并且返回的是一个boolean类型,若返回true则var1原样返回,若返回false,则var1会被过滤掉。

3.2. 案例

我们还是读取以上文件,这一次我们返回以"sensor_1"开头的字符串,其余的一律不返回,给出代码

public class TransformTest3_Base {
    public static void main(String[] args) throws Exception {
        // 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 设置并行度
        env.setParallelism(1);
        // 3. 读取文件
        DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");
        // 4. 用匿名内部类的方式重写FilterFunction
        SingleOutputStreamOperator<String> filterDataStream = dataStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                // 5. 若s以"sensor_1"开头,则返回true
                return s.startsWith("\"sensor_1\"");
            }
        });
        // 6. 打印处理后的数据
        filterDataStream.print();
        // 7. 启动执行环境
        env.execute();
    }
}

4. 分组聚合

  • 注意:任何的聚合操作都有默认的分组,聚合是在分组的基础上进行的。比如,对整体进行求和,那么分组就是整体。所以,在做聚合操作之前,一定要明确是在哪个分组上进行聚合操作
  • 注意:聚合操作,本质上是一个多对一(一对一是多对一的特殊情况)的操作。特别注意的是这个’一‘,可以是一个值(mean, sum等),同样也可以是一个对象(list, set等对象)

4.1. 分组(keyBy)

在这里插入图片描述
DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。

  • 分组就是为了聚合操作做准备的,keyBy方法会将数据流按照hash实现,分别放在不同的分区,每个分区都可以进行聚合操作。
  • 我们可以用这个性质,计算每一个sensor温度的最大值,我们为此将文件修改:
    在这里插入图片描述
    分组之后的图就是所有sensor_1在一个分区里,sensor_6,sensor_7,sensor_10在不同的三个分区,也就是有四个分区,而后三个分区中只有一条数据,所以最大值和最小值都只有一个
  • 在flink中,分组操作是由keyBy方法来完成的,我们来看看keyBy的源码
    在这里插入图片描述
    可以发现,keyBy可以对对象和元组进行聚合。

4.2. 聚合

这些算子可以针对 KeyedStream 的每一个支流做聚合。
⚫ sum():对每个支流求和
⚫ min():对每个支流求最小值
⚫ max():对每个支流求最大值
⚫ minBy()
⚫ maxBy()
我们来看看max()的源码
在这里插入图片描述
这也是传一个属性名,也就是求对应的属性名的最大值。

4.3. 实例演示

public class TransformTest1_RollingAggreation {
    public static void main(String[] args) throws Exception {
        // 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 设置并行度
        env.setParallelism(1);
        // 3. 读取文件
        DataStreamSource<String> stringDataStreamSource = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");
        // 4. 用map将每行数据变成一个对象
        SingleOutputStreamOperator<SensorReading> map = stringDataStreamSource.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] split = s.split(",");
                return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
            }
        });
        // 5. 分组操作,以id属性分组
        KeyedStream<SensorReading, Tuple> keyedstream = map.keyBy("id");
        // 6. 聚合操作,求每个分组的温度最大值
        SingleOutputStreamOperator<SensorReading> resultStream = keyedstream.max("temperature");
        // 7. 打印输出
        resultStream.print();
        // 8. 启动执行环境
        env.execute();
    }
}

运行结果
在这里插入图片描述
诶,这有人就要问了,不是求每一个分组的温度最大值么?为什么sensor_1的这个分组所有的数据都有?
答:flink是一个流处理分布式框架,这是一条数据流,每来一个数据就得处理一次,所以输出的都是当前状态下的最大值。

4.4. reduce自定义聚合

在实际生产中,不可能让我们完成这么简单的操作就行了,所以我们需要更复杂的操作,而reduce就是满足这个条件,它可以让我们自定义聚合的方式。

  • 我们来看看reduce的源码
    在这里插入图片描述
    reduce需要传入的是一个ReduceFunction的对象,我们再来看看ReduceFunction是个什么东西
    在这里插入图片描述
    var1是当前这个分组的状态,var2是新加入的值,而reduce函数体就是我们要进行的操作,返回一个新的状态。
    到这我就明白了,要是我们向实时获取最大温度的话,var1是之前的最大温度,通过var1和var2的比较就能实现。

4.5. reduce实例

我们这一次要实现一个实时的温度最大值,也就是返回的数据中的时间戳是当前的。

public class TransformTest1_Reduce {
    public static void main(String[] args) throws Exception {
        // 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 设置并行度
        env.setParallelism(1);
        // 3. 读取文件
        DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");
        // 4. 通过map将每行数据转换为一个对象
        SingleOutputStreamOperator<SensorReading> map = dataStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] split = s.split(",");
                return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
            }
        });
        // 5. 按对象的id分组
        KeyedStream<SensorReading, Tuple> keyStream = map.keyBy("id");
        // 6. reduce自定义聚合
        SingleOutputStreamOperator<SensorReading> reduce = keyStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading sensorReading, SensorReading t1) throws Exception {
                // 7. 获取当前时间为止接收到的最大温度
                return new SensorReading(sensorReading.getId(), System.currentTimeMillis(), Math.max(sensorReading.getTemperature(),t1.getTemperature()));
            }
        });
        // 8. 打印输出
        reduce.print();
        // 9. 启动运行环境
        env.execute();
    }
}

这一次的输出我们就得你好好研究一下了。
在这里插入图片描述
从这块可以发现,我们获取的都是当前的时间戳,而且时间戳也在改变,这一点很好理解,但是下面这个数据就很诡异了。
在这里插入图片描述

  • 这两块的时间戳为什么没有改变呢?这需要我们再来看看reduce方法了,reduce方法是传入两个参数,第一个是当前的状态,第二个是新读取的值,通过方法体的操作返回一个最新的状态。
  • 仔细理解一下这句话,若我刚开始没有数据的时候,那么哪来的状态呢?所以reduce把接收到的第一个参数作为状态,其中sensor_6,7,8这三个分区只有一个数据,所以直接拿来当作状态。

5. 多流转换算子

5.1. 分流操作(Split 和 Select)

  • Split能将流中的数据按条件贴上标签,比如我把温度大于30度的对象贴上一个high标签,把温度低于30度的贴上一个low标签,标签可以贴多个。那么就把流中的数据,按照标签分类了(这里并没有分流)
    在这里插入图片描述
  • Select是按照标签来分流
    在这里插入图片描述
  1. split源码
    在这里插入图片描述
    可以发现,返回的是一个SplitStream,需要传入一个选择器,我们看看OutputSeclector的源码
    在这里插入图片描述
    传入value,返回这个value对应的标签,实现对这个value进行类似"分类"的操作。
  2. select源码
    在这里插入图片描述
    只需要接收一个或者多个标签就能返回包含那个标签对象的数据流。

5.2. 实例演示

  • 我们这一次要把读取到的数据分成三条流,一条是high(高于30度),一条是low(低于30度),一条是all(所有的数据)。代码:
public class TransformTest4_MultipleStreams {
    public static void main(String[] args) throws Exception {
        // 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 设置并行度
        env.setParallelism(1);
        // 3. 读取文件
        DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");
        // 4. 通过map将每行数据转换为一个对象
        SingleOutputStreamOperator<SensorReading> map = dataStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] split = s.split(",");
                return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
            }
        });
        // 5. 按条件贴标签
        SplitStream<SensorReading> split = map.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return value.getTemperature() > 30 ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        // 6. 按标签选择,生成不同的数据流
        DataStream<SensorReading> high = split.select("high");
        DataStream<SensorReading> low = split.select("low");
        DataStream<SensorReading> all = split.select("high", "low");
        high.print("high");
        low.print("low");
        all.print("all");
        env.execute();
    }
}

5.3. 合流操作Connect 和 CoMap

在这里插入图片描述
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数
据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
在这里插入图片描述
ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理。
类似于一国两制,看似两条流合并在了一起,其实内部依旧是按照自己的约定运行,类型并没有改变。

  1. connect源码
    在这里插入图片描述
    将当前调用者的流和参数中的流合并,返回一个ConnectedStreams<T,R>类型
    在这里插入图片描述
    我们再来看看ConnectionStreams<T,R>中的map方法,其中要传的是一个CoMapFunction<IN1,IN2,R>的对象,最重要的就是这个类,我们来看看这个类
    在这里插入图片描述
    这个CoMapFunction<IN1,IN2,R>和之前的MapFunction不太一样,这里要重写的方法有两个,map1和map2,一个是针对IN1的,一个是针对IN2的,R就是返回类型。
    这下全明白了,在这个方法内部,对这两条流分别操作,合成一条流。

5.4. 实例演示

public class TransformTest5_MultipleStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 1. 读取文件
        DataStreamSource<String> dataStreamSource = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");
        // 2. 将每行数据变成一个对象
        SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] split = s.split(",");
                return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
            }
        });
        // 3. 将数据打上标签
        SplitStream<SensorReading> split = map.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return value.getTemperature() > 30 ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        // 4. 按照高温和低温的标签分成两条流
        DataStream<SensorReading> high = split.select("high");
        DataStream<SensorReading> low = split.select("low");
        // 5. 将high流的数据转换为二元组
        SingleOutputStreamOperator<Tuple2<String, Double>> tuple2SingleOutputStreamOperator = high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading sensorReading) throws Exception {
                return new Tuple2<>(sensorReading.getId(), sensorReading.getTemperature());
            }
        });
        // 6. 将tuple2SingleOutputStreamOperator和low连接
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connect = tuple2SingleOutputStreamOperator.connect(low);
        // 7. 调用map传参CoMapFunction将两条流合并成一条流objectSingleOutputStreamOperator
        SingleOutputStreamOperator<Object> objectSingleOutputStreamOperator = connect.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            // 这是处理high流的方法
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.getField(0), value.getField(1), "temp is too high");
            }
            // 这是处理low流的方法
            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getTemperature(), "normal");
            }
        });
        objectSingleOutputStreamOperator.print();
        env.execute();
    }
}

5.5. 多条流合并(union)

之前我们只能合并两条流,那我们要合并多条流呢?这里我们就需要用到union方法。
在这里插入图片描述

  • Connect 与 Union 区别:
  1. Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap中再去调整成为一样的。
  2. Connect 只能操作两个流,Union 可以操作多个。

若我们给出以下代码:

high.union(low,all);

那么high,low,all三条流都会合并在一起。


http://lihuaxi.xjx100.cn/news/1451677.html

相关文章

用AI攻克“智能文字识别创新赛题”,这场大学生竞赛掀起了什么风潮?

文章目录 一、前言1.1 大赛介绍1.2 项目背景 二、基于智能文字场景个人财务管理创新应用2.1 作品方向2.2 票据识别模型2.2.1 文本卷积神经网络TextCNN2.2.2 Bert 预训练微调2.2.3 模型对比2.2.4 效果展示 2.3 票据文字识别接口 三、未来展望 一、前言 1.1 大赛介绍 中国大学生…

leetcode-413. 等差数列划分(java)

等差数列划分 leetcode-413. 等差数列划分题目描述双指针 上期经典算法 leetcode-413. 等差数列划分 难度 - 中等 原题链接 - 等差数列划分 题目描述 如果一个数列 至少有三个元素 &#xff0c;并且任意两个相邻元素之差相同&#xff0c;则称该数列为等差数列。 例如&#xff0…

TiDB数据库的安装配置

一、 TiDB 软件和硬件环境建议配置 Linux 操作系统版本要求 Linux 操作系统 版本 Red Hat Enterprise Linux 7.3 及以上的 7.x 版本 CentOS 7.3 及以上的 7.x 版本 Oracle Enterprise Linux 7.3 及以上的 7.x 版本 Amazon Linux 2 Ubuntu LTS 16.04 及以上的版本 …

UE4/UE5 照明构建失败 “Lightmass crashed”解决“数组索引越界”

在构建全局光照时,经常会出现“Lightmass crashed”的错误,导致光照构建失败。本文将分析这一问题的原因,并给出解决建议。 UE4 版本4.26 报错如下: <None> === Lightmass crashed: === Assertion failed: (Index >= 0) & (Index < ArrayNum) [File:d:\build…

Python入门--变量和数据类型

什么是变量&#xff1f; 在编程中&#xff0c;变量是指内存中的一段存储空间&#xff0c;用于存储数据。使用变量可以方便地存储数据并在程序中进行操作。 如何定义变量&#xff1f; 在Python中&#xff0c;可以使用“”符号来定义变量&#xff0c;例如&#xff1a; a 1 b …

如何选择专业的化妆品专柜神秘顾客公司(北京神秘顾客)

选择专业的化妆品专柜第三方神秘顾客公司需要仔细考虑&#xff0c;以确保您选择与之合作的公司能够提供有价值的见解和高质量的服务。以下是选择化妆品专柜专业神秘顾客公司时应考虑的关键因素&#xff1a; 1、声誉和经验&#xff1a;选择在行业内声誉良好且经验丰富的商超神秘…

vector【2】模拟实现(超详解哦)

vector 引言&#xff08;实现概述&#xff09;接口实现详解默认成员函数构造函数析构函数赋值重载 迭代器容量size与capacityreserveresizeempty 元素访问数据修改inserterasepush_back与pop_backswap 模拟实现源码概览总结 引言&#xff08;实现概述&#xff09; 在前面&…

Smartbi 修改用户密码漏洞

漏洞简介 通过查看 Smartbi 的补丁包信息&#xff0c;发现存在漏洞在某种特定情况下修改用户的密码&#xff0c;进行简单的复现和分析 漏洞复现 在页面上修改密码时&#xff0c;需要知道原本的用户对应的密码 直接构造这样的数据包&#xff0c;就不需要知道原本的密码&#x…