MapReduce Example (8): Map-side Join
2022-07-06 09:01:00 【笑看风云路】
Hi everyone, I'm Fengyun (风云). Welcome to follow my blog or my WeChat official account 【笑看风云路】 — in the days ahead we'll learn big-data technologies together, keep pushing, and meet a better version of ourselves!
Use Case and Principle
A map-side join merges the data sets before the data ever reaches the map function. It is far more efficient than a reduce-side join, because a reduce-side join pushes all of the data through the shuffle, which is very expensive.
- When to use a map-side join:
- One table is very small while the other is very large.
- Principle:
- The map-side join optimizes exactly this scenario: load the small table entirely into memory and index it by the join key. The large table is fed to the map as ordinary input; for every <key,value> pair that map() receives, it can be joined directly against the small table already sitting in memory. The joined result is emitted by key, and after the shuffle stage, what arrives at the reduce side is data that is already grouped by key and already joined.
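The principle above can be sketched outside Hadoop as a plain in-memory hash join. The row layouts below are hypothetical (small table `id\tname\tprice`, large row `orderId\tid\tamount`), chosen only to mirror the small-table/large-table roles:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {
    // Join one large-table row against the small-table index; returns null when the key is absent.
    public static String joinLine(String bigRow, Map<String, String> smallIndex) {
        String[] kv = bigRow.split("\t");
        String joined = smallIndex.get(kv[1]);          // kv[1] is the join key
        return joined == null ? null : kv[1] + "\t" + joined + "\t" + kv[2];
    }

    public static List<String> join(List<String> bigTable, List<String> smallTable) {
        // Step 1: load the small table fully into memory, indexed by the join key (column 0)
        Map<String, String> index = new HashMap<>();
        for (String row : smallTable) {
            String[] f = row.split("\t");
            index.put(f[0], f[1] + "\t" + f[2]);
        }
        // Step 2: stream the large table and probe the index row by row
        List<String> out = new ArrayList<>();
        for (String row : bigTable) {
            String j = joinLine(row, index);
            if (j != null) out.add(j);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> small = List.of("100\tapple\t5.0", "200\tpear\t3.0");
        List<String> big = List.of("1\t100\t2", "2\t300\t1", "3\t200\t4");
        System.out.println(join(big, small)); // the row with key 300 is dropped: no match
    }
}
```

In the real job, step 1 corresponds to setup() reading the DistributedCache file, and step 2 is what map() does per input record.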
Implementation Approach
- When submitting the job, first put the small-table file into the job's DistributedCache. Then read the small table back out of the DistributedCache, split each record into the <key,value> pair used for the join, and keep the pairs in memory (a HashMap or similar container works well).
- Override the setup() method of the MyMapper class: it runs before map(), so it is the right place to read the small table into a HashMap.
- Override the map() function to read the large table line by line and compare each line against the HashMap contents. If the key matches, format the joined record and write it out directly.
- The <key,value> pairs emitted by map() first pass through the shuffle, which gathers all values sharing the same key into one iterator to form values. Each <key,values> pair is then handed to reduce(), which copies the input key straight to the output key and emits each element of values with an enhanced for loop; the number of loop iterations determines how many <key,value> pairs are output.
Writing the Code
Mapper code
public static class MyMapper extends Mapper<Object, Text, Text, Text> {
    // In-memory copy of the small table, indexed by the join key
    private Map<String, String> dict = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The small table was placed in the DistributedCache by the driver; it is
        // localized to the task's working directory, so the bare file name is enough.
        String fileName = context.getLocalCacheFiles()[0].getName();
        // System.out.println(fileName); // uncomment to check the localized file name
        BufferedReader reader = new BufferedReader(new FileReader(fileName));
        String codeandname;
        while (null != (codeandname = reader.readLine())) {
            String[] str = codeandname.split("\t");
            // Column 0 is the join key; columns 2 and 3 are the fields we want to carry along
            dict.put(str[0], str[2] + "\t" + str[3]);
        }
        reader.close();
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line comes from the large table; column 1 is the join key
        String[] kv = value.toString().split("\t");
        if (dict.containsKey(kv[1])) {
            // Emit the joined record: join key -> small-table fields + large-table field
            context.write(new Text(kv[1]), new Text(dict.get(kv[1]) + "\t" + kv[2]));
        }
    }
}
This part consists of a setup method and a map method. In setup(), getName() fetches the name of the cached file (orders1) and assigns it to fileName, and a BufferedReader then reads the locally cached file. Each record is read with readLine() and cut up with split("\t"); the field str[0], which order_items also carries, becomes the key in the dict map, and the fields to be displayed are kept as the value. The map() function receives the order_items data and splits it with split("\t") into the array kv[] (kv[1] holds the same field as str[0]). An if check follows: when the in-memory dict contains the key kv[1], context.write() outputs key2/value2, with kv[1] as key2 and dict.get(kv[1]) + "\t" + kv[2] as value2.
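To make the field indices in the paragraph above concrete, here is a tiny standalone demo of the two split() calls and the emitted record. The sample rows are made up; only the tab-separated layout matters:

```java
public class SplitDemo {
    // Mimics one setup() parse plus one map() emit for a matching key.
    // Hypothetical orders1 record: order_id, date, goods_name, price
    // Hypothetical order_items record: item_id, order_id, amount
    public static String emit(String ordersLine, String itemLine) {
        String[] str = ordersLine.split("\t");
        String cached = str[2] + "\t" + str[3];   // value stored in dict under key str[0]
        String[] kv = itemLine.split("\t");
        if (!kv[1].equals(str[0])) return null;   // kv[1] plays the role of the join key
        return kv[1] + "\t" + cached + "\t" + kv[2];
    }

    public static void main(String[] args) {
        System.out.println(emit("1001\t2022-07-06\tapple\t5.0", "1\t1001\t2"));
        // prints: 1001	apple	5.0	2
    }
}
```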
Reducer code
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The join already happened on the map side; just write out every value per key
        for (Text text : values) {
            context.write(key, text);
        }
    }
}
The <key,value> pairs emitted by map() first pass through the shuffle, which collects all values sharing the same key into one iterator to form values. The <key,values> pair is then passed to reduce(), which copies the input key directly to the output key and emits each element of values one by one with an enhanced for loop.
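That shuffle-then-iterate behavior can be mimicked with an ordinary map of lists. This is a toy model of grouping plus the identity reducer, not Hadoop's actual shuffle implementation:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShuffleSketch {
    // Group (key, value) pairs by key, then "reduce" by emitting one line per
    // value, exactly as the identity reducer above does.
    public static List<String> groupAndEmit(List<String[]> pairs) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String[] p : pairs) {
            grouped.computeIfAbsent(p[0], k -> new ArrayList<>()).add(p[1]);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            for (String v : e.getValue()) {   // loop count = output count per key
                out.add(e.getKey() + "\t" + v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> pairs = List.of(
                new String[]{"100", "apple\t2"},
                new String[]{"200", "pear\t1"},
                new String[]{"100", "apple\t4"});
        System.out.println(groupAndEmit(pairs)); // key 100 appears twice, so two lines for it
    }
}
```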
Complete code
package mapreduce;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapJoin {
    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        // In-memory copy of the small table, indexed by the join key
        private Map<String, String> dict = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The cached small table is localized to the task's working directory,
            // so the bare file name is enough to open it
            String fileName = context.getLocalCacheFiles()[0].getName();
            // System.out.println(fileName);
            BufferedReader reader = new BufferedReader(new FileReader(fileName));
            String codeandname;
            while (null != (codeandname = reader.readLine())) {
                String[] str = codeandname.split("\t");
                // Column 0 is the join key; columns 2 and 3 are the carried-along fields
                dict.put(str[0], str[2] + "\t" + str[3]);
            }
            reader.close();
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line comes from the large table; column 1 is the join key
            String[] kv = value.toString().split("\t");
            if (dict.containsKey(kv[1])) {
                context.write(new Text(kv[1]), new Text(dict.get(kv[1]) + "\t" + kv[2]));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Identity reducer: the join already happened on the map side
            for (Text text : values) {
                context.write(key, text);
            }
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
        Job job = Job.getInstance();
        job.setJobName("mapjoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // The large table (order_items1) is the regular MapReduce input
        Path in = new Path("hdfs://localhost:9000/mymapreduce5/in/order_items1");
        Path out = new Path("hdfs://localhost:9000/mymapreduce5/out");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        // The small table (orders1) goes into the DistributedCache so every
        // mapper can load it in setup()
        URI uri = new URI("hdfs://localhost:9000/mymapreduce5/in/orders1");
        job.addCacheFile(uri);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}