Aggregate sum of MapReduce cases
2022-06-10 16:00:00 【QYHuiiQ】
In this case, we want to compute, for each user (identified by mobile phone number), the total digital, travel, and catering spending across several months. That is, we sum each user's monthly digital spending, travel spending, and catering spending separately, ending up with three totals per user.
- Prepare the data

Upload the data to the server:

[[email protected] test_data]# cat test_shopping.txt
15525535555,03,2890,437,1495
16626636666,05,1264,308,1677
16626636666,03,530,259,2174
15525535555,05,1088,372,1726
16626636666,04,754,417,1586
Each line holds a mobile phone number, a month, and that month's digital, travel, and catering spending. Upload the data file to HDFS:
[[email protected] test_jar]# hdfs dfs -mkdir /test_shopping_input
[[email protected] test_jar]# cd ../
[[email protected] hadoop-2.7.5]# cd ../test_data/
[[email protected] test_data]# hdfs dfs -put test_shopping.txt /test_shopping_input
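To double-check that the file landed in HDFS, you can list the directory (an optional step, not part of the original walkthrough):

hdfs dfs -ls /test_shopping_input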

Create a new project.
- Add the pom dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wyh.test</groupId>
    <artifactId>mapreduce_shopping</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

- Create a custom Bean class to encapsulate the three expense fields
package wyh.test.shopping;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Implement the Writable interface so that ShoppingSumBean can be serialized and deserialized by Hadoop.
public class ShoppingSumBean implements Writable {

    private Integer digital;   // digital spending
    private Integer commute;   // travel spending
    private Integer catering;  // catering spending

    public Integer getDigital() {
        return digital;
    }

    public void setDigital(Integer digital) {
        this.digital = digital;
    }

    public Integer getCommute() {
        return commute;
    }

    public void setCommute(Integer commute) {
        this.commute = commute;
    }

    public Integer getCatering() {
        return catering;
    }

    public void setCatering(Integer catering) {
        this.catering = catering;
    }

    @Override
    public String toString() {
        return digital + "\t" + commute + "\t" + catering;
    }

    // Serialization, required by the Writable interface.
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(digital);
        dataOutput.writeInt(commute);
        dataOutput.writeInt(catering);
    }

    // Deserialization, required by the Writable interface; fields are read back in the same order they were written.
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.digital = dataInput.readInt();
        this.commute = dataInput.readInt();
        this.catering = dataInput.readInt();
    }
}
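As a quick sanity check (a minimal sketch, not part of the original article), the following hypothetical helper round-trips the bean through its own write()/readFields() methods, which is essentially what Hadoop does when the bean crosses the shuffle:

package wyh.test.shopping;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical demo class, not part of the job: serialize a bean and read it back.
public class BeanRoundTripDemo {
    public static void main(String[] args) throws IOException {
        ShoppingSumBean in = new ShoppingSumBean();
        in.setDigital(2890);
        in.setCommute(437);
        in.setCatering(1495);

        // Serialize to an in-memory buffer via write().
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buffer));

        // Deserialize into a fresh bean via readFields().
        ShoppingSumBean out = new ShoppingSumBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(out); // prints: 2890	437	1495
    }
}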
- Create a custom Mapper class
package wyh.test.shopping;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * K1 is the byte offset of the row
 * V1 is the row data
 * K2 identifies the user (mobile phone number)
 * V2 is a ShoppingSumBean (an object encapsulating the digital, travel, and catering spending values of each row)
 */
public class ShoppingMapper extends Mapper<LongWritable, Text, Text, ShoppingSumBean> {

    /**
     * K1   V1
     * 0    15525535555,03,2890,437,1495
     * K2           V2
     * 15525535555  ShoppingSumBean(2890,437,1495)
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the row and take the mobile phone number (user ID) as K2.
        String[] splitString = value.toString().split(",");
        String userPhone = splitString[0];

        // Create a ShoppingSumBean, extract the three spending values from the row, and assign them to the bean to get V2.
        ShoppingSumBean shoppingSumBean = new ShoppingSumBean();
        shoppingSumBean.setDigital(Integer.parseInt(splitString[2]));
        shoppingSumBean.setCommute(Integer.parseInt(splitString[3]));
        shoppingSumBean.setCatering(Integer.parseInt(splitString[4]));

        // Write (K2, V2) to the context.
        context.write(new Text(userPhone), shoppingSumBean);
    }
}
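To make the shuffle input concrete: for the five sample lines above, the map stage emits one (phone number, ShoppingSumBean) pair per line, e.g. (15525535555, ShoppingSumBean(2890,437,1495)) and (16626636666, ShoppingSumBean(1264,308,1677)); the shuffle then groups all beans sharing the same phone number into one collection before they reach the reducer.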
- Create a custom Reducer class
package wyh.test.shopping;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * K2           V2 (after the shuffle phase, all records with the same K2 are grouped into one collection)
 * 15525535555  ShoppingSumBean(2890,437,1495),ShoppingSumBean(1088,372,1726),ShoppingSumBean(340,319,1653)
 * K3           V3 (the three expenses across one user's records are summed separately, and the totals are again encapsulated in a ShoppingSumBean)
 * 15525535555  ShoppingSumBean(4318,1128,4874)
 */
public class ShoppingReducer extends Reducer<Text, ShoppingSumBean, Text, ShoppingSumBean> {

    @Override
    protected void reduce(Text key, Iterable<ShoppingSumBean> values, Context context) throws IOException, InterruptedException {
        // Traverse the V2 collection, reading the three spending values of each element and accumulating each separately.
        Integer digitalSum = 0;
        Integer commuteSum = 0;
        Integer cateringSum = 0;
        for (ShoppingSumBean shoppingSumBean : values) {
            digitalSum += shoppingSumBean.getDigital();
            commuteSum += shoppingSumBean.getCommute();
            cateringSum += shoppingSumBean.getCatering();
        }

        // Create a ShoppingSumBean and assign the three totals computed above to it to get V3.
        ShoppingSumBean shoppingSumBean = new ShoppingSumBean();
        shoppingSumBean.setDigital(digitalSum);
        shoppingSumBean.setCommute(commuteSum);
        shoppingSumBean.setCatering(cateringSum);

        // Write (K3, V3) to the context.
        context.write(key, shoppingSumBean);
    }
}
- Create the main (driver) class
package wyh.test.shopping;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ShoppingJobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "test_shopping_job");
        job.setJarByClass(ShoppingJobMain.class);

        // Input: read text lines from HDFS.
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_shopping_input"));

        // Map stage.
        job.setMapperClass(ShoppingMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ShoppingSumBean.class);

        // Reduce stage.
        job.setReducerClass(ShoppingReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(ShoppingSumBean.class);

        // Output: write text results to HDFS.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132:8020/test_shopping_output"));

        boolean status = job.waitForCompletion(true);
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job.
        int runStatus = ToolRunner.run(configuration, new ShoppingJobMain(), args);
        System.exit(runStatus);
    }
}
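Because each per-category sum is associative, and the Reducer's input and output types match (Text / ShoppingSumBean), the same Reducer could also be registered as a combiner to pre-aggregate partial sums on the map side before the shuffle. This is an optional tweak, not part of the original driver; it would sit next to the other job.set* calls in run():

// Optional (assumption, not in the original job): reuse the reducer as a map-side combiner.
job.setCombinerClass(ShoppingReducer.class);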
- Package the project
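Since the shade plugin is bound to the package phase, a standard Maven build should produce the runnable fat jar (the exact command is assumed here, as the original does not show it):

mvn clean package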

- Upload the project jar to the server and run it

[[email protected] test_jar]# hadoop jar mapreduce_shopping-1.0-SNAPSHOT.jar wyh.test.shopping.ShoppingJobMain
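One practical note: TextOutputFormat refuses to start if the output directory already exists, so before re-running the job, remove the previous output:

hdfs dfs -rm -r /test_shopping_output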
- View the run results

[[email protected] test_jar]# hdfs dfs -cat /test_shopping_output/part-r-00000
15525535555 4318 1128 4874
16626636666 2548 984 5437
The computed results match expectations. For user 16626636666, for example: digital 1264 + 530 + 754 = 2548, travel 308 + 259 + 417 = 984, catering 1677 + 2174 + 1586 = 5437, exactly as shown in the output above.
This completes a simple MapReduce aggregation-and-sum case.