
MapReduce case: aggregation and summation

2022-06-10 16:00:00 QYHuiiQ

In this case, we want to compute, for each user (identified by mobile phone number), the totals of their digital, travel, and catering expenses over the past few months. That is, we sum the monthly digital spending, the monthly travel spending, and the monthly catering spending separately, ending up with three values per user.

  • Prepare the data

  Upload the data file to the server. Each line holds five comma-separated fields: mobile phone number, month, digital spending, travel spending, and catering spending:

[root@server test_data]# cat test_shopping.txt
15525535555,03,2890,437,1495
16626636666,05,1264,308,1677
16626636666,03,530,259,2174
15525535555,05,1088,372,1726
15525535555,04,340,319,1653
16626636666,04,754,417,1586

Upload the data file to HDFS:

[root@server test_jar]# hdfs dfs -mkdir /test_shopping_input
[root@server test_jar]# cd ../
[root@server hadoop-2.7.5]# cd ../test_data/
[root@server test_data]# hdfs dfs -put test_shopping.txt /test_shopping_input

Create a new project:

  • Add the pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wyh.test</groupId>
    <artifactId>mapreduce_shopping</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
  • Create a custom bean class that encapsulates the three expenses
package wyh.test.shopping;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Implement the Writable interface so that ShoppingSumBean can be serialized and deserialized
public class ShoppingSumBean implements Writable {

    private Integer digital;// Digital spending 
    private Integer commute;// Travel expenses 
    private Integer catering;// Catering expenses 

    public Integer getDigital() {
        return digital;
    }

    public void setDigital(Integer digital) {
        this.digital = digital;
    }

    public Integer getCommute() {
        return commute;
    }

    public void setCommute(Integer commute) {
        this.commute = commute;
    }

    public Integer getCatering() {
        return catering;
    }

    public void setCatering(Integer catering) {
        this.catering = catering;
    }

    @Override
    public String toString() {
        return digital +
                "\t" + commute +
                "\t" + catering;
    }

    // Writable serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(digital);
        dataOutput.writeInt(commute);
        dataOutput.writeInt(catering);
    }

    // Writable deserialization: read the fields back in the same order
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.digital = dataInput.readInt();
        this.commute = dataInput.readInt();
        this.catering = dataInput.readInt();
    }
}
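Since junit is already on the classpath, a quick round-trip test can confirm that write() and readFields() are symmetric. This is a minimal sketch; the test class and method names are illustrative, not part of the original project:

package wyh.test.shopping;

import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch: serialize a ShoppingSumBean and read it back, checking that all
// three fields survive the round trip.
public class ShoppingSumBeanTest {

    @Test
    public void writeThenReadRestoresAllFields() throws IOException {
        ShoppingSumBean original = new ShoppingSumBean();
        original.setDigital(2890);
        original.setCommute(437);
        original.setCatering(1495);

        // Serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields()
        ShoppingSumBean restored = new ShoppingSumBean();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        Assert.assertEquals(2890, restored.getDigital().intValue());
        Assert.assertEquals(437, restored.getCommute().intValue());
        Assert.assertEquals(1495, restored.getCatering().intValue());
    }
}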
  • Create a custom Mapper class
package wyh.test.shopping;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


/**
 * K1 is the line offset
 * V1 is the line content
 * K2 is the user id (mobile phone number)
 * V2 is a ShoppingSumBean wrapping the digital, travel, and catering expense values of one line
 */
public class ShoppingMapper extends Mapper<LongWritable, Text, Text, ShoppingSumBean> {

    /**
     *  K1               V1
     *  0                15525535555,03,2890,437,1495
     *  K2               V2
     *  15525535555      ShoppingSumBean(2890,437,1495)
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line and take the mobile phone number (user id) as K2
        String[] splitString = value.toString().split(",");
        String userPhone = splitString[0];
        // Create a ShoppingSumBean and populate it with the three expense values from the line (field 1, the month, is not needed here); this gives V2
        ShoppingSumBean shoppingSumBean = new ShoppingSumBean();
        shoppingSumBean.setDigital(Integer.parseInt(splitString[2]));
        shoppingSumBean.setCommute(Integer.parseInt(splitString[3]));
        shoppingSumBean.setCatering(Integer.parseInt(splitString[4]));
        // Write (K2, V2) to the context
        context.write(new Text(userPhone), shoppingSumBean);
    }
}
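The map method above assumes every line has exactly five comma-separated fields with numeric expense values; a single malformed line would throw and fail the task. If the input cannot be fully trusted, a guarded variant of map can skip bad records instead. A sketch (the skip-silently policy is an assumption, not part of the original case):

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Sketch: skip blank or malformed records instead of failing the job
        String[] splitString = value.toString().split(",");
        if (splitString.length < 5) {
            return; // not enough fields, ignore this line
        }
        try {
            ShoppingSumBean bean = new ShoppingSumBean();
            bean.setDigital(Integer.parseInt(splitString[2]));
            bean.setCommute(Integer.parseInt(splitString[3]));
            bean.setCatering(Integer.parseInt(splitString[4]));
            context.write(new Text(splitString[0]), bean);
        } catch (NumberFormatException e) {
            // expense fields are not integers, ignore this line
        }
    }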
  • Create a custom Reducer class
package wyh.test.shopping;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


import java.io.IOException;

/**
 * K2               V2 (after the shuffle phase, all values with the same K2 are grouped into one collection)
 * 15525535555      ShoppingSumBean(2890,437,1495),ShoppingSumBean(1088,372,1726),ShoppingSumBean(340,319,1653)
 * K3               V3 (sum each of the three expenses across the same user's records, still encapsulated as a ShoppingSumBean)
 * 15525535555      ShoppingSumBean(4318,1128,4874)
 */
public class ShoppingReducer extends Reducer<Text, ShoppingSumBean, Text, ShoppingSumBean> {
    @Override
    protected void reduce(Text key, Iterable<ShoppingSumBean> values, Reducer<Text, ShoppingSumBean, Text, ShoppingSumBean>.Context context) throws IOException, InterruptedException {

        // Iterate over the V2 collection and accumulate each of the three expenses
        Integer digitalSum = 0;
        Integer commuteSum = 0;
        Integer cateringSum = 0;
        for (ShoppingSumBean shoppingSumBean : values) {
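            // Note: Hadoop reuses the same ShoppingSumBean instance across
            // iterations of this iterable, so read the field values out
            // immediately rather than storing references to the bean itself.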
            digitalSum += shoppingSumBean.getDigital();
            commuteSum += shoppingSumBean.getCommute();
            cateringSum += shoppingSumBean.getCatering();
        }
        // Create a ShoppingSumBean holding the three sums computed above; this gives V3
        ShoppingSumBean shoppingSumBean = new ShoppingSumBean();
        shoppingSumBean.setDigital(digitalSum);
        shoppingSumBean.setCommute(commuteSum);
        shoppingSumBean.setCatering(cateringSum);
        // Write (K3, V3) to the context
        context.write(key, shoppingSumBean);

    }
}
  • Create the main class
package wyh.test.shopping;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ShoppingJobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "test_shopping_job");
        job.setJarByClass(ShoppingJobMain.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_shopping_input"));
        job.setMapperClass(ShoppingMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ShoppingSumBean.class);
        job.setReducerClass(ShoppingReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(ShoppingSumBean.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132:8020/test_shopping_output"));
        boolean status = job.waitForCompletion(true);
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job through ToolRunner
        int runStatus = ToolRunner.run(configuration, new ShoppingJobMain(), args);
        System.exit(runStatus);
    }
}
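Note that the input and output paths are hardcoded to one HDFS address. Because ToolRunner passes the remaining command-line arguments through to run(), a common variant reads the paths from those arguments instead, so the same jar works against any paths. A minimal sketch of run() under that assumption (input path as the first argument, output path as the second):

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "test_shopping_job");
        job.setJarByClass(ShoppingJobMain.class);
        job.setInputFormatClass(TextInputFormat.class);
        // Sketch (assumption): paths come from the command line instead of being hardcoded
        TextInputFormat.addInputPath(job, new Path(strings[0]));
        job.setMapperClass(ShoppingMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ShoppingSumBean.class);
        job.setReducerClass(ShoppingReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(ShoppingSumBean.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(strings[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

The job would then be launched as: hadoop jar mapreduce_shopping-1.0-SNAPSHOT.jar wyh.test.shopping.ShoppingJobMain /test_shopping_input /test_shopping_output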
  • Package the project with mvn package (the shade plugin, bound to the package phase above, produces a runnable jar)

  • Upload the project jar to the server and run it (the output directory must not already exist on HDFS, or the job will fail):

[root@server test_jar]# hadoop jar mapreduce_shopping-1.0-SNAPSHOT.jar wyh.test.shopping.ShoppingJobMain
  • View the run results

[root@server test_jar]# hdfs dfs -cat /test_shopping_output/part-r-00000
15525535555     4318    1128    4874
16626636666     2548    984     5437

The results match expectations; for example, for user 15525535555: digital = 2890 + 1088 + 340 = 4318, travel = 437 + 372 + 319 = 1128, and catering = 1495 + 1726 + 1653 = 4874.

This completes a simple example of aggregation and summation with MapReduce.
