MapReduce: Code Implementation of a Sorting and Serialization Example
2022-06-10 15:29:00 【QYHuiiQ】
The example implemented here first sorts student names lexicographically; when two names are identical, it falls back to sorting those records by age in ascending order.
- Upload the raw data file to HDFS
hdfs dfs -mkdir /test_comparation_input
hdfs dfs -put test_comparation.txt /test_comparation_input
The five lines of raw data are first sorted lexicographically by the ASCII values of the names; for the duplicated name Bob, the two Bob records (lines 2 and 4 of the file) are then given a secondary sort by age.
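The original post showed the input file only as a screenshot, which is not reproduced here. A plausible test_comparation.txt consistent with the description (comma-separated name,age pairs, five lines, Bob on lines 2 and 4) might look like this; the exact names and ages are assumptions:

    Alice,20
    Bob,18
    Charlie,22
    Bob,16
    David,19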

Create a new project:
- Add the POM dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wyh.test</groupId>
    <artifactId>test_comparation</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

- Custom Bean class implementing sorting and serialization
package wyh.test.comparation;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ComparationBean implements WritableComparable<ComparationBean> {

    // Fields carried by the bean
    private String studentName;
    private int age;

    public String getStudentName() {
        return studentName;
    }

    public void setStudentName(String studentName) {
        this.studentName = studentName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        // Override the default format so the output file shows "name<TAB>age"
        return studentName + "\t" + age;
    }

    // Define the sort order; the framework decides when to call this
    @Override
    public int compareTo(ComparationBean comparationBean) {
        // First compare studentName with String's own compareTo()
        int compareResult = this.studentName.compareTo(comparationBean.getStudentName());
        // A result of 0 means the names are identical, so fall back to age
        if (compareResult == 0) {
            // Integer.compare avoids the overflow risk of subtracting ints
            return Integer.compare(this.age, comparationBean.getAge());
        }
        return compareResult;
    }

    // Serialization: write the fields out as a byte stream
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // writeUTF() serializes the String field
        dataOutput.writeUTF(studentName);
        // writeInt() serializes the int field
        dataOutput.writeInt(age);
    }

    // Deserialization: read the fields back in the same order they were written
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.studentName = dataInput.readUTF();
        this.age = dataInput.readInt();
    }
}
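As a quick local sanity check (my addition, not part of the original post), the compareTo() contract can be exercised by sorting a few beans in plain Java; the names and ages below are assumptions:

package wyh.test.comparation;

import java.util.ArrayList;
import java.util.List;

public class ComparationBeanSortCheck {
    public static void main(String[] args) {
        List<ComparationBean> beans = new ArrayList<>();
        String[][] rows = {{"Bob", "18"}, {"Charlie", "22"}, {"Bob", "16"}, {"Alice", "20"}};
        for (String[] row : rows) {
            ComparationBean bean = new ComparationBean();
            bean.setStudentName(row[0]);
            bean.setAge(Integer.parseInt(row[1]));
            beans.add(bean);
        }
        // A null comparator means natural ordering, i.e. the compareTo() defined above
        beans.sort(null);
        // Expected order: Alice 20, Bob 16, Bob 18, Charlie 22
        beans.forEach(System.out::println);
    }
}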
- Custom Mapper class
package wyh.test.comparation;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// K2 is our custom bean (it carries the sort order and the serialization);
// its fields are parsed out of V1, the input line.
public class ComparationMapper extends Mapper<LongWritable, Text, ComparationBean, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the V1 line into its two fields and populate the bean
        String[] split = value.toString().split(",");
        ComparationBean comparationBean = new ComparationBean();
        comparationBean.setStudentName(split[0]);
        comparationBean.setAge(Integer.parseInt(split[1]));
        // Emit K2 (the bean) and V2 (a NullWritable placeholder)
        context.write(comparationBean, NullWritable.get());
    }
}
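One assumption worth noting: map() above throws on blank or malformed lines. A defensive variant of the method (my sketch, not part of the original post) could skip them instead:

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        // Skip blank or malformed lines rather than failing the task
        if (split.length < 2 || split[0].isEmpty()) {
            return;
        }
        ComparationBean comparationBean = new ComparationBean();
        comparationBean.setStudentName(split[0]);
        comparationBean.setAge(Integer.parseInt(split[1].trim()));
        context.write(comparationBean, NullWritable.get());
    }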
- Custom Reducer class
package wyh.test.comparation;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// In this example reduce has no extra work to do on K2/V2,
// so it passes the key straight through as K3/V3.
public class ComparationReducer extends Reducer<ComparationBean, NullWritable, ComparationBean, NullWritable> {

    @Override
    protected void reduce(ComparationBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
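Note that because the whole bean is the key, two records with the same name and the same age are grouped into a single reduce call, and the pass-through above writes them only once. If exact duplicates must be preserved, a small variant of the method (my sketch, not from the original post) emits one output line per grouped value:

    @Override
    protected void reduce(ComparationBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // One output record per original input record, so duplicates survive
        for (NullWritable ignored : values) {
            context.write(key, NullWritable.get());
        }
    }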
- Custom JobMain (driver) class
package wyh.test.comparation;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ComparationJobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "test_comparation_job");
        job.setJarByClass(ComparationJobMain.class);

        // Input
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_comparation_input"));

        // Map
        job.setMapperClass(ComparationMapper.class);
        job.setMapOutputKeyClass(ComparationBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Reduce
        job.setReducerClass(ComparationReducer.class);
        job.setOutputKeyClass(ComparationBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Output
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132:8020/test_comparation_output"));

        boolean status = job.waitForCompletion(true);
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int runStatus = ToolRunner.run(configuration, new ComparationJobMain(), args);
        System.exit(runStatus);
    }
}
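The NameNode address (192.168.126.132:8020) and both paths are hard-coded in run(). Since ToolRunner already forwards the command-line arguments into run(), a common refinement (a sketch of mine, assuming the job is invoked with two extra arguments) is to take the paths from there:

    // hadoop jar test_comparation-1.0-SNAPSHOT.jar wyh.test.comparation.ComparationJobMain <input> <output>
    TextInputFormat.addInputPath(job, new Path(strings[0]));
    TextOutputFormat.setOutputPath(job, new Path(strings[1]));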
- Package the project
Run the clean and then the package Maven lifecycle phases (e.g. mvn clean package); the shade plugin assembles the runnable jar during the package phase.

- Upload the jar to the server

- Run the jar
hadoop jar test_comparation-1.0-SNAPSHOT.jar wyh.test.comparation.ComparationJobMain
- Inspect the HDFS directory tree
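The directory-tree screenshot from the original post is not reproduced here; one standard way to list it from the shell is:

hdfs dfs -ls -R /test_comparation_output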

- View the output file
hdfs dfs -cat /test_comparation_output/part-r-00000
You can see that the names are sorted first, and the two duplicated Bob records are then given a secondary sort by ascending age:
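The result screenshot is not reproduced here; with the hypothetical input shown at the top of this post, the sorted contents of part-r-00000 would be:

    Alice	20
    Bob	16
    Bob	18
    Charlie	22
    David	19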

This completes a simple implementation of sorting and serialization in MapReduce.