

MapReduce Partitioning Example: Code Implementation

2022-06-10 15:29:00 QYHuiiQ

In this example, we implement partitioning using student grade data (the third column is the grade):

  • Upload the raw data to HDFS
[[email protected] test_data]# hdfs dfs -mkdir /test_partition_input
[[email protected] test_data]# hdfs dfs -put test_partiton.csv /test_partition_input
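
The contents of test_partiton.csv are not reproduced here; a hypothetical sample in the expected format might look like the lines below. Only the third (grade) column matters for partitioning; the names and subjects are placeholders.

zhangsan,math,88
lisi,math,95
wangwu,english,65
zhaoliu,english,92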

Create a new project:

  • Add the pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wyh.test</groupId>
    <artifactId>test_partition</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>


    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
  • Create a custom Mapper class
package wyh.test.partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


/**
 * The four generic types are, in order, the types of K1, V1, K2, V2 (the field used for partitioning must be part of K2); here V2 can simply be left empty for now
 */
public class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * map(...) converts K1, V1 into K2, V2. In this example, K2 simply reuses the value of V1.
     * K1 is the byte offset of the line, V1 is the line of data
     * K2 is the value of V1, i.e. the line of data
     * V2 is left empty
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, NullWritable.get()); // NullWritable.get() returns the singleton NullWritable instance
    }
}
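
For example, with the hypothetical sample line zhangsan,math,88 shown earlier, K1 is the byte offset of that line within the file, V1 is the Text value zhangsan,math,88, and the mapper emits that same text as K2 with NullWritable.get() as V2.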
  • Create a custom Partitioner
package wyh.test.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * The two generic types correspond, in order, to the types of K2 and V2
 */
public class PartitionPartitioner extends Partitioner<Text, NullWritable> {

    /**
     * This method defines the concrete partitioning rule and returns the partition number
     * @param text - K2
     * @param nullWritable - V2
     * @param i - the number of partitions (i.e. the number of reduce tasks)
     * @return the partition number
     */
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        // parse the raw line and extract the grade value (third column)
        String[] split = text.toString().split(",");
        String gradeString = split[2];
        int grade = Integer.parseInt(gradeString);

        // apply the partitioning rule and return the partition number
        if (grade > 90) {
            return 1;
        } else {
            return 0;
        }
    }
}
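
Note that the partition number returned by getPartition() must lie in the range [0, numReduceTasks). With the two-way split above, the job therefore needs two reduce tasks, which is why the driver class below calls job.setNumReduceTasks(2).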
  • Create a custom Reducer class
package wyh.test.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * The four generic types are, in order, the types of K2, V2, K3, V3
 * K2 - the line of data
 * V2 - empty
 * K3 - the line of data (in this example the reducer does not need to process the data); it is simply passed through
 * V3 - empty
 */
public class PartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());

    }
}
  • Create the main (driver) class
package wyh.test.partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PartitionJobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        // create the Job instance
        Job job = Job.getInstance(super.getConf(), "test_partition_job");
        // required when running on a cluster
        job.setJarByClass(PartitionJobMain.class);
        // configure the input
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_partition_input"));
        // configure the map stage
        job.setMapperClass(PartitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // configure the partitioner
        job.setPartitionerClass(PartitionPartitioner.class);
        // configure the reduce stage
        job.setReducerClass(PartitionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132:8020/test_partition_output"));
        // set the number of reduce tasks (default is 1); the number of reduce tasks is also the number of partitions
        job.setNumReduceTasks(2);
        // wait for the job to complete and capture its status
        boolean status = job.waitForCompletion(true);
        // the result of this ternary expression becomes runStatus in main()
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // launch the job and return its exit status
        int runStatus = ToolRunner.run(configuration, new PartitionJobMain(), args);
        System.exit(runStatus);
    }
}
  • Package the project

Run clean first to remove the intermediate files from previous builds, then run package to build the jar, as in the sample command below:
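
A minimal sketch of the packaging step, assuming Maven is installed and the command is run from the project root (the same goals can also be triggered from the IDE's Maven panel):

mvn clean package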

  • Upload the built jar to the server

  • Run the jar
[[email protected] test_jar]# hadoop jar test_partition-1.0-SNAPSHOT.jar wyh.test.partition.PartitionJobMain

# the last argument is the fully qualified name of the main class
  • View the output directory structure

Since we configured two partitions, two partition files are generated, as in the sample listing below:
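
A sketch of how the output directory could be inspected; with two reduce tasks the standard output file names are part-r-00000 and part-r-00001 (the _SUCCESS marker is written when the job succeeds, and the full listing details are omitted here):

hdfs dfs -ls /test_partition_output
# /test_partition_output/_SUCCESS
# /test_partition_output/part-r-00000
# /test_partition_output/part-r-00001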

  • View the partition results

In the PartitionPartitioner class we specified that grades greater than 90 go to partition 1 and everything else goes to partition 0:

So the partition-0 file contains all students with grades <= 90:

And the partition-1 file contains all students with grades > 90:
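
A sketch of how each partition file could be viewed (the actual contents depend on the data set):

hdfs dfs -cat /test_partition_output/part-r-00000    # students with grade <= 90
hdfs dfs -cat /test_partition_output/part-r-00001    # students with grade > 90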

This completes a simple implementation of partitioning in MapReduce.


Copyright notice
This article was written by QYHuiiQ. Please include a link to the original when reposting. Thank you.
https://blog.csdn.net/QYHuiiQ/article/details/125035580
