[Flink]wordcount

news/2024/7/5 3:32:17

一、有界流

1、代码

package wc;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        //TODO 1.创建流式的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //TODO 2.读取文件
        DataStreamSource<String> lineDS = env.readTextFile("input/words.txt");
        //TODO 3.处理数据:切分、转换、分组、求和
        //flatMap方法的参数是一个接口,该接口需要重写flatMap方法
        //这里使用的是匿名实现类
        //value为读入的每条数据的,数据类型
        //out为采集器,用来返回数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> wordAndOne = Tuple2.of(word, 1); //将每个单词转换成2元组
                    out.collect(wordAndOne);//使用Collector向下游发送数据
                }
            }
        });

        //TODO 4.按照word分组
        //new KeySelector<Tuple2<String, Integer>, String> 第一个类型指的是传入的数据的类型,第二个类型指的是key的数据类型
        KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //TODO 5.聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);

        //TODO 6.打印
        sumDS.print();

        //TODO 7.执行
        env.execute(); //默认核数为电脑的所有核数

    }
}

2、说明

假如接口A,里面有一个方法a()
1)正常写法:定义一个class B,去实现接口A,并且实现它的方法a()
B b=new B()
2)匿名实现类写法

new A(){
  实现a(){ }
}

二、无界流

1、代码

package wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        //TODO 1.创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 2.读取数据:socket
        DataStreamSource<String> lineDataStream = env.socketTextStream("hadoop1",7777);

        //TODO 3.处理数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDataStream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                    String[] words = value.split("\\s+");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }})
                .returns(Types.TUPLE(Types.STRING, Types.INT)) //存在泛型擦除的问题,需要指定flatmap
                .keyBy((value) -> value.f0)
                .sum(1);       //value:只有一个参数的时候,类型可以不写

        //TODO 4.打印
        sum.print();

        //TODO 5.启动执行
        env.execute(); //默认核数为电脑的所有核数
    }
}

2、在hadoop上启动

nc -lk 7777

3、报错

 1)报错原因:泛型擦除

没有指定Collector的类型

2)解决方法:增加returns方法,指定Collector的类型


http://lihuaxi.xjx100.cn/news/1288470.html

相关文章

44 # 流的原理

通过 fs 模块实现拷贝功能 同步的拷贝实现 新建 name.txt 文件&#xff0c;里面添加 凯小默的博客const fs require("fs"); const path require("path"); // 读取默认不指定编码都是 buffer 类型 let r fs.readFileSync(path.resolve(__dirname, &qu…

【什么是iMessage苹果推】怎样来获取设备令牌(Device Token)实现步骤

要获取设备令牌&#xff08;Device Token&#xff09;&#xff0c;您需要在应用程序中实现以下步骤&#xff1a; 在应用程序中请求用户授权&#xff1a;您需要请求用户授权允许应用程序发送远程通知。这可以通过使用 UNUserNotificationCenter&#xff08;User Notifications …

Python 按照shp行政边界切割tif的两种方式

介绍两种按照shp行政边界切割tif文件的方式 1. gdal.Warp from osgeo import gdalshp_path"shp文件路径" path"tif数据路径"#读取shp shp_dataset gdal.OpenEx(shp_path, gdal.OF_VECTOR) shp_layer shp_dataset.GetLayer() shp_srs shp_layer.GetSpat…

态路小课堂丨关于单纤双向CWDM波分复用的简介

TARLUZ态路 随着数据中心规模的增长和传输距离的增加&#xff0c;单模光纤传输技术被引入数据中心&#xff0c;同时被引入的还有WDM技术。由于数据中心对其成本较为敏感&#xff0c;因此&#xff0c;CWDM传输技术成为了其更好的选择。在CWDM系统中&#xff0c;主要有双纤单向CW…

linux-2.6.22.6内核总线设备驱动模型

开发一个驱动程序时&#xff0c;不免涉及到对硬件的相关操作&#xff0c;例如读取寄存器和引脚&#xff0c;在不利用任何框架的基础上&#xff0c;硬件代码总是和其他操作耦合到一块&#xff0c;这样做的坏处是代码耦合性太强&#xff0c;例如有三盏led灯&#xff0c;驱动程序每…

Python中的for循环语句及其应用举例(等差数列求和、阶乘、寻找最大值)

Python中的for循环语句及其应用举例(等差数列求和、阶乘、寻找最大值) 在学习任何编程语言的时候&#xff0c;不熟悉判断选择结构和循环结构&#xff0c;就难以发挥计算机优秀的计算能力和提高学习工作效率。本文将重点讲解Python中的for循环语句&#xff0c;并举例等差数列求…

easyConnect 报本地环境异常错误

一、检查任务管理器中发现ecagent.exe进程是禁用状态。如图&#xff1a; 二、在异常客户端上&#xff0c;找到easyconnect的安装目录&#xff08;默认路径&#xff1a;C:\Program Files (x86)\Sangfor\SSL\ECAgent&#xff09;&#xff0c;找到ecagent.exe应用程序尝试手动执行…

H3C交换机在地址池下如何进行IP和MAC地址绑定

环境&#xff1a; H3C S6520-26Q-SI version 7.1.070, Release 6326 问题描述&#xff1a; H3C交换机在地址池下如何进行IP和MAC地址绑定 将MAC地址为0000-e03f-0305的PC机与IP地址10.1.1.1绑定&#xff0c;掩码为255.255.255.0 解决方案&#xff1a; 1.进入地址池视图 …