MapReduce Case Study: Aggregation and Summation
2022-06-10 15:29:00 【QYHuiiQ】
In this case study we want to compute, for each user (identified by phone number), the total amount spent on digital products, commuting, and catering over the past few months. In other words, for every user we sum the monthly digital spending, the commuting spending, and the catering spending, ending up with three totals.
- Prepare the data

Upload the data to the server:

[[email protected] test_data]# cat test_shopping.txt
15525535555,03,2890,437,1495
16626636666,05,1264,308,1677
16626636666,03,530,259,2174
15525535555,05,1088,372,1726
16626636666,04,754,417,1586
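Each line has the format phone,month,digital,commute,catering. For example, the first record says that in month 03 the user 15525535555 spent 2890 on digital products, 437 on commuting, and 1495 on catering.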
Upload the data file to HDFS:
[[email protected] test_jar]# hdfs dfs -mkdir /test_shopping_input
[[email protected] test_jar]# cd ../
[[email protected] hadoop-2.7.5]# cd ../test_data/
[[email protected] test_data]# hdfs dfs -put test_shopping.txt /test_shopping_input

Create a new project:
- Add the pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wyh.test</groupId>
    <artifactId>mapreduce_shopping</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- Create a custom Bean class that wraps the three spending values
package wyh.test.shopping;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Implement the Writable interface so that ShoppingSumBean can be serialized and deserialized
public class ShoppingSumBean implements Writable {

    private Integer digital;   // digital spending
    private Integer commute;   // commuting spending
    private Integer catering;  // catering spending

    public Integer getDigital() {
        return digital;
    }

    public void setDigital(Integer digital) {
        this.digital = digital;
    }

    public Integer getCommute() {
        return commute;
    }

    public void setCommute(Integer commute) {
        this.commute = commute;
    }

    public Integer getCatering() {
        return catering;
    }

    public void setCatering(Integer catering) {
        this.catering = catering;
    }

    @Override
    public String toString() {
        return digital + "\t" + commute + "\t" + catering;
    }

    // Serialization method required by the Writable interface
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(digital);
        dataOutput.writeInt(commute);
        dataOutput.writeInt(catering);
    }

    // Deserialization method required by the Writable interface
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.digital = dataInput.readInt();
        this.commute = dataInput.readInt();
        this.catering = dataInput.readInt();
    }
}
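Before wiring the bean into a job, the write/readFields pair can be sanity-checked locally with a plain Java round trip through DataOutputStream/DataInputStream (which implement DataOutput/DataInput). This is just a throwaway sketch, not part of the original project; the class name ShoppingSumBeanRoundTrip is made up for illustration:
package wyh.test.shopping;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Throwaway check: serialize a ShoppingSumBean and read it back to verify write()/readFields().
public class ShoppingSumBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        ShoppingSumBean original = new ShoppingSumBean();
        original.setDigital(2890);
        original.setCommute(437);
        original.setCatering(1495);

        // Serialize the three fields with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize them into a fresh bean with readFields()
        ShoppingSumBean copy = new ShoppingSumBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Both lines should print the same three tab-separated values
        System.out.println(original);
        System.out.println(copy);
    }
}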
- Custom Mapper
package wyh.test.shopping;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * K1 is the byte offset of the line
 * V1 is the line content
 * K2 is the user identifier (phone number)
 * V2 is a ShoppingSumBean (the digital, commuting, and catering values of the line wrapped into one object)
 */
public class ShoppingMapper extends Mapper<LongWritable, Text, Text, ShoppingSumBean> {

    /**
     * K1    V1
     * 0     15525535555,03,2890,437,1495
     * K2            V2
     * 15525535555   ShoppingSumBean(2890,437,1495)
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line and take the phone number (user identifier); this becomes K2
        String[] splitString = value.toString().split(",");
        String userPhone = splitString[0];
        // Create a ShoppingSumBean, extract the three spending values from the line and set them on the bean; this becomes V2
        ShoppingSumBean shoppingSumBean = new ShoppingSumBean();
        shoppingSumBean.setDigital(Integer.parseInt(splitString[2]));
        shoppingSumBean.setCommute(Integer.parseInt(splitString[3]));
        shoppingSumBean.setCatering(Integer.parseInt(splitString[4]));
        // Write K2 and V2 to the context
        context.write(new Text(userPhone), shoppingSumBean);
    }
}
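As an optional refinement that is not in the original code: context.write() serializes the key and value right away, so the output Text and ShoppingSumBean can safely be reused across map() calls instead of being allocated for every input line. A minimal sketch of that variant (the class name ReusingShoppingMapper is hypothetical):
package wyh.test.shopping;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Hypothetical variant of ShoppingMapper that reuses its output objects across calls.
public class ReusingShoppingMapper extends Mapper<LongWritable, Text, Text, ShoppingSumBean> {

    private final Text outKey = new Text();
    private final ShoppingSumBean outValue = new ShoppingSumBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        outKey.set(fields[0]);                              // phone number
        outValue.setDigital(Integer.parseInt(fields[2]));   // digital spending
        outValue.setCommute(Integer.parseInt(fields[3]));   // commuting spending
        outValue.setCatering(Integer.parseInt(fields[4]));  // catering spending
        context.write(outKey, outValue);                    // serialized immediately, so reuse is safe
    }
}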
- Custom Reducer class
package wyh.test.shopping;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * K2            V2 (after the shuffle phase, all records with the same K2 are grouped into one collection)
 * 15525535555   ShoppingSumBean(2890,437,1495), ShoppingSumBean(1088,372,1726), ShoppingSumBean(340,319,1653)
 * K3            V3 (sum each of the three spending categories across the same user's records, then wrap the totals in a ShoppingSumBean again)
 * 15525535555   ShoppingSumBean(4318,1128,4874)
 */
public class ShoppingReducer extends Reducer<Text, ShoppingSumBean, Text, ShoppingSumBean> {

    @Override
    protected void reduce(Text key, Iterable<ShoppingSumBean> values, Context context) throws IOException, InterruptedException {
        // Iterate over the V2 collection and accumulate each of the three spending values
        Integer digitalSum = 0;
        Integer commuteSum = 0;
        Integer cateringSum = 0;
        for (ShoppingSumBean shoppingSumBean : values) {
            digitalSum += shoppingSumBean.getDigital();
            commuteSum += shoppingSumBean.getCommute();
            cateringSum += shoppingSumBean.getCatering();
        }
        // Create a ShoppingSumBean and assign the three totals computed above; this becomes V3
        ShoppingSumBean shoppingSumBean = new ShoppingSumBean();
        shoppingSumBean.setDigital(digitalSum);
        shoppingSumBean.setCommute(commuteSum);
        shoppingSumBean.setCatering(cateringSum);
        // Write K3 and V3 to the context
        context.write(key, shoppingSumBean);
    }
}
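One detail worth knowing about the values iterator: Hadoop reuses a single ShoppingSumBean instance while iterating, overwriting its fields on each step. The reducer above is safe because it only reads the three integers inside the loop; if the beans ever needed to be kept around (for example, collected into a list), their values would have to be copied out first.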
- Custom driver (main) class
package wyh.test.shopping;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ShoppingJobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "test_shopping_job");
        job.setJarByClass(ShoppingJobMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_shopping_input"));

        job.setMapperClass(ShoppingMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ShoppingSumBean.class);

        job.setReducerClass(ShoppingReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(ShoppingSumBean.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132:8020/test_shopping_output"));

        boolean status = job.waitForCompletion(true);
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int runStatus = ToolRunner.run(configuration, new ShoppingJobMain(), args);
        System.exit(runStatus);
    }
}
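An optional tweak not used in the original article: since the aggregation is a plain sum, ShoppingReducer can also act as a combiner that pre-aggregates map output on the map side, because its input and output types (Text, ShoppingSumBean) already match the map output types. If you want to try it, it is a single extra line inside run():
// Optional: pre-aggregate on the map side; safe here because summing is associative and commutative.
// Added inside run(), e.g. right after job.setReducerClass(ShoppingReducer.class);
job.setCombinerClass(ShoppingReducer.class);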
- Package the project

- Upload the project jar to the server and run it

[[email protected] test_jar]# hadoop jar mapreduce_shopping-1.0-SNAPSHOT.jar wyh.test.shopping.ShoppingJobMain
- Check the result

[[email protected] test_jar]# hdfs dfs -cat /test_shopping_output/part-r-00000
15525535555 4318 1128 4874
16626636666 2548 984 5437
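As a quick sanity check, the row for 16626636666 can be verified by hand against the input shown earlier: 1264 + 530 + 754 = 2548 (digital), 308 + 259 + 417 = 984 (commuting), and 1677 + 2174 + 1586 = 5437 (catering), which matches the second output line.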
As you can see, the computed results match the expected values. (The totals for 15525535555 also include one record, 340/319/1653, that appears in the Reducer's Javadoc example but not in the data excerpt shown above.)
This completes a simple aggregation-and-sum example in MapReduce.