MapReduce: A Map-Side Join Example
2022-06-10 15:29:00 【QYHuiiQ】
In this example we implement a map-side join. The usual approach is to put the small table into the distributed cache and join the large table against it during the normal map phase. The map output is then already the joined result we want, so no reduce phase is required; for this example we only need to write a custom Mapper and a main (driver) class.
We reuse the two tables from the previous example: the student information table is the small table and the grade table is the large table.
- Prepare the data

First, put the data files into HDFS:
hdfs dfs -mkdir /test_distributed_cache
hdfs dfs -put map* /test_distributed_cache
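For reference, here is a minimal sketch of what the two files might contain. This is assumed sample data (not taken from the original post), laid out the way the Mapper below expects: comma-separated fields, with the student ID as the first column of the student info file and as the second column of the grade file.

```text
# map_join_student_info.txt (small table, assumed contents)
1,zhangsan,18
2,lisi,20

# map_join_student_grade.txt (large table, assumed contents)
s1,1,math,90
s2,2,english,85
```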

The small table's data file has to be loaded into the distributed cache when the job starts; this step is configured in the main class.
Create a new project:
- Add the pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wyh.test</groupId>
    <artifactId>mapreduce_map_join</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- Custom Mapper
package wyh.test.map_join;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    /**
     * map() is called once for every input record, while the file in the distributed cache
     * only needs to be read once. setup() runs exactly once per task, so we load the
     * cached file there instead of in map().
     */

    //HashMap holding the contents of the file in the distributed cache
    private HashMap<String, String> map = new HashMap<>();

    //Read the file from the distributed cache
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        //The distributed cache can hold several files, so first get the list of cached files
        URI[] cacheFiles = context.getCacheFiles();
        //Get the FileSystem for the cached file we want. Since the cache only contains one file here, element [0] is the one we need
        FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
        //Open an input stream on the file
        FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));
        //Wrap the byte input stream in a buffered character stream (the raw stream is byte-oriented; buffering it as characters lets us read line by line)
        InputStreamReader inputStreamReader = new InputStreamReader(inputStream);//byte stream -> character stream
        BufferedReader bufferedReader = new BufferedReader(inputStreamReader);//character stream -> buffered stream
        //With the buffered reader, read the file line by line and put its contents into the HashMap
        String line = null;
        while ((line = bufferedReader.readLine()) != null) {
            String[] split = line.split(",");
            //Use the first field of each line (the student ID) as the key and the whole line as the value
            map.put(split[0], line);
        }
        //Close the streams
        bufferedReader.close();
        fileSystem.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        //Take the student ID from the large table (the grade table) as K2
        String[] split = value.toString().split(",");
        String studentId = split[1];
        //Look the student ID up in the HashMap, i.e. use it to fetch the matching record from the small table (the student table)
        String studentInfo = map.get(studentId);
        //Concatenate the values from the two tables
        String mergeValue = value.toString() + " # " + studentInfo;
        //Write K2 and V2 to the context
        context.write(new Text(studentId), new Text(mergeValue));
    }
}
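One thing to be aware of: map.get(studentId) returns null when a grade record has no matching entry in the cached student file, so the merged value would then contain the literal string "null". If unmatched records should be dropped (or handled differently), a small guard inside map() is enough. This is only a sketch of a possible addition, not part of the original code:

```java
// Hypothetical guard for map(): skip grade records with no matching student record
String studentInfo = map.get(studentId);
if (studentInfo == null) {
    return; // or emit the record with a placeholder, depending on requirements
}
```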
- Custom main class
package wyh.test.map_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class MapJoinMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "test_map_join_job");
        job.setJarByClass(MapJoinMain.class);
        //Load the small table into the distributed cache
        job.addCacheFile(new URI("hdfs://192.168.126.132:8020/test_distributed_cache/map_join_student_info.txt"));
        job.setInputFormatClass(TextInputFormat.class);
        //This sets the path of the large table read by the map phase. The small and large tables can live in different directories; here they happen to be in the same one
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_distributed_cache/map_join_student_grade.txt"));
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        /**
         * The join is already done in the map phase and its output is exactly what we want,
         * so no special handling is needed in the reduce phase and no custom Reducer is defined here.
         */
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132:8020/test_map_join_output"));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //Disable the HDFS FileSystem cache when reading files on the cluster
        configuration.setBoolean("fs.hdfs.impl.disable.cache", true);
        int run = ToolRunner.run(configuration, new MapJoinMain(), args);
        System.exit(run);
    }
}
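Because the driver never calls job.setNumReduceTasks(0), Hadoop still runs the default identity reducer (the base Reducer class), which simply passes the map output through; that is why the result file below is named part-r-00000. If you want a truly map-only job that skips the shuffle and reduce phase, one extra line in run() is enough, and the output file would then be named part-m-00000 instead:

```java
// Optional: make the job map-only so the map output is written directly to HDFS
job.setNumReduceTasks(0);
```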
- Package the jar and upload it to the server

- Run the jar
hadoop jar mapreduce_map_join-1.0-SNAPSHOT.jar wyh.test.map_join.MapJoinMain
- Check the output

hdfs dfs -cat /test_map_join_output/part-r-00000
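With the assumed sample data sketched earlier, each output line would consist of the student ID (the map output key), a tab, and then the grade record concatenated with the matching student record, roughly like:

```text
1   s1,1,math,90 # 1,zhangsan,18
2   s2,2,english,85 # 2,lisi,20
```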

This completes a simple implementation of a map-side join in MapReduce.