hadoop2 自定义OutputFormat场景杂记

news/2024/7/2 23:39:58

为什么80%的码农都做不了架构师?>>>   hot3.png

提示:以下代码都是在 Hadoop2.7.x 最新API下进行。

场景1:自定义输出文件名前缀

示例:计算学生的平均分成绩,输出:学生姓名和平均分成绩;要求:根据成绩的范围(0~59, 60~70, 70~80, 80~90, 90~100)输出到不同的文件中,文件名前缀为:student_score_059、student_score_6070、student_score_7080、student_score_8090、student_score_90100。

方法1(最简单,推荐):Override MultipleTextOutputFormatgenerateFileNameForKeyValue()方法。

// 重载 MultipleTextOutputFormat 的 generateFileNameForKeyValue()方法来实现
public class PartitionScoreOutputFormat extends MultipleTextOutputFormat<Text, IntWritable>
{private static final String PREFIX = "student_score_";@Overrideprotected String generateFileNameForKeyValue(Text key, IntWritable value, String name){int score = value.get();if(score < 60) {return PREFIX + "059";}if(score < 70) {return PREFIX + "6070";}if(score < 80) {return PREFIX + "7080";}if(score < 90) {return PREFIX + "8090";}return PREFIX + "90100";}
}// 调用
job.setOutputFormat(PartitionScoreOutputFormat.class)

方法2:使用 MultipleOutputs.addNamedOutput() 方法

public class StudentScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{// 使用 MultipleOutputsprivate MultipleOutputs<Text, IntWritable> mos;@Overrideprotected void setup(Context context) throws ...{super.setup(context);mos = new MultipleOutputs<Text,IntWritable>(context);}@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws ...{int totalScore = 0;int count = 0;for(IntWritable score : values) {totalScore += score.get();count++;}int avgScore = count > 0 ? totalScore/count : 0;// 使用 named output,对应在 Task 中的 MultipleOutputs.addNamedOutput(...) 定义mos.write(getNamed(avgScore), key, new IntWritable(avgScore));}@Overrideprotected void cleanup(...) {// ...mos.close();}private static String getNamed(int score) {if(score < 60) {return "score059";}if(score < 70) {return "score6070";}if(score < 80) {return "score7080";}if(score < 90) {return "score8090";}return "score90100";}}
public class StudentScoreTask
{public static void main(String[] args) throws ...{Job job = Job.getInstance(new Configuration());job.setJobName("...");// ...// 重要:定义命名输出规则: // 第二个参数:score059 等名字要和 Reducer中的 MultipleOutputs.write(namedParam, ...) 命名一致MultipleOutputs.addNamedOutput(job, "score059", TextOutputFormat.class, Text.class, IntWritable.class);MultipleOutputs.addNamedOutput(job, "score6070", ...);MultipleOutputs.addNamedOutput(job, "score7080", ...);MultipleOutputs.addNamedOutput(job, "score8090", ...);MultipleOutputs.addNamedOutput(job, "score90100", ...);// ...System.exit(job.waitForCompletion(true) ? 0 : 1);}}

场景2:完全自定义输出的文件名和后缀

示例:计算学生的平均分成绩,将学生姓名和成绩输出到文件中,文件名格式为:student_score_${yyyyMMdd}_${taskId}.txt(student_score_20160930_0.txt)

方法:由于是输出到文本文件,因此 Override TextOutputFormat 的 getDefaultWorkFile() 方法即可:

public class StudentScoreOutputFormat<K,V> extends TextOutputFormcat<K,V>
{@Overridepublic Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException{FileOutputCommitter comitter = (FileOutputCommitter) super.getOutputCommitter(context);return new Path(committer.getWorkPath(), generateFileName(context));}public synchronized static String generateFileName(TaskAttemptContext context){TaskID taskId = context.getTaskAttemptID().getTaskID();int partition = taskId.getId();String currentDate = new SimpleDateFormat("yyyyMMdd").format(Calendar.getInstance().getTime());return String.format("student_score_%s_%d.txt", currentDate, partition);}
}// 使用方式
Job job = Job.getInstance(...);
//...// 将输出文件格式化类指定为自定义的 StudentScoreOutputFormat 即可
job.setOutputFormatClass(StudentScoreOutputFormat.class);// 如果不想生成空文件 part-r-xxxx 等,使用 LazyOutputFormat 设置替代上面的设置即可
// LazyOutputFormat.setOutputFormatClass(job, StudentScoreOutputFormat.class);

场景3:修改默认一个Task一个文件输出,将所有输出合并到一个文件中,该文件采用固定size进行分割为多个文件(类似日志文件输出,比如当文件size达到1G时,自动生成第二个文件,后续输出到第二个文件中,以此类推)。

 

================== 未完待续,后面会持续补充遇到的特殊文件输出要求,更欢迎大家提供~~~

 

 

转载于:https://my.oschina.net/jsan/blog/752519


http://lihuaxi.xjx100.cn/news/283896.html

相关文章

Mysql使用存储过程快速添加百万数据

这篇文章主要介绍了Mysql使用存储过程快速添加百万数据,本文通过实例代码给大家介绍的非常详细&#xff0c;对大家的学习或工作具有一定的参考借鉴价值&#xff0c;需要的朋友可以参考下 前言 为了体现不加索引和添加索引的区别&#xff0c;需要使用百万级的数据&#xff0c;但…

程序员拯救乐坛?OpenAI用“逆天”GPT2.0搞了个AI音乐生成器

作者 | 琥珀出品 | AI科技大本营&#xff08;id&#xff1a;rgznai100&#xff09;基于深度神经网络进行语音合成、音乐风格迁移&#xff0c;正成为不少致力于“让人人成为音乐家”的研究人员所追求的事情。像此前我们报道的微软小冰作词又作曲&#xff0c;AI帮清华博士写说唱歌…

【MySQL】MySQL的索引

目录索引1.1 概念1.2 作用1.3 使用场景1.4 使用1.5 索引最常用的数据结构索引 1.1 概念 索引是一种特殊的文件&#xff0c;包含着对数据表里所有记录的引用指针。可以对表中的一列或多列创建索引&#xff0c; 并指定索引的类型&#xff0c;各类索引有各自的数据结构实现。 1…

钉钉被小学生逼疯,拍片在线求饶哈哈哈哈

这已经是钉钉一星事件的第三次转折了。哈哈哈哈哈哈哈哈哈哈为什么哈哈哈哈哈或或或或或或或或或或或哈哈哈哈哈哈哈哈哈哈为什么哈哈哈真的一次比一次好笑我要是钉钉我真的欲哭无泪&#xff0c;谁能想到我一个这么强大的APP有一天被小学生逼得启动了危机公关。帮不了解全过程的…

iOS单独集成QQ分享功能

2019独角兽企业重金招聘Python工程师标准>>> &#xff08;1&#xff09;首先&#xff0c;把TencentOpenAPI.framework、TencentOpenApi_IOS_Bundle.bundle导入工程中。 &#xff08;ps&#xff0c;我在搜索资料时&#xff0c;看到有人说TencentOpenAPI.framework要放…

如何用python实现邮箱发送信息

这篇文章主要为大家详细介绍了python实现邮箱发送信息&#xff0c;文中示例代码介绍的非常详细&#xff0c;具有一定的参考价值&#xff0c;感兴趣的小伙伴们可以参考一下 本文实例为大家分享了python实现邮箱发送信息的具体代码&#xff0c;供大家参考&#xff0c;具体内容如下…

清晰易懂的Focal Loss原理解释

来源&#xff1a;https://www.cnblogs.com/king-lps/p/9497836.html编辑&#xff1a;石头1. 总述Focal loss主要是为了解决one-stage目标检测中正负样本比例严重失衡的问题。该损失函数降低了大量简单负样本在训练中所占的权重&#xff0c;也可理解为一种困难样本挖掘。2. 损失…

pytest接口测试之fixture传参数request

本文主要介绍了pytest接口测试之fixture传参数request的使用&#xff0c;文中通过示例代码介绍的非常详细&#xff0c;具有一定的参考价值&#xff0c;感兴趣的小伙伴们可以参考一下 前言 有的测试用例&#xff0c;需要依赖于某些特定的case才可以执行&#xff0c;比如登录获取…