MapReduce Example (IX): Reduce-Side Join
2022-07-06 09:33:00 【Laugh at Fengyun Road】
Implementing a Reduce-Side Join in MapReduce
Hello everyone, I am Fengyun. Welcome to my blog and my WeChat official account 【Laugh at Fengyun Road】. In the days ahead, let's learn big data technologies together, work hard, and become better versions of ourselves!
Implementation principle
The reduce-side join is the most common way to join tables in the MapReduce framework.
(1) The map side's main job is to tag the key/value pairs coming from the different tables (files) so that records from different sources can be told apart. The join field is used as the key, and the remaining fields together with the tag form the value, which is then emitted.
(2) The reduce side's main job is straightforward: by the time records reach the reducer, grouping by the join field (the key) has already been done, so within each group we only need to separate the records that came from different files (they were tagged in the map stage) and then take their Cartesian product.
The reduce-side join is more common than the map-side join, because in the map stage you usually cannot obtain all of the fields needed for the join: fields with the same key may live in different map tasks. However, a reduce-side join is relatively inefficient, because all of the data must go through the shuffle.
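As a compact sketch of the idea (the table names and field values below are generic placeholders, not the article's data set), joining two tables A and B on a key k looks like this:
A record:  (k=5, a1)              -> map emits (5, "1+a1")
B records: (k=5, b1), (k=5, b2)   -> map emits (5, "2+b1"), (5, "2+b2")
reduce(5): left = [a1], right = [b1, b2]  -> output (5, a1 b1), (5, a1 b2)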
A quick recap of the earlier articles on the difference between in and exists in SQL, and on how to choose between them:
Generally speaking, the query whose outer loop has the smaller order of magnitude is faster: the outer loop contributes a factor of N, while the inner loop can be reduced to log M if it can use an index.
A join B is also a Cartesian product; only the rows whose specified fields match are kept in the result (A is the inner loop, B the outer loop).
A in B: B is evaluated first, then the Cartesian product is taken (A is the inner loop, B the outer loop).
A exists B: A is evaluated first, then the Cartesian product is taken (B is the inner loop, A the outer loop).
not in uses no index on either the inner or the outer table, while not exists can still use an index, so the latter is preferable in every case.
Writing the code
The program consists of two parts: the Map part and the Reduce part.
Map Code
public static class mymapper extends Mapper<Object, Text, Text, Text> {
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Determine which input file this split comes from
        String filePath = ((FileSplit) context.getInputSplit()).getPath().toString();
        if (filePath.contains("orders1")) {
            // Get the text of the line
            String line = value.toString();
            // Split the line on tabs
            String[] arr = line.split("\t");
            // Emit the join field as the key; tag the value with "1+" to mark the left table
            context.write(new Text(arr[0]), new Text("1+" + arr[2] + "\t" + arr[3]));
            System.out.println(arr[0] + "_1+" + arr[2] + "\t" + arr[3]);
        } else if (filePath.contains("order_items1")) {
            String line = value.toString();
            String[] arr = line.split("\t");
            // Emit the join field as the key; tag the value with "2+" to mark the right table
            context.write(new Text(arr[1]), new Text("2+" + arr[2]));
            System.out.println(arr[1] + "_2+" + arr[2]);
        }
    }
}
The map stage works on plain text input. Before the Mapper processes the data, the InputFormat splits the data set into small InputSplits, and a RecordReader parses each split into <key, value> pairs for the map function to use. Inside map, getPath() retrieves the path of the current InputSplit and assigns it to filePath. If filePath contains the file name orders1, the input value is split with split("\t"); the join field (the order id shared with order_items1) is used as the key, and the remaining fields, prefixed with "1+", form the value. If filePath contains order_items1, the steps are the same, except that the other fields are prefixed with "2+" to form the value. Finally, each <key, value> pair is emitted through Context's write method.
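To make the mapper's behavior concrete, here is a pair of hypothetical input lines and the key/value pairs the code above would emit for them (the column layout of orders1 and order_items1 is an assumption for illustration, not taken from the article's data):
orders1 line:       1001<TAB>2022-07-06<TAB>alice<TAB>paid
  mapper emits:     key = "1001", value = "1+alice\tpaid"
order_items1 line:  9001<TAB>1001<TAB>keyboard
  mapper emits:     key = "1001", value = "2+keyboard"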
Reduce Code
public static class myreducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Vector<String> left = new Vector<String>();   // holds records from the left table (orders1)
        Vector<String> right = new Vector<String>();  // holds records from the right table (order_items1)
        // Iterate over the grouped values and distribute them by tag
        for (Text val : values) {
            String str = val.toString();
            if (str.startsWith("1+")) {
                left.add(str.substring(2));
            } else if (str.startsWith("2+")) {
                right.add(str.substring(2));
            }
        }
        // Sizes of the left and right lists
        int sizeL = left.size();
        int sizeR = right.size();
        //System.out.println(key + "left:" + left);
        //System.out.println(key + "right:" + right);
        // Cartesian product: pair every left record with every right record
        for (int i = 0; i < sizeL; i++) {
            for (int j = 0; j < sizeR; j++) {
                context.write(key, new Text(left.get(i) + "\t" + right.get(j)));
                //System.out.println(key + " \t" + left.get(i) + "\t" + right.get(j));
            }
        }
    }
}
After the map function emits its <key, value> pairs, the shuffle collects all values that share the same key into one iterator, values, and passes the <key, values> pair to the reduce function. Inside reduce, two Vector collections are created to hold the input values that start with "1+" and those that start with "2+". An enhanced for loop then walks the values with a nested if: if an element starts with "1+", substring(2) strips the tag and the remainder is stored in the left collection; if it starts with "2+", substring(2) is applied the same way and the remainder is stored in the right collection. Finally, two nested for loops traverse the collections and write the output: the input key is used directly as the output key, and the output value is left + "\t" + right.
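Continuing the same hypothetical data, if order 1001 also has a second item, the reducer call for key 1001 would receive and produce something like this (the sample values are illustrative only):
reduce input:  key = "1001", values = ["1+alice\tpaid", "2+keyboard", "2+mouse"]
after tagging: left  = ["alice\tpaid"],  right = ["keyboard", "mouse"]
output lines:  1001    alice    paid    keyboard
               1001    alice    paid    mouse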
Complete code
package mapreduce;

import java.io.IOException;
import java.util.Vector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceJoin {
    public static class mymapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String filePath = ((FileSplit) context.getInputSplit()).getPath().toString();
            if (filePath.contains("orders1")) {
                String line = value.toString();
                String[] arr = line.split("\t");
                context.write(new Text(arr[0]), new Text("1+" + arr[2] + "\t" + arr[3]));
                //System.out.println(arr[0] + "_1+" + arr[2] + "\t" + arr[3]);
            } else if (filePath.contains("order_items1")) {
                String line = value.toString();
                String[] arr = line.split("\t");
                context.write(new Text(arr[1]), new Text("2+" + arr[2]));
                //System.out.println(arr[1] + "_2+" + arr[2]);
            }
        }
    }

    public static class myreducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Vector<String> left = new Vector<String>();
            Vector<String> right = new Vector<String>();
            for (Text val : values) {
                String str = val.toString();
                if (str.startsWith("1+")) {
                    left.add(str.substring(2));
                } else if (str.startsWith("2+")) {
                    right.add(str.substring(2));
                }
            }
            int sizeL = left.size();
            int sizeR = right.size();
            //System.out.println(key + "left:" + left);
            //System.out.println(key + "right:" + right);
            for (int i = 0; i < sizeL; i++) {
                for (int j = 0; j < sizeR; j++) {
                    context.write(key, new Text(left.get(i) + "\t" + right.get(j)));
                    //System.out.println(key + " \t" + left.get(i) + "\t" + right.get(j));
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJobName("reducejoin");
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(mymapper.class);
        job.setReducerClass(myreducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Both input files are added to the same job; the mapper tells them apart by file name
        Path left = new Path("hdfs://localhost:9000/mymapreduce6/in/orders1");
        Path right = new Path("hdfs://localhost:9000/mymapreduce6/in/order_items1");
        Path out = new Path("hdfs://localhost:9000/mymapreduce6/out");
        FileInputFormat.addInputPath(job, left);
        FileInputFormat.addInputPath(job, right);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
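A minimal way to run the finished job, assuming Hadoop is available on the command line, the two input files have already been uploaded to the HDFS paths hard-coded above, and the class has been packaged into a jar (the jar name here is hypothetical):
hadoop jar reducejoin.jar mapreduce.ReduceJoin
hadoop fs -cat hdfs://localhost:9000/mymapreduce6/out/part-r-00000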
-------------- end ----------------
WeChat official account: search for "Laugh at Fengyun Road" to follow.