MapReduce

MapReduce 核心思想:分治

  • Map:负责分,即把复杂的任务分解为若干个简单的任务来并行处理。拆分前提为这些小任务可以进行并行计算,彼此之间几乎没有依赖关系。

  • Reduce: 负责合,对map阶段的结果进行全局汇总,最后进行数据输出。

Map和Reduce这两个阶段和一起就是MapReduce的思想体现。

MapReduce编程的八个步骤

Map 阶段: 分的步骤

  • 第一步:读取文件,将其解析为key/valuek1/v1,并将 k1/v1输出到第二步

  • 第二步:自定义map逻辑,处理第一步发过来的 k1/v1,自定义处理逻辑进行处理,处理后变为k2/v2,输出到第三步

Shuffle阶段: 分组阶段

  • 第三步:分区,对k2/v2进行分区,相同的key将会发送到同一个Reduce中,形成一个集合

  • 第四步:排序,对上述集合进行字典排序

  • 第五步:规约,主要在map端对数据做一次聚合,减少我们输出key2的数据量

  • 第六步:分组,将相同的数据发送到同一组里面去调用一次reduce逻辑

**Reduce阶段:**合的步骤

  • 第七步:自定义reduce逻辑,接收k2/v2,将其转换为k3/v3进行输出

  • 第八步:输出,将reduce处理完成的数据进行输出

每个步骤都对应一个Class类,将八个类组织到一起就是MapReduce程序

单词统计MapReduce编程示例

代码可以参考:https://github.com/xf616510229/hadoop-study/tree/master/wordcount

0. 添加测试数据 wordcount.txt到hdfs

# 创建测试数据
vim  wordcount.txt ->
hello wold spark
hadoop,bigdata
java,hadoop
bigdata,spark
spark

# 创建hdfs文件夹
hdfs dfs -mkdir /wordcount
# 上传到hdfs
hdfs dfs -put wordcount.txt /wordcount 

1. 创建JobMain对象,此对象代表一个MapReduce任务,负责将八大步骤(八个类)编织为一个流程

/**
 * Job 代表一个MapReduce任务,负责编织八大步骤
 * 
 * 注意:本地运行可能会有权限问题,可有三种解决方案: 
 * 1. 修改hdfs-site.xml的dfs.permissions字段,关闭权限
 * 2. 修改目标输出文件的父文件夹的权限
 * 3. 打包为jar,放到服务器上运行: hadoop jar wordcount.jar com.yangsx95.hadoop.wordcount.JobMain
 *
 * @author yangsx
 * @version 1.0
 * @date 2019/6/2
 */
public class JobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        // 获取job对象,job对象负责将八个步骤编织到一起,并交给yarn集群运行
        Job job = Job.getInstance(super.getConf(), "wordcountJob");
        // 打包运行需要加上此句代码,否则会ClassNotFound
        job.setJarByClass(JobMain.class);
        
        // 开始组织八个步骤,即八个类

        // 1. 读取文件,解析为k1/v1,注意 TextInputFormat reduce包下的是2.x mapred是1.x
        job.setInputFormatClass(TextInputFormat.class);
        // 集群运行模式,需要从hdfs中获取文件
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount"));
        
        // 从本地获取文件
        //TextInputFormat.addInputPath(job, new Path("file:///D:\\data\\wordcount"));
        
        // 2. 自定义map逻辑,接收第一步的k1/v1,转换为 k2/v2
        job.setMapperClass(WordCountMapper.class);
        // 设置k2,v2的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3 - 6 省略,3:分区,将相同的key的value发送到一个reduce中,形成一个集合

        // 7. 设置reduce类,接收k2/v2 输出 k3/v3, 并设置k3/v3的类型
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 8. 设置输出类,outputFormat,需要保证目标输出路径不存在,结果将会放到此路径中
        job.setOutputFormatClass(TextOutputFormat.class);
        // 输出到hdfs上
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/wordcountout"));
        // 输出到本地
        //TextOutputFormat.setOutputPath(job, new Path("file:///D:\\data\\wordcountout"));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // 提交job,任务完成后,会返回一个状态码值,如果为0,表示程序运行成功
        int code = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(code);
    }
}

2. 创建WordCountMapper对象,主要负责map工作,将k1/v1转换为k2/v2:

/**
 * 步骤2 用于将k1/v1转换为 k2/v2
 * <p>
 * org.apache.hadoop.mapred.Mapper 1.x
 * org.apache.hadoop.mapreduce.Mapper: 2.x
 * <p>
 * 继承Mapper代表此类是一个Mapper类,需要四个泛型: keyin valuein keyout valueout, 即k1 v1 k2 v2
 * 所以,可以指定泛型  Mapper<Long, String, String, Integer>
 * Hadoop自己封装了一套数据类型,好处是序列化比较快,全部实现了接口 WritableComparable ,此接口又实现了 Writable与Comparable接口
 *
 * @author yangsx
 * @version 1.0
 * @date 2019/6/2
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * 变量提升,防止多行调用map,造成不必要的创建操作
     */
    private Text text = new Text();
    private IntWritable intWritable = new IntWritable();

    /**
     * 每行数据都会调用此方法一次,此方法定义了Mapper的处理逻辑
     * 此时是第二步,第一步已经经过TextInputFormat,数据已经变为 key:行偏移量 value:行内容了
     * 此时需要将数据变为 key: 单词内容  value: 单词数量
     * @param context 上下文对象,承接上文数据,传输数据至下文
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String lineContent = value.toString();
        String[] words = lineContent.split(",");
        // 遍历单词
        for (String word : words) {
            text.set(word);
            intWritable.set(1);
            // 存放k2/v2
            context.write(text, intWritable);
        }
    }
}

3. 创建WordCountReducer对象,主要负责reduce合的操作,将k2/v2转换为k3/v3:

/**
 * 步骤7 自定义reduce逻辑,针对分布式计算的结果进行整合
 * 
 * 继承reducer类表明此类用于reduce计算,同样有提供hadoop1.x与2.x的版本
 * 四个泛型,分别代表 k2 v2 k3 v3
 * @author yangsx
 * @version 1.0
 * @date 2019/6/2
 */
public class WordCountReducer  extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * 定义具体的reduce逻辑
     * @param values 步骤3生成的集合
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // 遍历集合,计算次数
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        // 将数据写出
        context.write(key, new IntWritable(count));
    }
}

MapReduce的分区

在MapReduce八大编程步骤中,第三步为分区,对k2/v2进行分区,相同的key将会发送到同一个Reduce中,形成一个集合。

分区主要将相同的数据发送到同一个reduce中。

最后更新于