
A map-side join example in MapReduce

2022-06-10 16:00:00 QYHuiiQ

In this case, we implement a map-side join. The usual approach is to put the small table in the distributed cache and then, during the normal map stage, join each record of the big table against the small table. The joined records produced by map are already the result we want, so no reduce stage is needed. Consequently, this case only requires a custom Mapper and the main (driver) class.

We reuse the two tables from the previous case: the student basic information table serves as the small table, and the grade table serves as the big table.

  • Prepare the data

First, put the data files into HDFS:

[[email protected] test_data]# hdfs dfs -mkdir /test_distributed_cache
[[email protected] test_data]# hdfs dfs -put map* /test_distributed_cache
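
For reference, here is a hypothetical layout of the two files, consistent with how the Mapper below parses them (the small table is keyed on its first field, and the big table carries the student ID in its second field). The real data from the previous case may of course look different; treat the field names and sample rows as assumptions:

map_join_student_info.txt (small table, assumed format: studentId,name,age):
1,zhangsan,20
2,lisi,21

map_join_student_grade.txt (big table, assumed format: recordId,studentId,score):
101,1,89
102,2,95

With this layout, one joined map output record would look like: 1<TAB>101,1,89 # 1,zhangsan,20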

  The data file of the small table must be placed in the distributed cache when the program starts; this is configured in the main class.

Create a new project:

  • Add the pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wyh.test</groupId>
    <artifactId>mapreduce_map_join</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
  • Custom Mapper
package wyh.test.map_join;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    /**
     * map() is executed once for every input record, but the data in the distributed cache
     * only needs to be read once. setup() is executed only once per task, so we read the
     * cached file there instead of in map().
     */

    // A HashMap used to hold the contents of the small-table file from the distributed cache
    private HashMap<String, String> map = new HashMap<>();

    // Read the files in the distributed cache 
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        // The distributed cache may contain multiple files, so first get the list of cached file URIs
        URI[] cacheFiles = context.getCacheFiles();
        // Get the FileSystem for the cached file we want. Our distributed cache holds only one file, so element [0] is the one we need
        FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
        // Get the input stream of the file 
        FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));
        // Wrap the byte stream in a buffered character stream so the file can be read line by line
        InputStreamReader inputStreamReader = new InputStreamReader(inputStream); // byte stream -> character stream
        BufferedReader bufferedReader = new BufferedReader(inputStreamReader); // character stream -> buffered stream
        // Read the file line by line and put each line into the map
        String line = null;
        while ((line = bufferedReader.readLine())!=null){
            String[] split = line.split(",");
            // Use the student ID (the first field of each line) as the key and the whole line as the value
            map.put(split[0], line);
        }
        // Close the streams
        bufferedReader.close();
        fileSystem.close();
    }



    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        // Get the student ID from the big table (grade table) to use as K2
        String[] split = value.toString().split(",");
        String studentId = split[1];
        // Look up the corresponding value in the map by student ID, i.e. fetch this student's record from the small table (student info table)
        String studentInfo = map.get(studentId);
        // Concatenate the values from the two tables
        String mergeValue = value.toString() + " # " + studentInfo;
        // Write K2 and V2 to the context
        context.write(new Text(studentId), new Text(mergeValue));

    }
}
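
One detail worth noting: if a grade record carries a student ID that does not exist in the cached small table, map.get(studentId) returns null and the joined line would contain the literal string "null". A minimal defensive variant of map() (a sketch, not part of the original article) could simply skip such records, giving inner-join semantics; it would be a drop-in replacement inside MapJoinMapper:

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        String studentId = split[1];
        // Look up the small-table record; skip this grade record if there is no match
        String studentInfo = map.get(studentId);
        if (studentInfo == null) {
            return; // inner-join semantics: drop unmatched grade records
        }
        context.write(new Text(studentId), new Text(value.toString() + " # " + studentInfo));
    }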
  • Custom main class
package wyh.test.map_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class MapJoinMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "test_map_join_job");
        job.setJarByClass(MapJoinMain.class);
        // Load the small table into the distributed cache 
        job.addCacheFile(new URI("hdfs://192.168.126.132:8020/test_distributed_cache/map_join_student_info.txt"));
        job.setInputFormatClass(TextInputFormat.class);
        // Set the input path of the big table that goes through the map stage. The big and small tables can live in different directories; here I put them in the same directory
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_distributed_cache/map_join_student_grade.txt"));
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        /**
         * The join is already done in the map stage and its output is the final result we want,
         * so no special handling is needed in the reduce stage and no custom Reducer is set here.
         */
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132:8020/test_map_join_output"));
        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Disable the FileSystem object cache when reading files on the cluster HDFS
        configuration.setBoolean("fs.hdfs.impl.disable.cache", true);
        int run = ToolRunner.run(configuration, new MapJoinMain(), args);
        System.exit(run);
    }
}
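
A note on the fs.hdfs.impl.disable.cache setting above: it is most likely needed because setup() in the Mapper closes the FileSystem it obtained from FileSystem.get(). With the default FileSystem cache enabled, FileSystem.get() can hand back the same shared instance that the task itself uses to read its HDFS input split, and closing it there may later cause "Filesystem closed" errors. A hedged alternative (a sketch, not what the original code does) is to leave the cache enabled and simply not close the FileSystem at the end of setup():

        // Alternative sketch for the end of setup(): close only the reader and leave the
        // (possibly shared, cached) FileSystem instance open; the framework cleans it up.
        // With this approach, fs.hdfs.impl.disable.cache would not need to be set in main().
        bufferedReader.close();
        // fileSystem.close();  // intentionally omitted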
  • Package the jar and upload it to the server

  • Run the jar
[[email protected] test_jar]# hadoop jar mapreduce_map_join-1.0-SNAPSHOT.jar wyh.test.map_join.MapJoinMain
  • View output

[[email protected] test_jar]# hdfs dfs -cat /test_map_join_output/part-r-00000

  In this way, the map-side join in MapReduce is implemented quite easily.


Copyright notice
This article was written by [QYHuiiQ]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/161/202206101527451582.html