MapReduce Example (9): Reduce-Side Join
2022-07-06 09:01:00 【笑看风云路】
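Hi everyone, I'm 风云. You're welcome to follow my blog or my WeChat official account 【笑看风云路】. In the days ahead, let's learn big-data technologies together, work hard, and become better versions of ourselves!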
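How it works
Joining on the Reduce side is the most common pattern for joining tables in the MapReduce framework.
(1) Map side: tag each key/value pair coming from the different tables (files) so that records from different sources can be told apart. Then emit the join field as the key, with the remaining fields plus the new tag as the value.
(2) Reduce side: by the time reduce runs, grouping by the join field is already done. Within each group we only need to separate the records that came from different files (they were tagged in the map phase) and then take their Cartesian product.
Reduce-side joins are more common than Map-side joins, because the map phase cannot see all the fields needed for the join: the records for one key may sit in different map tasks. Reduce-side joins are less efficient, however, because all the data has to go through the shuffle.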
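To make the two steps above concrete, here is a minimal in-memory sketch in plain Java. It does not use Hadoop at all: a TreeMap stands in for the shuffle, and the class name ReduceJoinSketch, the helper names, and the sample records are all made up for illustration. The "1+" and "2+" tags follow the convention used by the job later in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory imitation of a reduce-side join; no Hadoop classes are involved.
class ReduceJoinSketch {

    // "Map" step: tag a record with its source table and emit it under the join key.
    // The map passed in plays the role of the shuffle: it groups values by key.
    static void tag(String[] record, int keyIndex, String tag, Map<String, List<String>> shuffled) {
        StringBuilder value = new StringBuilder(tag);
        boolean first = true;
        for (int i = 0; i < record.length; i++) {
            if (i == keyIndex) continue;          // the join field becomes the key, not part of the value
            if (!first) value.append('\t');
            value.append(record[i]);
            first = false;
        }
        shuffled.computeIfAbsent(record[keyIndex], k -> new ArrayList<>()).add(value.toString());
    }

    // "Reduce" step: split each group by tag, then emit the Cartesian product of the two sides.
    static List<String> join(List<String[]> leftTable, List<String[]> rightTable) {
        Map<String, List<String>> shuffled = new TreeMap<>();
        for (String[] r : leftTable) tag(r, 0, "1+", shuffled);
        for (String[] r : rightTable) tag(r, 0, "2+", shuffled);

        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> group : shuffled.entrySet()) {
            List<String> left = new ArrayList<>();
            List<String> right = new ArrayList<>();
            for (String v : group.getValue()) {
                if (v.startsWith("1+")) left.add(v.substring(2));
                else if (v.startsWith("2+")) right.add(v.substring(2));
            }
            for (String l : left) {
                for (String r : right) {
                    out.add(group.getKey() + "\t" + l + "\t" + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample data: the first column is the join field in both tables.
        List<String[]> orders = Arrays.asList(new String[]{"o1", "alice"}, new String[]{"o2", "bob"});
        List<String[]> items = Arrays.asList(
                new String[]{"o1", "pen"}, new String[]{"o1", "ink"}, new String[]{"o3", "cup"});
        join(orders, items).forEach(System.out::println);
    }
}
```

With this sample data only key o1 appears in both tables, so the sketch emits two joined rows for o1. Keys o2 and o3 each have records from one side only, so the nested loops never run for them and nothing is emitted, which is inner-join behavior.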
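SQL: the difference between in and exists <= a quick review
Choosing between in and exists <= a quick review
Generally, a query runs faster when the outer loop is the smaller one: the outer loop costs N, while the inner loop, if it goes through an index, shrinks to log M.
A join B is also a Cartesian product that keeps only the rows whose specified fields match (A is the inner loop, B the outer loop).
A in B: B is computed first, then the Cartesian product (A is the inner loop, B the outer loop).
A exists B: A is computed first, then the Cartesian product (B is the inner loop, A the outer loop).
not in uses no index on either the inner or the outer table, whereas not exists can use an index, so the latter is usually the better choice.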
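The point about the outer loop can be checked with a small counting sketch. This is only an illustration, not how any particular database executes a query: a binary search over a sorted array stands in for an index lookup, and the class name NestedLoopCost and the table sizes are assumptions chosen for the example.

```java
// Counts the comparisons made by an index-assisted nested-loop join.
class NestedLoopCost {
    static long probes;  // number of comparisons made by the index lookups

    // Binary search over a sorted array stands in for an index lookup (about log2(M) steps).
    static boolean indexLookup(int[] sortedIndex, int key) {
        int lo = 0, hi = sortedIndex.length - 1;
        while (lo <= hi) {
            probes++;
            int mid = (lo + hi) >>> 1;
            if (sortedIndex[mid] == key) return true;
            if (sortedIndex[mid] < key) lo = mid + 1; else hi = mid - 1;
        }
        return false;
    }

    // The outer loop scans every row of `outer`; the inner table is reached through its index.
    static long joinCost(int[] outer, int[] innerSortedIndex) {
        probes = 0;
        for (int key : outer) indexLookup(innerSortedIndex, key);
        return probes;
    }

    // Builds the sorted keys 0..n-1 as a stand-in table.
    static int[] range(int n) {
        int[] a = new int[n];
        for (int i = 0; i < n; i++) a[i] = i;
        return a;
    }

    public static void main(String[] args) {
        int[] small = range(8);
        int[] large = range(100_000);
        System.out.println("small outer, large inner: " + joinCost(small, large));
        System.out.println("large outer, small inner: " + joinCost(large, small));
    }
}
```

Putting the small table in the outer loop costs at most 8 lookups of about 17 comparisons each, while putting the large table outside costs at least one comparison per row, that is, at least 100,000.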
Writing the code
The program has two main parts: the Map part and the Reduce part.
Map code
public static class mymapper extends Mapper<Object, Text, Text, Text>{
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Path of the file that the current input split belongs to
        String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();
        if (filePath.contains("orders1")) {
            // Get the text of the line
            String line = value.toString();
            // Split the line on tabs
            String[] arr = line.split("\t");
            // Emit: join field as key, remaining fields tagged with "1+" as value
            context.write(new Text(arr[0]), new Text( "1+" + arr[2]+"\t"+arr[3]));
            System.out.println(arr[0] + "_1+" + arr[2]+"\t"+arr[3]);
        }else if(filePath.contains("order_items1")) {
            String line = value.toString();
            String[] arr = line.split("\t");
            // Emit: join field as key, remaining field tagged with "2+" as value
            context.write(new Text(arr[1]), new Text("2+" + arr[2]));
            System.out.println(arr[1] + "_2+" + arr[2]);
        }
    }
}
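The map input is a plain text file. The InputFormat splits the data set into smaller InputSplits, and a RecordReader parses each split into <key,value> pairs for the map function. In the map function, getPath() first obtains the path of the current InputSplit and assigns it to filePath. If filePath contains the file name orders1, the input value is split with split("\t"); the field shared with the order_items1 file (arr[0], the join field) becomes the key, and the other selected fields, prefixed with "1+", become the value. If filePath contains the file name order_items1, the steps are the same, except that the join field is arr[1] and the value is prefixed with "2+". Finally, the <key,value> pair is emitted through the write method of Context.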
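The tagging logic in the map function can also be tried outside Hadoop. The following is a small sketch, not part of the original job: the class name TagRecord and the length checks are additions made for illustration (the mapper in this post would throw an ArrayIndexOutOfBoundsException on a line with too few fields), and the sample lines in main are invented.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Stand-alone version of the tagging done in map(); no Hadoop classes are involved.
class TagRecord {

    // Returns the (join key, tagged value) pair for one input line,
    // or null when the line is too short or the file is not one of the two tables.
    static Map.Entry<String, String> tag(String filePath, String line) {
        String[] arr = line.split("\t");
        if (filePath.contains("orders1")) {
            if (arr.length < 4) return null;   // added guard: skip malformed lines
            return new SimpleEntry<>(arr[0], "1+" + arr[2] + "\t" + arr[3]);
        } else if (filePath.contains("order_items1")) {
            if (arr.length < 3) return null;   // added guard: skip malformed lines
            return new SimpleEntry<>(arr[1], "2+" + arr[2]);
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical input lines; the real column layout is not given in this post.
        System.out.println(tag("/in/orders1", "a\tb\tc\td"));
        System.out.println(tag("/in/order_items1", "x\ty\tz"));
        System.out.println(tag("/in/orders1", "a\tb"));
    }
}
```

Returning null for a short line is just one possible policy; in a real job you might prefer to count such lines with a counter instead of dropping them silently.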
Reduce code
public static class myreducer extends Reducer<Text, Text, Text, Text>{
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Vector<String> left = new Vector<String>();  // holds the records from the left table
        Vector<String> right = new Vector<String>(); // holds the records from the right table
        // Iterate over the values of this key
        for (Text val : values) {
            String str = val.toString();
            // Add each value to left or right according to its tag
            if (str.startsWith("1+")) {
                left.add(str.substring(2));
            }
            else if (str.startsWith("2+")) {
                right.add(str.substring(2));
            }
        }
        // Sizes of the left and right collections
        int sizeL = left.size();
        int sizeR = right.size();
        //System.out.println(key + "left:"+left);
        //System.out.println(key + "right:"+right);
        // Walk both vectors and write out every combination
        for (int i = 0; i < sizeL; i++) {
            for (int j = 0; j < sizeR; j++) {
                context.write( key, new Text( left.get(i) + "\t" + right.get(j) ) );
                //System.out.println(key + " \t" + left.get(i) + "\t" + right.get(j));
            }
        }
    }
}
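After the shuffle, all values emitted by map under the same key are placed in one iterator, values, and the <key,values> pair is passed to the reduce function. The reduce function first creates two Vector collections to hold the input values that start with "1+" and with "2+" respectively. An enhanced for loop with a nested if then walks the values: an element that starts with 1+ has its tag removed with substring(2) and is stored in left; an element that starts with 2+ is handled the same way and stored in right. Finally, two nested for loops emit the <key,value> pairs: the input key is used directly as the output key, and the output value is left + "\t" + right.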
Complete code
package mapreduce;

import java.io.IOException;
import java.util.Vector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceJoin {
    public static class mymapper extends Mapper<Object, Text, Text, Text>{
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();
            if (filePath.contains("orders1")) {
                String line = value.toString();
                String[] arr = line.split("\t");
                context.write(new Text(arr[0]), new Text( "1+" + arr[2]+"\t"+arr[3]));
                //System.out.println(arr[0] + "_1+" + arr[2]+"\t"+arr[3]);
            }else if(filePath.contains("order_items1")) {
                String line = value.toString();
                String[] arr = line.split("\t");
                context.write(new Text(arr[1]), new Text("2+" + arr[2]));
                //System.out.println(arr[1] + "_2+" + arr[2]);
            }
        }
    }

    public static class myreducer extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Vector<String> left = new Vector<String>();
            Vector<String> right = new Vector<String>();
            for (Text val : values) {
                String str = val.toString();
                if (str.startsWith("1+")) {
                    left.add(str.substring(2));
                }
                else if (str.startsWith("2+")) {
                    right.add(str.substring(2));
                }
            }
            int sizeL = left.size();
            int sizeR = right.size();
            //System.out.println(key + "left:"+left);
            //System.out.println(key + "right:"+right);
            for (int i = 0; i < sizeL; i++) {
                for (int j = 0; j < sizeR; j++) {
                    context.write( key, new Text( left.get(i) + "\t" + right.get(j) ) );
                    //System.out.println(key + " \t" + left.get(i) + "\t" + right.get(j));
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJobName("reducejoin");
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(mymapper.class);
        job.setReducerClass(myreducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        Path left = new Path("hdfs://localhost:9000/mymapreduce6/in/orders1");
        Path right = new Path("hdfs://localhost:9000/mymapreduce6/in/order_items1");
        Path out = new Path("hdfs://localhost:9000/mymapreduce6/out");
        FileInputFormat.addInputPath(job, left);
        FileInputFormat.addInputPath(job, right);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
-------------- end ----------------