MapReduce Example (III): Data Deduplication
2022-07-27 16:15:00 【Laugh at Fengyun Road】
Implementing Data Deduplication with MapReduce
Hello everyone, I am Fengyun. Welcome to my blog and my WeChat official account 【Laugh at Fengyun Road】. In the days to come, let's learn big data technologies together, work hard together, and meet a better self!
Approach
The goal of data deduplication is that any record appearing more than once in the input appears exactly once in the output file. In MapReduce, the <key,value> pairs emitted by map are aggregated by the shuffle phase into <key,value-list> and then passed to reduce. A natural idea follows: send all occurrences of the same record to the same reduce task, and no matter how many times the record occurs, output it just once in the final result. Concretely, the record itself should be the key of the reduce input, while nothing is required of the value-list (the values can be empty). When reduce receives a <key,value-list>, it copies the input key to the output key, sets the value to empty, and emits <key,value>.
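The idea above can be tried out without a cluster. The following is a minimal in-memory sketch, not Hadoop code: the class DedupSketch and its methods are hypothetical names chosen for illustration, a TreeMap stands in for the shuffle phase (keys reach a single reduce task in sorted order), and null stands in for the empty value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory illustration only; no Hadoop classes are involved.
final class DedupSketch {

    // "Shuffle": group every emitted key with the list of values emitted for it.
    // A TreeMap keeps the keys sorted, as they would be inside one reduce task.
    static Map<String, List<Object>> shuffle(List<String> mapOutputKeys) {
        Map<String, List<Object>> grouped = new TreeMap<>();
        for (String key : mapOutputKeys) {
            // The value carries no information, so null stands in for the empty value.
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(null);
        }
        return grouped;
    }

    // "Reduce": ignore the value-list and emit each key exactly once.
    static List<String> reduce(Map<String, List<Object>> grouped) {
        return new ArrayList<>(grouped.keySet());
    }

    // Runs both steps on a list of records that are used directly as keys.
    static List<String> dedup(List<String> records) {
        return reduce(shuffle(records));
    }

    public static void main(String[] args) {
        // Made-up sample records with duplicates.
        List<String> records = Arrays.asList("1002", "1001", "1002", "1003", "1001");
        System.out.println(dedup(records)); // prints [1001, 1002, 1003]
    }
}
```

However long a key's value-list grows, the key is written once, which is exactly the property the reducer relies on.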
The MapReduce deduplication process is shown in the figure below:
Write code
Mapper Code
// map copies a field of the input value to the output key and emits it directly
public static class Map extends Mapper<Object, Text, Text, NullWritable> {
    // Reusable output key holding the field extracted from each input line
    private final Text newKey = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        System.out.println(line);       // debug: the raw input line
        String[] arr = line.split("\t");
        if (arr.length < 2) {
            return;                     // skip lines without a second tab-separated field
        }
        newKey.set(arr[1]);             // the second field becomes the output key
        context.write(newKey, NullWritable.get());
        System.out.println(newKey);     // debug: the emitted key
    }
}
The Mapper phase uses Hadoop's default job input format. It splits each input value with the split() method, sets the extracted goods id field as the key and leaves the value empty, and then emits <key,value> directly.
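Indexing arr[1] assumes that every input line has at least two tab-separated fields; a line without a tab would raise an ArrayIndexOutOfBoundsException. The following is a minimal sketch of the extraction step with that check, in plain Java so it can be run without Hadoop. The class name FieldExtractSketch, the method secondField, and the sample line are hypothetical and only illustrate the parsing.

```java
// Plain-Java illustration of the field extraction done in map; no Hadoop classes involved.
final class FieldExtractSketch {

    // Returns the second tab-separated field, or null when the line has fewer than two fields.
    static String secondField(String line) {
        String[] fields = line.split("\t");
        return fields.length > 1 ? fields[1] : null;
    }

    public static void main(String[] args) {
        // Made-up sample line with three tab-separated fields.
        String sample = "10181\t1000481\t2010-04-04 16:54:31";
        System.out.println(secondField(sample));        // prints 1000481
        System.out.println(secondField("no tab here")); // prints null
    }
}
```

In a mapper, a null result would typically mean skipping the line instead of emitting a key.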
Reducer Code
public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Ignore the value-list and emit each distinct key exactly once
        context.write(key, NullWritable.get());
    }
}
The <key,value> pairs emitted by map pass through the shuffle phase, are aggregated into <key,value-list>, and are then handed to the reduce function. No matter how many values a key has, reduce simply copies the input key to the output key, sets the output value to empty, and emits <key,value>.
Complete code
package mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Filter {
    public static class Map extends Mapper<Object, Text, Text, NullWritable> {
        // Reusable output key holding the field extracted from each input line
        private final Text newKey = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println(line);       // debug: the raw input line
            String[] arr = line.split("\t");
            if (arr.length < 2) {
                return;                     // skip lines without a second tab-separated field
            }
            newKey.set(arr[1]);             // the second field becomes the output key
            context.write(newKey, NullWritable.get());
            System.out.println(newKey);     // debug: the emitted key
        }
    }

    public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // Ignore the value-list and emit each distinct key exactly once
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        System.out.println("start");
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor
        Job job = Job.getInstance(conf, "filter");
        job.setJarByClass(Filter.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path in = new Path("hdfs://localhost:9000/mymapreduce2/in/buyer_favorite1");
        Path out = new Path("hdfs://localhost:9000/mymapreduce2/out");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
