A map-side join example in MapReduce
2022-06-10 16:00:00 【QYHuiiQ】
In this example we implement a map-side join. The usual approach is to put the small table in the distributed cache and then join the big table against it during the normal map phase. The map output is already the joined result we want, so no reduce phase is needed; all we have to write is a custom Mapper and the main class.
Continuing with the two tables from the previous case, we use the student basic information table as the small table and the grade table as the big table.
- Prepare the data

First, put the data files into HDFS:
[[email protected] test_data]# hdfs dfs -mkdir /test_distributed_cache
[[email protected] test_data]# hdfs dfs -put map* /test_distributed_cache
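For reference, the Mapper below assumes comma-separated lines, with the student id as the first field of the small table (map_join_student_info.txt) and as the second field of the big table (map_join_student_grade.txt). The actual columns from the previous case are not reproduced here, so the following rows are purely hypothetical illustrations of that layout:

map_join_student_info.txt (small table), hypothetical row:
1001,Tom,18

map_join_student_grade.txt (big table), hypothetical row:
math,1001,95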

The small table's data file has to be placed in the distributed cache when the program starts; this is configured in the main class.
Create a new project:
- Add the pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>wyh.test</groupId>
    <artifactId>mapreduce_map_join</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- Customize the Mapper
package wyh.test.map_join;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    /**
     * map() runs once for every input record, but the distributed-cache data
     * only needs to be read once. setup() runs exactly once per map task,
     * so we load the cached small table there instead of in map().
     */
    // Map collection that holds the contents of the small-table file from the distributed cache
    private HashMap<String, String> map = new HashMap<>();

    // Read the file from the distributed cache
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        // The distributed cache can hold multiple files, so first get the list of cached files
        URI[] cacheFiles = context.getCacheFiles();
        // Get the file system for the cached file we want. Our cache currently holds only one file,
        // so element [0] is the one we need
        FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
        // Open an input stream for the file
        FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));
        // Wrap the byte stream in a buffered character stream (the raw stream is a byte stream;
        // converting it lets us read the file line by line)
        InputStreamReader inputStreamReader = new InputStreamReader(inputStream);// byte stream -> character stream
        BufferedReader bufferedReader = new BufferedReader(inputStreamReader);// character stream -> buffered stream
        // With the buffered character stream we can read the file line by line and load it into the Map
        String line = null;
        while ((line = bufferedReader.readLine()) != null) {
            String[] split = line.split(",");
            // Use the student id (the first field of each split line) as the key and the whole line as the value
            map.put(split[0], line);
        }
        // Close the streams
        bufferedReader.close();
        fileSystem.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        // Take the student id from the big table (the grade table) as K2
        String[] split = value.toString().split(",");
        String studentId = split[1];
        // Look up the value in the map by key, i.e. use the student id to fetch
        // the matching record from the small table (the student table)
        String studentInfo = map.get(studentId);
        // Concatenate the values from the two tables
        String mergeValue = value.toString() + " # " + studentInfo;
        // Write K2, V2 to the context
        context.write(new Text(studentId), new Text(mergeValue));
    }
}
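One detail worth noting: map.get(studentId) returns null when a grade record has no matching student, so the merged value would end with the literal string "null". If inner-join semantics are wanted instead, a minimal guard (a sketch of mine, not part of the original code) could go right before the concatenation in map():

        String studentInfo = map.get(studentId);
        // Hypothetical guard: skip grade records with no matching student in the small table
        if (studentInfo == null) {
            return;
        }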
- Customize the main class
package wyh.test.map_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class MapJoinMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "test_map_join_job");
        job.setJarByClass(MapJoinMain.class);
        // Load the small table into the distributed cache
        job.addCacheFile(new URI("hdfs://192.168.126.132:8020/test_distributed_cache/map_join_student_info.txt"));
        job.setInputFormatClass(TextInputFormat.class);
        // This sets the input path of the big table for the map phase. The small table and the
        // big table can live in different directories; here I put them in the same one
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_distributed_cache/map_join_student_grade.txt"));
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        /**
         * Because the join already happens in the map phase, the map output is the result we want
         * and needs no special reduce processing, so we do not define a custom Reducer here
         * (the default Reducer simply passes the map output through)
         */
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132:8020/test_map_join_output"));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Disable the HDFS FileSystem cache when reading files on the cluster (otherwise closing
        // the FileSystem in setup() would close the shared cached instance)
        configuration.setBoolean("fs.hdfs.impl.disable.cache", true);
        int run = ToolRunner.run(configuration, new MapJoinMain(), args);
        System.exit(run);
    }
}
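As a side note, run(String[] strings) receives the command-line arguments forwarded by ToolRunner, so the hard-coded paths above could be taken from the command line instead. A minimal sketch, assuming the input path is passed as the first argument and the output path as the second:

        TextInputFormat.addInputPath(job, new Path(strings[0]));
        TextOutputFormat.setOutputPath(job, new Path(strings[1]));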
- Package the jar and upload it to the server

- Run the jar
[[email protected] test_jar]# hadoop jar mapreduce_map_join-1.0-SNAPSHOT.jar wyh.test.map_join.MapJoinMain
- View output

[[email protected] test_jar]# hdfs dfs -cat /test_map_join_output/part-r-00000

This shows how easy it is to implement a map-side join in MapReduce.