MapReduce: Implementing a Partitioning Example
2022-06-10 15:29:00 【QYHuiiQ】
In this example, we use student grade data to implement partitioning (the third column is the grade):
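The dataset itself appears only as a screenshot in the original post. A few purely illustrative rows in the same shape (the names, subjects and numbers below are made up; all that matters is the comma-separated layout with the grade in the third column):

zhangsan,chinese,87
lisi,chinese,95
wangwu,math,64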

- Upload the raw data to HDFS
[[email protected] test_data]# hdfs dfs -mkdir /test_partition_input
[[email protected] test_data]# hdfs dfs -put test_partiton.csv /test_partition_input

Create a new project:
- Add the pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>wyh.test</groupId>
    <artifactId>test_partition</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

- Create a custom Mapper class
package wyh.test.partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * The four generic parameters are the types of K1, V1, K2 and V2
 * (the field we partition on must be part of K2); here V2 can simply be left empty.
 */
public class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * map(...) converts K1, V1 into K2, V2; in our case K2 can simply reuse the value of V1.
     * K1 is the byte offset of the line, V1 is the line content.
     * K2 is the value of V1, i.e. the line content.
     * V2 is left empty.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, NullWritable.get()); // NullWritable.get() returns the shared NullWritable instance
    }
}
- Create a custom Partitioner
package wyh.test.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * The two generic parameters correspond to the types of K2 and V2.
 */
public class PartitionPartitioner extends Partitioner<Text, NullWritable> {

    /**
     * Defines the concrete partitioning rule and returns the partition number.
     * @param text - K2
     * @param nullWritable - V2
     * @param i - number of partitions
     * @return the partition number
     */
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        // split the raw line and extract the grade value
        String[] split = text.toString().split(",");
        String gradeString = split[2];
        int grade = Integer.parseInt(gradeString);
        // partitioning rule
        if (grade > 90) {
            return 1; // return the partition number
        } else {
            return 0;
        }
    }
}
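To make the rule concrete with the illustrative rows sketched earlier: lisi,chinese,95 parses to grade 95, which is greater than 90, so getPartition() returns 1 and the record goes to partition 1, while zhangsan,chinese,87 returns 0. Note that the returned partition numbers must fall within [0, number of reduce tasks), otherwise the job fails at runtime.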
- Create a custom Reducer class
package wyh.test.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * The four generic parameters are the types of K2, V2, K3 and V3.
 * K2 - line content
 * V2 - empty
 * K3 - line content (in this example the Reducer does not need to process the data, it simply passes it through)
 * V3 - empty
 */
public class PartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
- Create the main (driver) class
package wyh.test.partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PartitionJobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        // create the Job object
        Job job = Job.getInstance(super.getConf(), "test_partition_job");
        // required when running on a cluster
        job.setJarByClass(PartitionJobMain.class);
        // configure the input
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_partition_input"));
        // configure the Map stage
        job.setMapperClass(PartitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // configure the partitioner
        job.setPartitionerClass(PartitionPartitioner.class);
        // configure the Reduce stage
        job.setReducerClass(PartitionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132:8020/test_partition_output"));
        // set the number of reduce tasks (default is 1); the number of reduce tasks is also the number of partitions
        job.setNumReduceTasks(2);
        // wait for the job to finish and get its status
        boolean status = job.waitForCompletion(true);
        // the result of this ternary expression becomes runStatus in main()
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // launch the job and return its exit status
        int runStatus = ToolRunner.run(configuration, new PartitionJobMain(), args);
        System.exit(runStatus);
    }
}
- Package the project
First run clean to remove intermediate build artifacts, then run package to build the jar:
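A minimal command-line sketch, assuming Maven is on the PATH (the same clean and package goals can also be triggered from the IDE's Maven panel):

mvn clean package

With the shade plugin configured as above, the shaded jar is produced under target/ as test_partition-1.0-SNAPSHOT.jar.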

- Upload the built jar to the server
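One way to copy it up is scp; the host address and target directory below are placeholders, since the original post does not show them:

scp target/test_partition-1.0-SNAPSHOT.jar root@<server-ip>:/path/to/test_jar/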

- Run the jar
[[email protected] test_jar]# hadoop jar test_partition-1.0-SNAPSHOT.jar wyh.test.partition.PartitionJobMain
# the last argument is the fully qualified name of the main class

- View the output directory structure
Since we configured two partitions, two partition files are generated here:
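A command-line sketch of the same check (the listing shows the file names TextOutputFormat conventionally produces, not output copied from the original screenshot):

hdfs dfs -ls /test_partition_output
# expected entries: _SUCCESS, part-r-00000, part-r-00001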

- View the partition results
In the PartitionPartitioner class we specified that grades greater than 90 go to partition 1 and everything else goes to partition 0:

So partition file 0 contains all students with grades <= 90:

And partition file 1 contains all students with grades > 90:
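The screenshots of the two files are not reproduced here; checking them from the command line would look roughly like this:

hdfs dfs -cat /test_partition_output/part-r-00000    # students with grade <= 90
hdfs dfs -cat /test_partition_output/part-r-00001    # students with grade > 90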

This completes a simple implementation of partitioning in MapReduce.