MapReduce Example (8): Map-side Join
2022-07-06 09:01:00 【笑看风云路】
Hello everyone, I'm Fengyun. Welcome to follow my blog or my WeChat official account 【笑看风云路】, where we'll learn big-data technologies together, keep working hard, and meet a better version of ourselves!
Use Case and Principle
A map-side join performs the merge in the map stage, before any data reaches the shuffle. It is far more efficient than a reduce-side join, because a reduce-side join has to push every record through the shuffle, which is very resource-intensive.
- Use case for a map-side join:
  - One table is very small while the other is very large
- Principle:
  - The map-side join optimizes exactly this scenario: load the entire small table into memory and index it by the join key. The big table serves as the map input, so every <key,value> pair handed to map() can be joined directly against the in-memory small table. The joined results are emitted by key; after the shuffle, what arrives at the reduce side is already grouped by key and fully joined. (A minimal sketch of this idea follows the list.)
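To make the principle concrete, here is a minimal, Hadoop-free sketch of the same hash-join idea. The variable names smallTableLines and bigTableLines and the field layout (tab-separated, join key in column 0) are hypothetical placeholders, not part of the original example:

// Build an index over the small table, then stream the big table past it.
// smallTableLines / bigTableLines are hypothetical Iterable<String> sources.
Map<String, String> small = new HashMap<>();
for (String line : smallTableLines) {        // small table: fully loaded in memory
    String[] f = line.split("\t");
    small.put(f[0], f[1]);                   // join key -> payload
}
for (String line : bigTableLines) {          // big table: processed one row at a time
    String[] f = line.split("\t");
    String payload = small.get(f[0]);
    if (payload != null) {                   // keys match: emit the joined record
        System.out.println(f[0] + "\t" + payload + "\t" + f[1]);
    }
}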
Implementation Approach
- First, when submitting the job, put the small-table file into the job's DistributedCache. Then, in the mapper, fetch the small table from the DistributedCache, split each line into a <key,value> pair, and keep the pairs in memory (for example, in a HashMap). A driver-side sketch of this step follows the list.
- Override the setup() method of MyMapper: it runs before map(), so the small table can be loaded into a HashMap there.
- Override map() to read the big table line by line, probing the HashMap for each record; when the keys match, format the joined data and emit it directly.
- The <key,value> pairs emitted by map() first pass through the shuffle, which collects all values sharing a key into an iterator, values. The resulting <key,values> pair is handed to reduce(), which copies the input key to the output key and emits each element of values via an enhanced for loop; the number of loop iterations determines how many <key,value> pairs are written.
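A sketch of the driver-side registration mentioned in the first bullet, using the same HDFS path as the complete code later in this article:

// Driver side: register the small table with the DistributedCache
// so every map task gets a local copy of it.
job.addCacheFile(new URI("hdfs://localhost:9000/mymapreduce5/in/orders1"));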
Writing the Code
Mapper code
public static class MyMapper extends Mapper<Object, Text, Text, Text> {
    // In-memory copy of the small table: join key -> selected fields
    private Map<String, String> dict = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The cached small file (orders1) is linked into the task's working directory
        String fileName = context.getLocalCacheFiles()[0].getName();
        System.out.println(fileName);
        BufferedReader reader = new BufferedReader(new FileReader(fileName));
        String codeandname = null;
        while (null != (codeandname = reader.readLine())) {
            String[] str = codeandname.split("\t");
            // str[0] is the join key; str[2] and str[3] are the fields to carry along
            dict.put(str[0], str[2] + "\t" + str[3]);
        }
        reader.close();
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line comes from the big table (order_items)
        String[] kv = value.toString().split("\t");
        // kv[1] holds the join key; emit only rows that match the small table
        if (dict.containsKey(kv[1])) {
            context.write(new Text(kv[1]), new Text(dict.get(kv[1]) + "\t" + kv[2]));
        }
    }
}
This part consists of the setup() and map() methods. In setup(), getName() retrieves the name of the cached file (orders1) and assigns it to fileName, and a BufferedReader then reads the cached file. Each record is read with readLine() and split with split("\t"); str[0], the field shared with the order_items file, becomes the key in the dict map, and the fields to be displayed form the value. The map() function receives the order_items data and splits each line with split("\t") into the array kv[] (where kv[1] holds the same field as str[0]). If dict contains kv[1], context.write() emits key2/value2, with kv[1] as key2 and dict.get(kv[1])+"\t"+kv[2] as value2.
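Note that getLocalCacheFiles() is deprecated in newer Hadoop releases. A hedged alternative sketch of the same setup() logic using the non-deprecated getCacheFiles() together with the FileSystem API (requires additional imports of org.apache.hadoop.fs.FileSystem and java.io.InputStreamReader):

// Alternative setup() body: read the cached file through the FileSystem API
// instead of the deprecated getLocalCacheFiles().
URI[] cacheFiles = context.getCacheFiles();              // URIs added via job.addCacheFile()
FileSystem fs = FileSystem.get(context.getConfiguration());
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(fs.open(new Path(cacheFiles[0]))))) {
    String line;
    while ((line = reader.readLine()) != null) {
        String[] str = line.split("\t");
        dict.put(str[0], str[2] + "\t" + str[3]);
    }
}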
Reducer code
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Identity reduce: the join already happened on the map side,
        // so every joined record is emitted as-is
        for (Text text : values) {
            context.write(key, text);
        }
    }
}
The <key,value> pairs emitted by map() first pass through the shuffle, which gathers all values sharing a key into an iterator, values. The resulting <key,values> pair is handed to reduce(), which copies the input key to the output key and emits each element of values via an enhanced for loop.
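Since the join is already complete on the map side, the reducer here is a pure identity pass. If per-key grouping of the output is not needed, the shuffle and reduce phase can be skipped entirely with a standard Hadoop setting; a one-line sketch:

// Optional: run map-only and skip the shuffle/reduce phase entirely.
// Map output is then written directly to the output files (part-m-*).
job.setNumReduceTasks(0);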
Complete code
package mapreduce;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapJoin {

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        // In-memory copy of the small table: join key -> selected fields
        private Map<String, String> dict = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The cached small file (orders1) is linked into the task's working directory
            String fileName = context.getLocalCacheFiles()[0].getName();
            //System.out.println(fileName);
            BufferedReader reader = new BufferedReader(new FileReader(fileName));
            String codeandname = null;
            while (null != (codeandname = reader.readLine())) {
                String[] str = codeandname.split("\t");
                // str[0] is the join key; str[2] and str[3] are the fields to carry along
                dict.put(str[0], str[2] + "\t" + str[3]);
            }
            reader.close();
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line comes from the big table (order_items1)
            String[] kv = value.toString().split("\t");
            // kv[1] holds the join key; emit only rows that match the small table
            if (dict.containsKey(kv[1])) {
                context.write(new Text(kv[1]), new Text(dict.get(kv[1]) + "\t" + kv[2]));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Identity reduce: the join already happened on the map side
            for (Text text : values) {
                context.write(key, text);
            }
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, IOException,
            InterruptedException, URISyntaxException {
        Job job = Job.getInstance();
        job.setJobName("mapjoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Big table as the regular MapReduce input
        Path in = new Path("hdfs://localhost:9000/mymapreduce5/in/order_items1");
        Path out = new Path("hdfs://localhost:9000/mymapreduce5/out");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        // Small table distributed to every map task via the DistributedCache
        URI uri = new URI("hdfs://localhost:9000/mymapreduce5/in/orders1");
        job.addCacheFile(uri);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
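To run the finished job, one typical flow is to package the class into a jar and submit it with the hadoop launcher; the jar name is an assumption, and the output directory must not already exist before the run:

hadoop jar mapjoin.jar mapreduce.MapJoin
hdfs dfs -cat /mymapreduce5/out/part-r-00000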
-------------- end ----------------