当前位置:网站首页>Mapreduce实例(九):Reduce端join

Mapreduce实例(九):Reduce端join

2022-07-06 09:01:00 笑看风云路

大家好,我是风云,欢迎大家关注我的博客 或者 微信公众号【笑看风云路】,在未来的日子里我们一起来学习大数据相关的技术,一起努力奋斗,遇见更好的自己!

实现原理

在Reudce端进行Join连接是MapReduce框架进行表之间Join操作最为常见的模式。

(1)Map端的主要工作,为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

(2)Reduce端的主要工作,在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。

Reduce端连接比Map端连接更为普遍,因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中,但是Reduce端连接效率比较低,因为所有数据都必须经过Shuffle过程。

SQL之in和exists区别篇 <= 回顾一下

in与exists的取舍 <= 回顾一下

一般而言,外循环的数量级小的,速度更快,因为外层复杂度N,但是内层走索引的话就能缩小到logM

A join B也是笛卡尔积,最后保留指定字段相同的结果而已(A内循环,B外循环)

A in B,先计算B,然后笛卡尔积,(A内循环,B外循环)

A exist B,先计算A,然后笛卡尔积(B内循环,A外循环)

not in内外表都不会用到索引,而not exists能用到索引,所以后者任何情况都比前者好

代码编写

程序主要包括两部分:Map部分和Reduce部分。

Map代码

 public static class mymapper extends Mapper<Object, Text, Text, Text>{
      
    @Override  
    protected void map(Object key, Text value, Context context)  
      throws IOException, InterruptedException {
      
      String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();  
      if (filePath.contains("orders1")) {
      
        //获取行文本内容 
        String line = value.toString();  
        //对行文本内容进行切分 
        String[] arr = line.split("\t");  
        ///把结果写出去 
        context.write(new Text(arr[0]), new Text( "1+" + arr[2]+"\t"+arr[3]));  
        System.out.println(arr[0] + "_1+" + arr[2]+"\t"+arr[3]);  
      }else if(filePath.contains("order_items1")) {
      
        String line = value.toString();  
        String[] arr = line.split("\t");  
        context.write(new Text(arr[1]), new Text("2+" + arr[2]));  
        System.out.println(arr[1] + "_2+" + arr[2]);  
      }  
    }  
  }  

Map处理的是一个纯文本文件,Mapper处理的数据是由InputFormat将数据集切分成小的数据集InputSplit,并用RecordReader解析成<key,value>对提供给map函数使用。在map函数中,首先用getPath()方法获取分片InputSplit的路径并赋值给filePath,if判断filePath中如果包含goods.txt文件名,则将map函数输入的value值通过Split(“\t”)方法进行切分,与goods_visit文件里相同的商品id字段作为key,其他字段前加"1+"作为value。如果if判断filePath包含goods_visit.txt文件名,步骤与上面相同,只是把其他字段前加"2+"作为value。最后把<key,value>通过Context的write方法输出。

Reduce代码

public static class myreducer extends Reducer<Text, Text, Text, Text>{
      
    @Override  
    protected void reduce(Text key, Iterable<Text> values, Context context)  
      throws IOException, InterruptedException {
      
      Vector<String> left  = new Vector<String>();  //用来存放左表的数据 
      Vector<String> right = new Vector<String>();  //用来存放右表的数据 
      //迭代集合数据 
      for (Text val : values) {
      
        String str = val.toString();  
        //将集合中的数据添加到对应的left和right中 
        if (str.startsWith("1+")) {
      
          left.add(str.substring(2));  
        }  
        else if (str.startsWith("2+")) {
      
          right.add(str.substring(2));  
        }  
      }  
      //获取left和right集合的长度 
      int sizeL = left.size();  
      int sizeR = right.size();  
      //System.out.println(key + "left:"+left); 
      //System.out.println(key + "right:"+right); 
      //遍历两个向量将结果写进去 
      for (int i = 0; i < sizeL; i++) {
      
        for (int j = 0; j < sizeR; j++) {
      
          context.write( key, new Text(  left.get(i) + "\t" + right.get(j) ) );  
          //System.out.println(key + " \t" + left.get(i) + "\t" + right.get(j)); 
        }  
      }  
    }  
  } 

map函数输出的<key,value>经过shuffle将key相同的所有value放到一个迭代器中形成values,然后将<key,values>键值对传递给reduce函数。reduce函数中,首先新建两个Vector集合,用于存放输入的values中以"1+“开头和"2+“开头的数据。然后用增强版for循环遍历并嵌套if判断,若判断values里的元素以1+开头,则通过substring(2)方法切分元素,结果存放到left集合中,若values里元素以2+开头,则仍利用substring(2)方法切分元素,结果存放到right集合中。最后再用两个嵌套for循环,遍历输出<key,value>,其中输入的key直接赋值给输出的key,输出的value为left +”\t”+right。

完整代码

 package mapreduce;  
  import java.io.IOException;  
  import java.util.Iterator;  
  import java.util.Vector;  
  import org.apache.hadoop.fs.Path;  
  import org.apache.hadoop.io.Text;  
  import org.apache.hadoop.mapreduce.Job;  
  import org.apache.hadoop.mapreduce.Mapper;  
  import org.apache.hadoop.mapreduce.Reducer;  
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  public class ReduceJoin {
      
    public static class mymapper extends Mapper<Object, Text, Text, Text>{
      
      @Override  
      protected void map(Object key, Text value, Context context)  
        throws IOException, InterruptedException {
      
        String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();  
        if (filePath.contains("orders1")) {
      
          String line = value.toString();  
          String[] arr = line.split("\t");  
          context.write(new Text(arr[0]), new Text( "1+" + arr[2]+"\t"+arr[3]));  
          //System.out.println(arr[0] + "_1+" + arr[2]+"\t"+arr[3]); 
        }else if(filePath.contains("order_items1")) {
      
          String line = value.toString();  
          String[] arr = line.split("\t");  
          context.write(new Text(arr[1]), new Text("2+" + arr[2]));  
          //System.out.println(arr[1] + "_2+" + arr[2]); 
        }  
      }  
    }  
  
    public static class myreducer extends Reducer<Text, Text, Text, Text>{
      
      @Override  
      protected void reduce(Text key, Iterable<Text> values, Context context)  
        throws IOException, InterruptedException {
      
        Vector<String> left  = new Vector<String>();  
        Vector<String> right = new Vector<String>();  
        for (Text val : values) {
      
          String str = val.toString();  
          if (str.startsWith("1+")) {
      
            left.add(str.substring(2));  
          }  
          else if (str.startsWith("2+")) {
      
            right.add(str.substring(2));  
          }  
        }  
  
        int sizeL = left.size();  
        int sizeR = right.size();  
        //System.out.println(key + "left:"+left); 
        //System.out.println(key + "right:"+right); 
        for (int i = 0; i < sizeL; i++) {
      
          for (int j = 0; j < sizeR; j++) {
      
            context.write( key, new Text(  left.get(i) + "\t" + right.get(j) ) );  
            //System.out.println(key + " \t" + left.get(i) + "\t" + right.get(j)); 
          }  
        }  
      }  
    }  
  
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
      
      Job job = Job.getInstance();  
      job.setJobName("reducejoin");  
      job.setJarByClass(ReduceJoin.class);  
  
      job.setMapperClass(mymapper.class);  
      job.setReducerClass(myreducer.class);  
  
      job.setOutputKeyClass(Text.class);  
      job.setOutputValueClass(Text.class);  
  
      Path left = new Path("hdfs://localhost:9000/mymapreduce6/in/orders1");  
      Path right = new Path("hdfs://localhost:9000/mymapreduce6/in/order_items1");  
      Path out = new Path("hdfs://localhost:9000/mymapreduce6/out");  
  
      FileInputFormat.addInputPath(job, left);  
      FileInputFormat.addInputPath(job, right);  
      FileOutputFormat.setOutputPath(job, out);  
  
      System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }  
  } 

-------------- end ----------------

微信公众号:扫描下方二维码或 搜索 笑看风云路 关注
笑看风云路

原网站

版权声明
本文为[笑看风云路]所创,转载请带上原文链接,感谢
https://blog.csdn.net/u011109589/article/details/125094057