MapReduce 核心思想:分治
Map
:负责分,即把复杂的任务分解为若干个简单的任务来并行处理。拆分前提为这些小任务可以进行并行计算,彼此之间几乎没有依赖关系。
Reduce
: 负责合,对map阶段的结果进行全局汇总,最后进行数据输出。
Map和Reduce这两个阶段和一起就是MapReduce的思想体现。
MapReduce编程的八个步骤
Map 阶段: 分的步骤
第一步:读取文件,将其解析为key/value
对 k1/v1
,并将 k1/v1
输出到第二步
第二步:自定义map逻辑,处理第一步发过来的 k1/v1
,自定义处理逻辑进行处理,处理后变为k2/v2
,输出到第三步
Shuffle阶段: 分组阶段
第三步:分区,对k2/v2
进行分区,相同的key将会发送到同一个Reduce中,形成一个集合
第五步:规约,主要在map端对数据做一次聚合,减少我们输出key2的数据量
第六步:分组,将相同的数据发送到同一组里面去调用一次reduce逻辑
**Reduce阶段:**合的步骤
第七步:自定义reduce逻辑,接收k2/v2
,将其转换为k3/v3
进行输出
第八步:输出,将reduce处理完成的数据进行输出
每个步骤都对应一个Class类,将八个类组织到一起就是MapReduce程序
单词统计MapReduce编程示例
代码可以参考:https://github.com/xf616510229/hadoop-study/tree/master/wordcount
0. 添加测试数据 wordcount.txt到hdfs
# 创建测试数据
vim wordcount.txt ->
hello wold spark
hadoop,bigdata
java,hadoop
bigdata,spark
spark
# 创建hdfs文件夹
hdfs dfs -mkdir /wordcount
# 上传到hdfs
hdfs dfs -put wordcount.txt /wordcount
1. 创建JobMain对象,此对象代表一个MapReduce任务,负责将八大步骤(八个类)编织为一个流程:
/**
* Job 代表一个MapReduce任务,负责编织八大步骤
*
* 注意:本地运行可能会有权限问题,可有三种解决方案:
* 1. 修改hdfs-site.xml的dfs.permissions字段,关闭权限
* 2. 修改目标输出文件的父文件夹的权限
* 3. 打包为jar,放到服务器上运行: hadoop jar wordcount.jar com.yangsx95.hadoop.wordcount.JobMain
*
* @author yangsx
* @version 1.0
* @date 2019/6/2
*/
public class JobMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
// 获取job对象,job对象负责将八个步骤编织到一起,并交给yarn集群运行
Job job = Job.getInstance(super.getConf(), "wordcountJob");
// 打包运行需要加上此句代码,否则会ClassNotFound
job.setJarByClass(JobMain.class);
// 开始组织八个步骤,即八个类
// 1. 读取文件,解析为k1/v1,注意 TextInputFormat reduce包下的是2.x mapred是1.x
job.setInputFormatClass(TextInputFormat.class);
// 集群运行模式,需要从hdfs中获取文件
TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount"));
// 从本地获取文件
//TextInputFormat.addInputPath(job, new Path("file:///D:\\data\\wordcount"));
// 2. 自定义map逻辑,接收第一步的k1/v1,转换为 k2/v2
job.setMapperClass(WordCountMapper.class);
// 设置k2,v2的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 3 - 6 省略,3:分区,将相同的key的value发送到一个reduce中,形成一个集合
// 7. 设置reduce类,接收k2/v2 输出 k3/v3, 并设置k3/v3的类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 8. 设置输出类,outputFormat,需要保证目标输出路径不存在,结果将会放到此路径中
job.setOutputFormatClass(TextOutputFormat.class);
// 输出到hdfs上
TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/wordcountout"));
// 输出到本地
//TextOutputFormat.setOutputPath(job, new Path("file:///D:\\data\\wordcountout"));
boolean b = job.waitForCompletion(true);
return b ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 提交job,任务完成后,会返回一个状态码值,如果为0,表示程序运行成功
int code = ToolRunner.run(configuration, new JobMain(), args);
System.exit(code);
}
}
2. 创建WordCountMapper
对象,主要负责map工作,将k1/v1转换为k2/v2:
/**
* 步骤2 用于将k1/v1转换为 k2/v2
* <p>
* org.apache.hadoop.mapred.Mapper 1.x
* org.apache.hadoop.mapreduce.Mapper: 2.x
* <p>
* 继承Mapper代表此类是一个Mapper类,需要四个泛型: keyin valuein keyout valueout, 即k1 v1 k2 v2
* 所以,可以指定泛型 Mapper<Long, String, String, Integer>
* Hadoop自己封装了一套数据类型,好处是序列化比较快,全部实现了接口 WritableComparable ,此接口又实现了 Writable与Comparable接口
*
* @author yangsx
* @version 1.0
* @date 2019/6/2
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* 变量提升,防止多行调用map,造成不必要的创建操作
*/
private Text text = new Text();
private IntWritable intWritable = new IntWritable();
/**
* 每行数据都会调用此方法一次,此方法定义了Mapper的处理逻辑
* 此时是第二步,第一步已经经过TextInputFormat,数据已经变为 key:行偏移量 value:行内容了
* 此时需要将数据变为 key: 单词内容 value: 单词数量
* @param context 上下文对象,承接上文数据,传输数据至下文
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String lineContent = value.toString();
String[] words = lineContent.split(",");
// 遍历单词
for (String word : words) {
text.set(word);
intWritable.set(1);
// 存放k2/v2
context.write(text, intWritable);
}
}
}
3. 创建WordCountReducer
对象,主要负责reduce合的操作,将k2/v2转换为k3/v3:
/**
* 步骤7 自定义reduce逻辑,针对分布式计算的结果进行整合
*
* 继承reducer类表明此类用于reduce计算,同样有提供hadoop1.x与2.x的版本
* 四个泛型,分别代表 k2 v2 k3 v3
* @author yangsx
* @version 1.0
* @date 2019/6/2
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* 定义具体的reduce逻辑
* @param values 步骤3生成的集合
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 遍历集合,计算次数
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
// 将数据写出
context.write(key, new IntWritable(count));
}
}
MapReduce的分区
在MapReduce八大编程步骤中,第三步为分区,对k2/v2
进行分区,相同的key将会发送到同一个Reduce中,形成一个集合。
分区主要将相同的数据发送到同一个reduce中。