MapReduce Instance (VIII): Map-Side Join
2022-07-06 09:33:00 【Laugh at Fengyun Road】
Implementing a map-side join in MR
Hello everyone, I am Fengyun. Welcome to my blog and my WeChat official account 【Laugh at Fengyun Road】. In the days ahead, let's learn big-data technologies together, work hard, and meet a better self!
Usage scenario and principle
A map-side join merges the data before it reaches the map() function. It is far more efficient than a reduce-side join, because a reduce-side join has to push all of the data through the shuffle, which is very resource-intensive.
- Usage scenario of a map-side join: one table is very small while the other holds a large amount of data.
- Principle: a map-side join optimizes exactly this scenario. Load the entire small table into memory and index it by the join key. Feed the large table to the map tasks as ordinary input; for each <key,value> pair that map() receives, it can be joined directly against the small table already held in memory. Emit the joined result keyed by the join key; after the shuffle stage, the reduce side receives data that is already joined and grouped by key.
Implementation approach
- First, when submitting the job, place the small table file in the DistributedCache. Then, in the mapper, take the small table out of the DistributedCache, split each line into the <key,value> pair to be joined, and keep it in memory (for example in a HashMap container). A hypothetical sample data layout is sketched after this list.
- Override the setup() method of the MyMapper class. Because setup() executes before map(), it is the place to read the small table into a HashMap.
- Override map() to read the large table line by line, compare each line against the HashMap, and, if the keys match, format the record and write it out directly.
- Each <key,value> pair emitted by map() first passes through the shuffle, which collects all values sharing the same key into an iterator, values, and hands the resulting <key,values> pair to reduce(). reduce() copies the input key straight to the output key and writes out the elements of values one by one with an enhanced for loop; the number of iterations determines how many <key,value> pairs are emitted.
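For concreteness, here is a hypothetical layout for the two tab-separated input files, consistent with the field indices used in the code below (the actual contents of orders1 and order_items1 are not shown in the original):

orders1 (small table; columns: order_id, order_date, customer_id, status):

    1    2022-07-01    101    CLOSED
    2    2022-07-02    102    PENDING

order_items1 (large table; columns: item_id, order_id, product_id):

    10    1    957
    11    2    1073

The join key is the order id (str[0] in orders1, kv[1] in order_items1); joining the first row of each file would produce the output line 1 101 CLOSED 957.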
Writing the code
Mapper Code
public static class MyMapper extends Mapper<Object, Text, Text, Text> {
    // In-memory copy of the small table (orders1), indexed by the join key.
    private Map<String, String> dict = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The cached small table is localized into the task's working directory.
        String fileName = context.getLocalCacheFiles()[0].getName();
        System.out.println(fileName);
        BufferedReader reader = new BufferedReader(new FileReader(fileName));
        String codeandname = null;
        while (null != (codeandname = reader.readLine())) {
            String[] str = codeandname.split("\t");
            // Key: the join field (column 0); value: the fields to carry into the output.
            dict.put(str[0], str[2] + "\t" + str[3]);
        }
        reader.close();
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] kv = value.toString().split("\t");
        // Emit only the large-table rows whose join key appears in the small table.
        if (dict.containsKey(kv[1])) {
            context.write(new Text(kv[1]), new Text(dict.get(kv[1]) + "\t" + kv[2]));
        }
    }
}
This class consists of a setup() method and a map() method. In setup() we first obtain the name of the cached file, orders1, via getName() and assign it to fileName, then open that file with a BufferedReader. readLine() reads one record per line; each record is cut apart with split("\t"), the field that order_items shares, str[0], is stored as the key in the map collection dict, and the fields to be displayed are stored as the value.

The map() function receives the order_items data and stores the fields cut out by split("\t") in the array kv (where kv[1] refers to the same field as str[0]). An if statement then checks whether the in-memory dict contains kv[1] as a key; if so, context.write() outputs the key2/value2 pair, with kv[1] as key2 and dict.get(kv[1]) + "\t" + kv[2] as value2.
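As a side note, getLocalCacheFiles() is deprecated in newer Hadoop releases. Below is a minimal alternative sketch of setup(), assuming Hadoop 2.x+ on YARN, where a file registered with job.addCacheFile() is symlinked into the task's working directory under its base name (java.net.URI and org.apache.hadoop.fs.Path are already imported in the complete code):

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    // getCacheFiles() returns the original URIs passed to job.addCacheFile().
    URI[] cacheFiles = context.getCacheFiles();
    // The localized copy is reachable in the working directory by its base name.
    String fileName = new Path(cacheFiles[0].getPath()).getName();
    try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
        String line;
        while ((line = reader.readLine()) != null) {
            String[] str = line.split("\t");
            dict.put(str[0], str[2] + "\t" + str[3]);
        }
    }
}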
Reducer Code
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Pass-through: write every joined record under its key.
        for (Text text : values) {
            context.write(key, text);
        }
    }
}
Each <key,value> pair emitted by map() first passes through the shuffle, which gathers all values that share the same key into an iterator, values, and hands the resulting <key,values> pair to reduce(). reduce() copies the input key directly to the output key and writes out the elements of values one by one with an enhanced for loop.
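Since the join itself is already complete when map() emits its output, the reducer here does nothing but regroup the records by key. If that grouping is not needed, one option (not part of the original code) is to skip the shuffle and reduce phase entirely by making this a map-only job in main():

    // Optional: write the mapper output directly, with no shuffle or reduce phase.
    job.setNumReduceTasks(0);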
Complete code
package mapreduce;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapJoin {

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        private Map<String, String> dict = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            String fileName = context.getLocalCacheFiles()[0].getName();
            //System.out.println(fileName);
            BufferedReader reader = new BufferedReader(new FileReader(fileName));
            String codeandname = null;
            while (null != (codeandname = reader.readLine())) {
                String[] str = codeandname.split("\t");
                dict.put(str[0], str[2] + "\t" + str[3]);
            }
            reader.close();
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] kv = value.toString().split("\t");
            if (dict.containsKey(kv[1])) {
                context.write(new Text(kv[1]), new Text(dict.get(kv[1]) + "\t" + kv[2]));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text text : values) {
                context.write(key, text);
            }
        }
    }

    public static void main(String[] args)
            throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
        Job job = Job.getInstance();
        job.setJobName("mapjoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        Path in = new Path("hdfs://localhost:9000/mymapreduce5/in/order_items1");
        Path out = new Path("hdfs://localhost:9000/mymapreduce5/out");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        URI uri = new URI("hdfs://localhost:9000/mymapreduce5/in/orders1");
        job.addCacheFile(uri);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
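To run the job, assuming the class has been packaged into a jar (the jar name below is illustrative) and the input and cache files exist at the HDFS paths hardcoded in main():

    hadoop jar mapjoin.jar mapreduce.MapJoin

With the hypothetical sample data sketched earlier, the output directory would then contain tab-separated lines such as 1 101 CLOSED 957.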
-------------- end ----------------