MapReduce instance (VIII): Map end join
2022-07-06 09:33:00 【Laugh at Fengyun Road】
Implementing a Map-side join in MapReduce
Hello everyone, I am Fengyun. Welcome to my blog and my WeChat official account 【Laugh at Fengyun Road】. In the days ahead, let's learn big-data technologies together, work hard, and meet a better self!
Usage scenario and principle
A Map-side join merges the data before it reaches the map() function. It is far more efficient than a Reduce-side join, because a Reduce-side join must shuffle all of the data, which is very resource-intensive.
- Usage scenario of a Map-side join:
- One table is very small while the other is very large
- Principle:
- A Map-side join optimizes the scenario above: load the entire small table into memory and index it by key. The large table serves as the map input; for every <key,value> pair passed to the map() function, it can easily be joined against the small-table data already loaded in memory. The joined result is emitted keyed on the join key; after the shuffle stage, the reduce side already receives the data grouped and joined by key.
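The principle can be sketched in plain Java, independent of Hadoop. This is a minimal illustration, not the article's job code; the keys and values below are made up for demonstration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {
    // Join each record of the large table against a small table held in memory.
    public static List<String> join(Map<String, String> small, List<String[]> large) {
        List<String> out = new ArrayList<>();
        for (String[] record : large) {
            String key = record[0];
            String value = record[1];
            // Only keys present in the small table produce joined output.
            if (small.containsKey(key)) {
                out.add(key + "\t" + small.get(key) + "\t" + value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> small = new HashMap<>();
        small.put("1001", "order-A");
        small.put("1002", "order-B");
        List<String[]> large = new ArrayList<>();
        large.add(new String[]{"1001", "item-x"});
        large.add(new String[]{"9999", "item-y"}); // no match: dropped
        for (String line : join(small, large)) {
            System.out.println(line);
        }
    }
}
```

Because the lookup is an in-memory hash probe, no data from the small table ever has to travel through the shuffle.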
Implementation idea
- When submitting the job, place the small table file into the DistributedCache. Later, retrieve the small table from the DistributedCache, extract the <key,value> pairs needed for the join, and keep them in memory (for example in a HashMap container).
- Override the setup() method of the MyMapper class; because this method runs before map(), use it to read the smaller table into a HashMap.
- Override the map() function to read the contents of the large table line by line, compare each line against the HashMap, and when the keys match, format the joined record and output it directly.
- The <key,value> pairs emitted by map() first pass through the shuffle, which collects all values with the same key into an iterator to form values, then passes each <key,values> pair to the reduce() function. reduce() copies the input key directly to the output key and emits each element of values one by one with an enhanced for loop; the number of iterations determines the number of <key,value> pairs written.
Writing the code
Mapper code
public static class MyMapper extends Mapper<Object, Text, Text, Text> {
    private Map<String, String> dict = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the cached small table (orders1) into the in-memory dictionary.
        String fileName = context.getLocalCacheFiles()[0].getName();
        System.out.println(fileName);
        BufferedReader reader = new BufferedReader(new FileReader(fileName));
        String codeandname;
        while (null != (codeandname = reader.readLine())) {
            String[] str = codeandname.split("\t");
            dict.put(str[0], str[2] + "\t" + str[3]);
        }
        reader.close();
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Join each large-table (order_items1) line against the in-memory table.
        String[] kv = value.toString().split("\t");
        if (dict.containsKey(kv[1])) {
            context.write(new Text(kv[1]), new Text(dict.get(kv[1]) + "\t" + kv[2]));
        }
    }
}
This part consists of the setup() method and the map() method. In setup() we first call getName() to get the name of the cached file, orders1, and assign it to fileName, then use a BufferedReader to read the cached file from memory. Each record is read with readLine() and cut apart with split("\t"); the field shared with the order_items file, str[0], becomes the key of the map collection dict, and the fields selected for display become the value. The map() function receives the order_items file data, splits it with split("\t"), and stores the pieces in the array kv (where kv[1] represents the same field as str[0]). An if statement checks whether the in-memory dict collection contains the key kv[1]; if so, context.write() outputs the key2/value2 pair, with kv[1] as key2 and dict.get(kv[1]) + "\t" + kv[2] as value2.
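The parsing step in setup() can be tried in isolation. The field layout of the orders1 line below is assumed for illustration (the article does not show the actual file contents):

```java
public class SplitSketch {
    // Mirrors setup(): key = str[0], value = str[2] + "\t" + str[3]
    public static String[] parseOrderLine(String line) {
        String[] str = line.split("\t");
        return new String[]{str[0], str[2] + "\t" + str[3]};
    }

    public static void main(String[] args) {
        // Hypothetical orders1 record: order_id, date, status, amount (layout assumed)
        String[] kv = parseOrderLine("1001\t2022-07-06\tCLOSED\t199.0");
        System.out.println(kv[0] + " -> " + kv[1]);
    }
}
```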
Reducer code
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The map output is already joined; just write every value through.
        for (Text text : values) {
            context.write(key, text);
        }
    }
}
The <key,value> pairs emitted by map() first pass through the shuffle, which collects all values with the same key into an iterator to form values, then passes each <key,values> pair to the reduce() function. reduce() copies the input key directly to the output key and emits each element of values one by one with an enhanced for loop.
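The grouping that the shuffle performs before reduce() can be imitated outside Hadoop with a map of lists. This is a rough stand-in for the framework's behaviour, not the real shuffle, and the sample pairs are invented:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSketch {
    // Rough stand-in for the shuffle: gather values sharing a key into one list.
    public static Map<String, List<String>> group(String[][] pairs) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] p : pairs) {
            grouped.computeIfAbsent(p[0], k -> new ArrayList<>()).add(p[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        String[][] mapOutput = {{"1001", "a"}, {"1002", "b"}, {"1001", "c"}};
        // reduce(): copy the key through and emit each grouped value in turn.
        for (Map.Entry<String, List<String>> e : group(mapOutput).entrySet()) {
            for (String v : e.getValue()) {
                System.out.println(e.getKey() + "\t" + v);
            }
        }
    }
}
```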
Complete code
package mapreduce;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapJoin {

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        private Map<String, String> dict = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Load the cached small table (orders1) into memory, keyed by order id.
            String fileName = context.getLocalCacheFiles()[0].getName();
            //System.out.println(fileName);
            BufferedReader reader = new BufferedReader(new FileReader(fileName));
            String codeandname;
            while (null != (codeandname = reader.readLine())) {
                String[] str = codeandname.split("\t");
                dict.put(str[0], str[2] + "\t" + str[3]);
            }
            reader.close();
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Join each large-table (order_items1) line against the in-memory table.
            String[] kv = value.toString().split("\t");
            if (dict.containsKey(kv[1])) {
                context.write(new Text(kv[1]), new Text(dict.get(kv[1]) + "\t" + kv[2]));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text text : values) {
                context.write(key, text);
            }
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, IOException,
            InterruptedException, URISyntaxException {
        Job job = Job.getInstance();
        job.setJobName("mapjoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        Path in = new Path("hdfs://localhost:9000/mymapreduce5/in/order_items1");
        Path out = new Path("hdfs://localhost:9000/mymapreduce5/out");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        // Ship the small table to every mapper via the distributed cache.
        URI uri = new URI("hdfs://localhost:9000/mymapreduce5/in/orders1");
        job.addCacheFile(uri);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
-------------- end ----------------