当前位置:网站首页>Mapreduce实例(九):Reduce端join
Mapreduce实例(九):Reduce端join
2022-07-06 09:01:00 【笑看风云路】
大家好,我是风云,欢迎大家关注我的博客 或者 微信公众号【笑看风云路】,在未来的日子里我们一起来学习大数据相关的技术,一起努力奋斗,遇见更好的自己!
实现原理
在Reudce端进行Join连接是MapReduce框架进行表之间Join操作最为常见的模式。
(1)Map端的主要工作,为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
(2)Reduce端的主要工作,在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。
Reduce端连接比Map端连接更为普遍,因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中,但是Reduce端连接效率比较低,因为所有数据都必须经过Shuffle过程。
SQL之in和exists区别篇 <= 回顾一下
in与exists的取舍 <= 回顾一下
一般而言,外循环的数量级小的,速度更快,因为外层复杂度N,但是内层走索引的话就能缩小到logM
A join B也是笛卡尔积,最后保留指定字段相同的结果而已(A内循环,B外循环)
A in B,先计算B,然后笛卡尔积,(A内循环,B外循环)
A exist B,先计算A,然后笛卡尔积(B内循环,A外循环)
not in内外表都不会用到索引,而not exists能用到索引,所以后者任何情况都比前者好
代码编写
程序主要包括两部分:Map部分和Reduce部分。
Map代码
public static class mymapper extends Mapper<Object, Text, Text, Text>{
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();
if (filePath.contains("orders1")) {
//获取行文本内容
String line = value.toString();
//对行文本内容进行切分
String[] arr = line.split("\t");
///把结果写出去
context.write(new Text(arr[0]), new Text( "1+" + arr[2]+"\t"+arr[3]));
System.out.println(arr[0] + "_1+" + arr[2]+"\t"+arr[3]);
}else if(filePath.contains("order_items1")) {
String line = value.toString();
String[] arr = line.split("\t");
context.write(new Text(arr[1]), new Text("2+" + arr[2]));
System.out.println(arr[1] + "_2+" + arr[2]);
}
}
}
Map处理的是一个纯文本文件,Mapper处理的数据是由InputFormat将数据集切分成小的数据集InputSplit,并用RecordReader解析成<key,value>对提供给map函数使用。在map函数中,首先用getPath()方法获取分片InputSplit的路径并赋值给filePath,if判断filePath中如果包含goods.txt文件名,则将map函数输入的value值通过Split(“\t”)方法进行切分,与goods_visit文件里相同的商品id字段作为key,其他字段前加"1+"作为value。如果if判断filePath包含goods_visit.txt文件名,步骤与上面相同,只是把其他字段前加"2+"作为value。最后把<key,value>通过Context的write方法输出。
Reduce代码
public static class myreducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Vector<String> left = new Vector<String>(); //用来存放左表的数据
Vector<String> right = new Vector<String>(); //用来存放右表的数据
//迭代集合数据
for (Text val : values) {
String str = val.toString();
//将集合中的数据添加到对应的left和right中
if (str.startsWith("1+")) {
left.add(str.substring(2));
}
else if (str.startsWith("2+")) {
right.add(str.substring(2));
}
}
//获取left和right集合的长度
int sizeL = left.size();
int sizeR = right.size();
//System.out.println(key + "left:"+left);
//System.out.println(key + "right:"+right);
//遍历两个向量将结果写进去
for (int i = 0; i < sizeL; i++) {
for (int j = 0; j < sizeR; j++) {
context.write( key, new Text( left.get(i) + "\t" + right.get(j) ) );
//System.out.println(key + " \t" + left.get(i) + "\t" + right.get(j));
}
}
}
}
map函数输出的<key,value>经过shuffle将key相同的所有value放到一个迭代器中形成values,然后将<key,values>键值对传递给reduce函数。reduce函数中,首先新建两个Vector集合,用于存放输入的values中以"1+“开头和"2+“开头的数据。然后用增强版for循环遍历并嵌套if判断,若判断values里的元素以1+开头,则通过substring(2)方法切分元素,结果存放到left集合中,若values里元素以2+开头,则仍利用substring(2)方法切分元素,结果存放到right集合中。最后再用两个嵌套for循环,遍历输出<key,value>,其中输入的key直接赋值给输出的key,输出的value为left +”\t”+right。
完整代码
package mapreduce;
import java.io.IOException;
import java.util.Iterator;
import java.util.Vector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ReduceJoin {
public static class mymapper extends Mapper<Object, Text, Text, Text>{
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();
if (filePath.contains("orders1")) {
String line = value.toString();
String[] arr = line.split("\t");
context.write(new Text(arr[0]), new Text( "1+" + arr[2]+"\t"+arr[3]));
//System.out.println(arr[0] + "_1+" + arr[2]+"\t"+arr[3]);
}else if(filePath.contains("order_items1")) {
String line = value.toString();
String[] arr = line.split("\t");
context.write(new Text(arr[1]), new Text("2+" + arr[2]));
//System.out.println(arr[1] + "_2+" + arr[2]);
}
}
}
public static class myreducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Vector<String> left = new Vector<String>();
Vector<String> right = new Vector<String>();
for (Text val : values) {
String str = val.toString();
if (str.startsWith("1+")) {
left.add(str.substring(2));
}
else if (str.startsWith("2+")) {
right.add(str.substring(2));
}
}
int sizeL = left.size();
int sizeR = right.size();
//System.out.println(key + "left:"+left);
//System.out.println(key + "right:"+right);
for (int i = 0; i < sizeL; i++) {
for (int j = 0; j < sizeR; j++) {
context.write( key, new Text( left.get(i) + "\t" + right.get(j) ) );
//System.out.println(key + " \t" + left.get(i) + "\t" + right.get(j));
}
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
job.setJobName("reducejoin");
job.setJarByClass(ReduceJoin.class);
job.setMapperClass(mymapper.class);
job.setReducerClass(myreducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Path left = new Path("hdfs://localhost:9000/mymapreduce6/in/orders1");
Path right = new Path("hdfs://localhost:9000/mymapreduce6/in/order_items1");
Path out = new Path("hdfs://localhost:9000/mymapreduce6/out");
FileInputFormat.addInputPath(job, left);
FileInputFormat.addInputPath(job, right);
FileOutputFormat.setOutputPath(job, out);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
-------------- end ----------------
微信公众号:扫描下方二维码
或 搜索 笑看风云路
关注
边栏推荐
- Opencv+dlib realizes "matching" glasses for Mona Lisa
- Master slave replication of redis
- Redis之Lua脚本
- Detailed explanation of cookies and sessions
- Advance Computer Network Review(1)——FatTree
- [oc]- < getting started with UI> -- common controls uibutton
- [OC foundation framework] - [set array]
- Reids之删除策略
- Pytest's collection use case rules and running specified use cases
- Redis之哨兵模式
猜你喜欢
为拿 Offer,“闭关修炼,相信努力必成大器
Master slave replication of redis
LeetCode41——First Missing Positive——hashing in place & swap
甘肃旅游产品预订增四倍:“绿马”走红,甘肃博物馆周边民宿一房难求
Reids之删除策略
数据建模有哪些模型
Redis之五大基础数据结构深入、应用场景
基于B/S的影视创作论坛的设计与实现(附:源码 论文 sql文件 项目部署教程)
Redis分布式锁实现Redisson 15问
How to intercept the string correctly (for example, intercepting the stock in operation by applying the error information)
随机推荐
Persistence practice of redis (Linux version)
Redis之Bitmap
Redis之持久化实操(Linux版)
go-redis之初始化连接
068.查找插入位置--二分查找
Redis之哨兵模式
Kratos ares microservice framework (I)
使用标签模板解决用户恶意输入的问题
Reids之缓存预热、雪崩、穿透
Redis之性能指标、监控方式
The five basic data structures of redis are in-depth and application scenarios
Simclr: comparative learning in NLP
IDS' deletion policy
Mathematical modeling 2004b question (transmission problem)
What is an R-value reference and what is the difference between it and an l-value?
Sentinel mode of redis
The carousel component of ant design calls prev and next methods in TS (typescript) environment
Kratos战神微服务框架(二)
Intel distiller Toolkit - Quantitative implementation 2
Advance Computer Network Review(1)——FatTree