[email protected] test data / pwd /usr/local/wyh/test data [email protected] test data / vi test word count.txt [email protected] test data...">

当前位置:网站首页>MapReduce之Word Count案例代码实现

MapReduce之Word Count案例代码实现

2022-06-10 15:29:00 QYHuiiQ

  • 准备用于计算word count的数据文件
[[email protected] test_data]# pwd
/usr/local/wyh/test_data
[[email protected] test_data]# vi test_word_count.txt
[[email protected] test_data]# cat test_word_count.txt
hello,world
hadoop,world,sqoop
hive,hello,hadoop,mysql
welcome,hadoop
  • 在HDFS上创建目录,并将上述准备的数据文件上传至HDFS
[[email protected] test_data]# hdfs dfs -mkdir /test_word_count
[[email protected] test_data]# hdfs dfs -put test_word_count.txt /test_word_count/
  • 新建project,pom.xml引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wyh.test.com</groupId>
    <artifactId>test_word_count</artifactId>
    <version>1.0-SNAPSHOT</version>


    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  • 创建自定义mapper类继承Mapper类,并重写map()方法,实现k1,v1到k2,v2的转换
package wyh.test.mapreduce;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

//泛型中的4个类型依次对应的是K1,V1,K2,V2的类型,这些数据类型是hadoop自己的数据类型,而非java中的数据类型。注意导入的包是hadoop.io下面的包。
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    //重写map方法,该方法就是用来将K1.V1转为K2,V2。参数key就是k1,也即偏移量,参数value就是v1,也即该行对应的文本,参数context表示上下文对象
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //将每一行的文本数据进行拆分
        String[] splitWord=value.toString().split(",");
        //遍历数组,转换成为k2,v2
        for (String word:splitWord) {
            //将k2,v2写入上下文对象context中,参数一为要输出的k2,参数二为要输出的v2,拆分后的每个单词的v2都是固定值1
            context.write(new Text(word), new LongWritable(1));
        }


    }
}
  • 创建自定义reduce类继承Reduce类,并重写reduce()方法
package wyh.test.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

//四个泛型依次对应经过shuffle阶段之后的k2,v2和要输出的k3,v3
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    //重写reduce方法,将shuffle后的k2,v2转为k3,v3,并将k3,v3写入context中
    //eg:shuffle后的k2,v2:hell     <1,1,1>,也即v2是一个集合类型
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        //遍历v2集合,将集合中的值相加得到v3,需要先初始化一个count值
        long count = 0;
        for (LongWritable value : values) {
            count=count+value.get();
        }
        //将k3,v3写入context中,key3的值与key2的值保持一致
        context.write(key, new LongWritable(count));
    }
}
  • 创建自定义主类继承Configured类并实现Tool接口,实现run()方法
package wyh.test.mapreduce;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountMain extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        /**
         *创建job任务对象,参数一为Configuration类型的对象,需要注意的是在同一个job任务中,上下文必须使用同一个Configuration对象,
         * 而下面的main()中已经创建了Configuration对象,所以必须要使用main()中的configuration,而这个对象在下面的run方法中
         * 其实已经被保存在了Configured类中,因为Configured类中有一个私有变量是Configuration对象。所以这里我们就是要想办法拿到Configured类中的configuration,
         * 而当前我们的自定义类WordCountMain又是Configured的子类,所以我们可以通过super对象来调用其父类Configured的configuration对象。
         * 参数二为自定义的job name。
         */
        Job job = Job.getInstance(super.getConf(), "testwordcountJob");
        //!!!!!!!!!!    集群必须要设置    !!!!!!!!
        job.setJarByClass(WordCountMain.class);
        //配置job具体要执行的任务步骤
        //指定读取输入文件的类
        job.setInputFormatClass(TextInputFormat.class);
        //指定要读取的文件的路径,这里写了目录,就会将该目录下的所有文件都读取到
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_word_count"));
        //指定map处理逻辑类
        job.setMapperClass(WordCountMapper.class);
        //指定map阶段输出的k2类型
        job.setMapOutputKeyClass(Text.class);
        //指定map阶段输出的v2类型
        job.setMapOutputValueClass(LongWritable.class);


        //指定reduce处理逻辑类
        job.setReducerClass(WordCountReducer.class);
        //设置reduce之后输出的k3类型
        job.setOutputKeyClass(Text.class);
        //设置reduce之后输出的v3类型
        job.setOutputValueClass(LongWritable.class);

        //设置整个job执行结果的输出处理类
        job.setOutputFormatClass(TextOutputFormat.class);
        //指定结果输出路径,该目录必须是不存在的目录(如已存在该目录,则会报错),它会自动帮我们创建
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132:8020/test_word_count_output"));

        //返回执行状态
        boolean status = job.waitForCompletion(true);

        //使用三目运算,将布尔类型的返回值转换为整型返回值,其实这个地方的整型返回值就是返回给了下面main()中的runStatus
        return status ? 0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        /**
         * 参数一是一个Configuration对象,参数二是Tool的实现类对象,参数三是一个String类型的数组参数,可以直接使用main()中的参数args.
         * 返回值是一个整型的值,这个值代表了当前这个任务执行的状态.
         * 调用ToolRunner的run方法启动job任务.
         */
        int runStatus = ToolRunner.run(configuration, new WordCountMain(), args);
        /**
         * 任务执行完成后退出,根据上面状态值进行退出,如果任务执行是成功的,那么就是成功退出,如果任务是失败的,就是失败退出
         */
        System.exit(runStatus);

    }
}
  • 打包java程序

双击package:

打包成功:

 打出来的有两个jar,original开头的是轻量级的(pom.xml中配置了打包插件,所以会产生这个jar),因为它没有把我们的dependency依赖包放进去,另一个是将依赖包也打包进去了,所以就会大一些。用哪个都可以。

 将要上传至服务器的包先拷贝到桌面,然后上传至服务器:

  •  运行jar
[[email protected] word_count_jar]# hadoop jar test_word_count-1.0-SNAPSHOT.jar wyh.test.mapreduce.WordCountMain
#参数中要指定要运行的jar名称以及Main类的全路径,告诉它main()方法所在的类
  • 查看运行的输出结果
[[email protected] word_count_jar]# hdfs dfs -cat /test_word_count_output/*

 这样就简单地实现了word count的功能。

原网站

版权声明
本文为[QYHuiiQ]所创,转载请带上原文链接,感谢
https://blog.csdn.net/QYHuiiQ/article/details/124973888