MapReduce: Implementing a Partitioning Example in Code
2022-06-10 15:29:00 【QYHuiiQ】
In this example we use student grades to implement partitioning (the third column is the grade):
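The original post shows the data file as a screenshot, so the actual rows are not reproduced here. Purely as a hypothetical illustration of the expected format (comma-separated values, grade in the third column), a couple of made-up rows might look like this:

studentA,classA,85
studentB,classA,93

Only the position of the grade column matters to the partitioner; the names and other values above are invented for illustration.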

- Upload the raw data to HDFS
[[email protected] test_data]# hdfs dfs -mkdir /test_partition_input
[[email protected] test_data]# hdfs dfs -put test_partiton.csv /test_partition_input

Create a new project:
- Add the pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wyh.test</groupId>
    <artifactId>test_partition</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- Create the custom Mapper class
package wyh.test.partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * The four type parameters are the types of K1, V1, K2, V2 (the field we partition on must be part of K2).
 * Here V2 can simply be left empty.
 */
public class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * map(...) converts K1, V1 into K2, V2. In this example K2 is simply the value of V1.
     * K1 - line offset, V1 - line content
     * K2 - the value of V1, i.e. the line content
     * V2 - empty
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //NullWritable.get() returns the singleton NullWritable instance
        context.write(value, NullWritable.get());
    }
}
- Create the custom Partitioner
package wyh.test.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * The two type parameters correspond to the types of K2 and V2.
 */
public class PartitionPartitioner extends Partitioner<Text, NullWritable> {

    /**
     * Defines the concrete partitioning rule and returns the partition number.
     * @param text - K2
     * @param nullWritable - V2
     * @param i - number of partitions
     * @return the partition number
     */
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        //Take the original line and extract the grade value (third column)
        String[] split = text.toString().split(",");
        String gradeString = split[2];
        int grade = Integer.parseInt(gradeString);
        //Partitioning rule
        if (grade > 90) {
            return 1;//return the partition number
        } else {
            return 0;
        }
    }
}
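As a quick sanity check, the partition rule can be exercised directly with a small JUnit test (the junit dependency is already declared in the pom). This test class and the sample rows in it are not from the original article; they are a minimal sketch with made-up values:

package wyh.test.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Test;

public class PartitionPartitionerTest {

    @Test
    public void testGetPartition() {
        PartitionPartitioner partitioner = new PartitionPartitioner();
        //A grade of 93 (> 90) should be assigned to partition 1 (hypothetical row)
        Assert.assertEquals(1, partitioner.getPartition(new Text("studentB,classA,93"), NullWritable.get(), 2));
        //A grade of 85 (<= 90) should be assigned to partition 0 (hypothetical row)
        Assert.assertEquals(0, partitioner.getPartition(new Text("studentA,classA,85"), NullWritable.get(), 2));
    }
}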
- Create the custom Reducer class
package wyh.test.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * The four type parameters are the types of K2, V2, K3, V3.
 * K2 - line content
 * V2 - empty
 * K3 - line content (no processing is needed in the reduce step of this example, the data is simply passed through)
 * V3 - empty
 */
public class PartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
- Create the main class
package wyh.test.partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PartitionJobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        //Create the job object
        Job job = Job.getInstance(super.getConf(), "test_partition_job");
        //Required when running on a cluster
        job.setJarByClass(PartitionJobMain.class);
        //Configure the input
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_partition_input"));
        //Configure the Map stage
        job.setMapperClass(PartitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        //Configure the partitioner
        job.setPartitionerClass(PartitionPartitioner.class);
        //Configure the Reduce stage
        job.setReducerClass(PartitionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132/test_partition_output"));
        //Set the number of Reduce tasks (default is 1); the number of Reduce tasks is also the number of partitions
        job.setNumReduceTasks(2);
        //Wait for the job to finish and get its completion status
        boolean status = job.waitForCompletion(true);
        //The result of the ternary expression becomes runStatus in main()
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //Launch the job and return its exit status
        int runStatus = ToolRunner.run(configuration, new PartitionJobMain(), args);
        System.exit(runStatus);
    }
}
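In the run() method above the input and output paths are hard-coded. Since the job is already launched through ToolRunner, one possible variation (a sketch, not part of the original article) is to read both paths from the program arguments, so the same jar can be reused against different directories:

    //Inside run(), replacing the two hard-coded paths; strings is the argument array handed to run() by ToolRunner
    TextInputFormat.addInputPath(job, new Path(strings[0]));
    TextOutputFormat.setOutputPath(job, new Path(strings[1]));

The job would then be invoked with something like: hadoop jar test_partition-1.0-SNAPSHOT.jar wyh.test.partition.PartitionJobMain <input path> <output path>.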
- Package the project
First run clean to remove the intermediate files from previous builds, then run package to build the jar:

- Upload the built jar to the server

- Run the jar
[[email protected] test_jar]# hadoop jar test_partition-1.0-SNAPSHOT.jar wyh.test.partition.PartitionJobMain
#The last value is the fully qualified class name of the main class
- View the output directory structure
Since we configured two partitions (two reduce tasks), two partition files are generated, by default named part-r-00000 and part-r-00001:

- View the partitioning result
In the PartitionPartitioner class we specified that grades greater than 90 go to partition 1, and everything else goes to partition 0:

So partition file 0 contains the records of all students with a grade <= 90:

And partition file 1 contains the records of all students with a grade > 90:

This gives us a simple implementation of partitioning in MapReduce.