MapReduce Example (V): Secondary Sort
2022-07-06 09:33:00 【Laugh at Fengyun Road】
Implementing Secondary Sort with MapReduce
Hello everyone, I am Fengyun. Welcome to my blog and my WeChat official account 【Laugh at Fengyun Road】. In the days ahead, let's learn big data technologies and work hard together to meet a better version of ourselves!
Approach
In the Map stage, the InputFormat set with job.setInputFormatClass divides the input data set into small blocks (splits) and also provides a RecordReader implementation. This experiment uses TextInputFormat, whose RecordReader takes the byte offset of each line as the key and the text of that line as the value; this is why the custom Map's input type is <LongWritable, Text>. The framework then calls the custom Map's map method once for each <LongWritable, Text> pair. Note that the output must use the types declared in the custom Map, <IntPair, IntWritable>, so the map stage produces a list of <IntPair, IntWritable> pairs. At the end of the map stage, the partitioner class set with job.setPartitionerClass first partitions this list, and each partition is mapped to one reducer. Within each partition, the records are then sorted by key using the comparator class set with job.setSortComparatorClass. As you can see, this is already a two-level ordering. If no comparator class is set with job.setSortComparatorClass, keys are sorted with the compareTo method implemented by the key class. This experiment relies on the compareTo method implemented by IntPair.
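As a rough illustration of where the <LongWritable, Text> input pairs come from, the plain-Java sketch below (no Hadoop dependency; the class name LineOffsets is made up for this example) splits text into records keyed by the byte offset at which each line starts, which is the shape of what TextInputFormat hands to map(). It assumes UTF-8 text with '\n' line endings and ignores split boundaries and '\r\n', which Hadoop's real reader does handle.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class LineOffsets {
    // Splits UTF-8 text into (byte offset of line start, line text) pairs,
    // i.e. the shape of the <LongWritable, Text> records passed to map()
    static List<Map.Entry<Long, String>> records(String text) {
        List<Map.Entry<Long, String>> out = new ArrayList<>();
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        int start = 0;
        for (int i = 0; i <= bytes.length; i++) {
            boolean atEnd = i == bytes.length;
            if (atEnd || bytes[i] == '\n') {
                // A trailing newline does not start another (empty) record
                if (!(atEnd && start == i)) {
                    String line = new String(bytes, start, i - start, StandardCharsets.UTF_8);
                    out.add(new SimpleEntry<Long, String>((long) start, line));
                }
                start = i + 1;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, String> r : records("10 3\n11 1\n")) {
            System.out.println(r.getKey() + " -> \"" + r.getValue() + "\"");
        }
    }
}
```

The offsets count bytes, not characters, which is why the sketch works on the encoded byte array rather than on the String.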
In the Reduce stage, once a reducer has received all the map output assigned to it, it again sorts all key/value pairs using the comparator class set with job.setSortComparatorClass. It then builds a value iterator for each key, and this is where grouping comes in: the grouping comparator class is set with job.setGroupingComparatorClass. Whenever this comparator considers two keys equal, they belong to the same group and their values go into the same value iterator; the key handed over with that iterator is the first key of the group. Finally, the framework calls the Reducer's reduce method, whose input is each key together with its value iterator. Again, the input and output types must match those declared in the custom Reducer.
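The grouping step can be pictured with a small in-memory simulation. This is only a sketch under stated assumptions: a single reducer, records held in a list, and a hypothetical Rec class standing in for an (IntPair, IntWritable) pair. It sorts with the same order as IntPair.compareTo in this article, then walks the sorted run and starts a new group whenever the first field changes, reporting each group under its first key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class ReduceSideGrouping {
    // Hypothetical stand-in for one map output record: key (first, second) plus a value
    static final class Rec {
        final int first, second, value;
        Rec(int first, int second, int value) {
            this.first = first;
            this.second = second;
            this.value = value;
        }
    }

    // Same order as IntPair.compareTo: first descending, then second ascending
    static final Comparator<Rec> SORT =
            Comparator.comparingInt((Rec r) -> r.first).reversed().thenComparingInt(r -> r.second);

    // Sorts, then collects consecutive records with equal first fields into one group,
    // labelled by the group's first key, as the reduce() key would be
    static List<String> groups(List<Rec> input) {
        List<Rec> sorted = new ArrayList<>(input);
        sorted.sort(SORT);
        List<String> out = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {
            Rec head = sorted.get(i);
            List<Integer> values = new ArrayList<>();
            int j = i;
            while (j < sorted.size() && sorted.get(j).first == head.first) {
                values.add(sorted.get(j).value);
                j++;
            }
            out.add("(" + head.first + "," + head.second + ") -> " + values);
            i = j;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(new Rec(3, 10, 10), new Rec(1, 11, 11),
                new Rec(3, 12, 12), new Rec(3, 5, 5));
        groups(input).forEach(System.out::println);
    }
}
```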
Code implementation
Secondary sort: in MapReduce, all keys are compared and sorted, in effect in two steps: first the partitioner decides which reducer each key goes to, and then the keys are sorted by their values. This example also needs two comparisons: records are sorted by the first field, and when the first fields are equal, by the second field. Accordingly, we construct a composite key class IntPair with two fields. The partitioner uses the first field so that all records sharing it reach the same reducer, and the key comparison within each partition orders the records by the first field and then by the second. The Java code consists mainly of a custom key, a custom partitioner class, a grouping comparator class, the map part, and the reduce part.
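The two-level comparison can be written compactly with Integer.compare. The sketch below is plain Java (CompositeKey is a hypothetical class, not part of the job) and reproduces the order that IntPair.compareTo uses later in this article: the first field in descending order, ties broken by the second field in ascending order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class CompositeKey implements Comparable<CompositeKey> {
    final int first;
    final int second;

    CompositeKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    // Two-level comparison: first field descending, then second field ascending
    @Override
    public int compareTo(CompositeKey o) {
        int byFirst = Integer.compare(o.first, first); // arguments swapped -> descending
        return byFirst != 0 ? byFirst : Integer.compare(second, o.second);
    }

    @Override
    public String toString() {
        return first + " " + second;
    }

    // Builds keys from {first, second} pairs, sorts them, and returns them as strings
    static List<String> sorted(int[][] pairs) {
        List<CompositeKey> keys = new ArrayList<>();
        for (int[] p : pairs) {
            keys.add(new CompositeKey(p[0], p[1]));
        }
        Collections.sort(keys);
        List<String> out = new ArrayList<>();
        for (CompositeKey k : keys) {
            out.add(k.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sorted(new int[][] {{2, 7}, {5, 3}, {2, 1}, {5, 9}}));
    }
}
```

Integer.compare avoids the overflow that the tempting `first - o.first` shortcut can hit for large values.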
Code for the custom key:
public static class IntPair implements WritableComparable<IntPair>
{
    int first;  // first field of the composite key
    int second; // second field of the composite key

    public void set(int left, int right)
    {
        first = left;
        second = right;
    }

    public int getFirst()
    {
        return first;
    }

    public int getSecond()
    {
        return second;
    }

    // Deserialization: rebuild the IntPair from the binary stream
    @Override
    public void readFields(DataInput in) throws IOException
    {
        first = in.readInt();
        second = in.readInt();
    }

    // Serialization: write the IntPair to the binary stream (same field order as readFields)
    @Override
    public void write(DataOutput out) throws IOException
    {
        out.writeInt(first);
        out.writeInt(second);
    }

    // Key comparison: first field in descending order, then second field in ascending order
    @Override
    public int compareTo(IntPair o)
    {
        if (first != o.first)
        {
            return first < o.first ? 1 : -1;
        }
        else if (second != o.second)
        {
            return second < o.second ? -1 : 1;
        }
        else
        {
            return 0;
        }
    }

    @Override
    public int hashCode()
    {
        return first * 157 + second;
    }

    @Override
    public boolean equals(Object right)
    {
        if (right == null)
            return false;
        if (this == right)
            return true;
        if (right instanceof IntPair)
        {
            IntPair r = (IntPair) right;
            return r.first == first && r.second == second;
        }
        else
        {
            return false;
        }
    }
}
Every custom key should implement the WritableComparable interface, because it must be both serializable and comparable, and it overrides the corresponding methods. This class contains the following methods:
1. Deserialization, converting the binary data in the stream back into an IntPair: public void readFields(DataInput in) throws IOException
2. Serialization, writing the IntPair to the stream as binary: public void write(DataOutput out) throws IOException
3. Key comparison: public int compareTo(IntPair o)
In addition, a newly defined key class should override two more methods: public int hashCode() and public boolean equals(Object right).
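Because write and readFields must agree on field order, a quick way to check a custom key is a serialization round trip. The sketch below uses only java.io (DataOutputStream and DataInputStream implement DataOutput and DataInput) with a hypothetical PairSerialization class that mirrors IntPair's two methods; it is an illustration, not the Hadoop class itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

final class PairSerialization {
    int first;
    int second;

    // Same field order as IntPair.write: first, then second (4 bytes each, big-endian)
    void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    // Must read the fields in exactly the order write() produced them
    void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    static byte[] toBytes(int first, int second) throws IOException {
        PairSerialization p = new PairSerialization();
        p.first = first;
        p.second = second;
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        p.write(out);
        out.flush();
        return buf.toByteArray();
    }

    static PairSerialization fromBytes(byte[] bytes) throws IOException {
        PairSerialization p = new PairSerialization();
        p.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
        return p;
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = toBytes(42, -7);
        PairSerialization back = fromBytes(bytes);
        System.out.println(bytes.length + " bytes -> " + back.first + ", " + back.second);
    }
}
```

Two ints always serialize to 8 bytes, so every serialized IntPair has the same fixed length.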
Partitioner class code:
public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>
{
    @Override
    public int getPartition(IntPair key, IntWritable value, int numPartitions)
    {
        // Only the first field of the key decides the partition
        return Math.abs(key.getFirst() * 127) % numPartitions;
    }
}
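This partitions by key: the first field of the custom key is multiplied by 127, the absolute value is taken, and the result modulo numPartitions gives the partition. Partitioning does not sort anything by itself; it guarantees that all records sharing the same first field end up at the same reducer, where they are sorted together. This is what makes the first-level ordering on that field possible.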
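A plain-Java sketch of the partition function follows. It keeps the article's idea (only the first field feeds the partition number) but, as a defensive variant that is not in the original code, masks the sign bit with `& Integer.MAX_VALUE` instead of calling Math.abs, because Math.abs(Integer.MIN_VALUE) is still negative. FirstFieldPartitioner is a hypothetical name.

```java
final class FirstFieldPartitioner {
    // Only the first key field feeds the partition number, so keys that share it
    // always land in the same partition. Clearing the sign bit keeps the result
    // non-negative even when first * 127 overflows to Integer.MIN_VALUE.
    static int partition(int first, int numPartitions) {
        return ((first * 127) & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int n = 3;
        int[][] keys = {{5, 1}, {5, 99}, {8, 2}, {Integer.MIN_VALUE, 0}};
        for (int[] k : keys) {
            // The second field k[1] is deliberately ignored
            System.out.println(k[0] + "," + k[1] + " -> partition " + partition(k[0], n));
        }
        // The Math.abs form yields a negative number for this input
        System.out.println(Math.abs(Integer.MIN_VALUE * 127) % n);
    }
}
```

For non-negative products both forms return the same partition; they can differ only when `first * 127` is negative.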
Grouping comparator class code:
public static class GroupingComparator extends WritableComparator
{
    protected GroupingComparator()
    {
        // true: let WritableComparator create IntPair instances to deserialize keys into
        super(IntPair.class, true);
    }

    // Compare two WritableComparables by their first field only
    @Override
    public int compare(WritableComparable w1, WritableComparable w2)
    {
        IntPair ip1 = (IntPair) w1;
        IntPair ip2 = (IntPair) w2;
        int l = ip1.getFirst();
        int r = ip2.getFirst();
        return l == r ? 0 : (l < r ? -1 : 1);
    }
}
This is the grouping comparator class. In the reduce stage, when a value iterator is built for a key, all keys with the same first field belong to the same group and share one value iterator. Since this is a comparator, it must extend WritableComparator.
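To see why the sort comparator and the grouping comparator are allowed to disagree, here is a tiny plain-Java comparison on int[] keys (a stand-in for IntPair, assumed for illustration). The sort order separates (5, 1) and (5, 9), while a first-field-only comparator treats them as equal, so their values would arrive in one reduce() call. What matters for grouping is whether the comparator returns zero for adjacent keys.

```java
import java.util.Comparator;

final class GroupingVsSort {
    // Sort comparator: both fields (first descending, then second ascending),
    // matching IntPair.compareTo in this article
    static final Comparator<int[]> SORT = (a, b) -> {
        int c = Integer.compare(b[0], a[0]);
        return c != 0 ? c : Integer.compare(a[1], b[1]);
    };

    // Grouping comparator: first field only, like GroupingComparator
    static final Comparator<int[]> GROUP = (a, b) -> Integer.compare(a[0], b[0]);

    public static void main(String[] args) {
        int[] k1 = {5, 1};
        int[] k2 = {5, 9};
        System.out.println("sort:  " + SORT.compare(k1, k2));  // negative: distinct sort positions
        System.out.println("group: " + GROUP.compare(k1, k2)); // zero: same group
    }
}
```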
Map code:
public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
{
    // Output objects reused across map() calls
    private final IntPair intkey = new IntPair();
    private final IntWritable intvalue = new IntWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        int left = 0;
        int right = 0;
        if (tokenizer.hasMoreTokens())
        {
            left = Integer.parseInt(tokenizer.nextToken());
            if (tokenizer.hasMoreTokens())
                right = Integer.parseInt(tokenizer.nextToken());
            // Composite key (second column, first column); the value is the first column
            intkey.set(right, left);
            intvalue.set(left);
            context.write(intkey, intvalue);
        }
    }
}
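The parsing logic of map() can be checked without a cluster. This sketch (the class MapLogic is hypothetical) copies the body of map() into a static method that returns {key.first, key.second, value}, which makes it easy to see that the two input columns are swapped in the key: the second column becomes the primary sort field.

```java
import java.util.StringTokenizer;

final class MapLogic {
    // Mirrors Map.map(): "left right" -> key (right, left), value left.
    // Returns {keyFirst, keySecond, value}, or null when the line has no tokens.
    static int[] parse(String line) {
        StringTokenizer tokenizer = new StringTokenizer(line);
        if (!tokenizer.hasMoreTokens()) {
            return null; // the mapper emits nothing for a blank line
        }
        int left = Integer.parseInt(tokenizer.nextToken());
        int right = 0; // stays 0 when the line has only one column, as in map()
        if (tokenizer.hasMoreTokens()) {
            right = Integer.parseInt(tokenizer.nextToken());
        }
        return new int[] {right, left, left};
    }

    public static void main(String[] args) {
        int[] rec = parse("1012 4");
        System.out.println("key=(" + rec[0] + "," + rec[1] + ") value=" + rec[2]);
    }
}
```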
Reduce code:
public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>
{
    private final Text left = new Text();
    private static final Text SEPARATOR = new Text("------------------------------------------------");

    @Override
    public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        // With a null value, TextOutputFormat writes only the key: one separator line per group
        context.write(SEPARATOR, null);
        left.set(Integer.toString(key.getFirst()));
        System.out.println(left); // debug output to the task log
        for (IntWritable val : values)
        {
            context.write(left, val);
        }
    }
}
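Assuming TextOutputFormat's default tab separator between key and value, the lines that reduce() produces for one group look like what the sketch below builds. ReduceOutputSketch is a hypothetical helper; the separator line carries no value because the reducer passes null as the value, and TextOutputFormat then writes the key alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ReduceOutputSketch {
    static final String SEPARATOR = "------------------------------------------------";

    // Builds the text lines written for one group: the separator (key only),
    // then one "first<TAB>value" line per value in the group's iterator
    static List<String> formatGroup(int groupFirst, List<Integer> values) {
        List<String> lines = new ArrayList<>();
        lines.add(SEPARATOR);
        for (int v : values) {
            lines.add(groupFirst + "\t" + v);
        }
        return lines;
    }

    public static void main(String[] args) {
        formatGroup(4, Arrays.asList(1010, 1012)).forEach(System.out::println);
    }
}
```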
Complete code:
package mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SecondarySort
{
    // Composite key: first field descending, then second field ascending
    public static class IntPair implements WritableComparable<IntPair>
    {
        int first;
        int second;

        public void set(int left, int right)
        {
            first = left;
            second = right;
        }

        public int getFirst()
        {
            return first;
        }

        public int getSecond()
        {
            return second;
        }

        @Override
        public void readFields(DataInput in) throws IOException
        {
            first = in.readInt();
            second = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException
        {
            out.writeInt(first);
            out.writeInt(second);
        }

        @Override
        public int compareTo(IntPair o)
        {
            if (first != o.first)
            {
                return first < o.first ? 1 : -1;
            }
            else if (second != o.second)
            {
                return second < o.second ? -1 : 1;
            }
            else
            {
                return 0;
            }
        }

        @Override
        public int hashCode()
        {
            return first * 157 + second;
        }

        @Override
        public boolean equals(Object right)
        {
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair)
            {
                IntPair r = (IntPair) right;
                return r.first == first && r.second == second;
            }
            else
            {
                return false;
            }
        }
    }

    // Sends all keys with the same first field to the same reducer
    public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>
    {
        @Override
        public int getPartition(IntPair key, IntWritable value, int numPartitions)
        {
            return Math.abs(key.getFirst() * 127) % numPartitions;
        }
    }

    // Groups reduce input by the first field only
    public static class GroupingComparator extends WritableComparator
    {
        protected GroupingComparator()
        {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2)
        {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int l = ip1.getFirst();
            int r = ip2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
    }

    public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
    {
        private final IntPair intkey = new IntPair();
        private final IntWritable intvalue = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            int left = 0;
            int right = 0;
            if (tokenizer.hasMoreTokens())
            {
                left = Integer.parseInt(tokenizer.nextToken());
                if (tokenizer.hasMoreTokens())
                    right = Integer.parseInt(tokenizer.nextToken());
                intkey.set(right, left);
                intvalue.set(left);
                context.write(intkey, intvalue);
            }
        }
    }

    public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>
    {
        private final Text left = new Text();
        private static final Text SEPARATOR = new Text("------------------------------------------------");

        @Override
        public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            context.write(SEPARATOR, null);
            left.set(Integer.toString(key.getFirst()));
            System.out.println(left);
            for (IntWritable val : values)
            {
                context.write(left, val);
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
    {
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor
        Job job = Job.getInstance(conf, "secondarysort");
        job.setJarByClass(SecondarySort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setPartitionerClass(FirstPartitioner.class);
        job.setGroupingComparatorClass(GroupingComparator.class);
        // No sort comparator is set, so map output keys are sorted with IntPair.compareTo
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Input and output paths are hard-coded for this experiment
        String[] otherArgs = new String[2];
        otherArgs[0] = "hdfs://localhost:9000/mymapreduce8/in/goods_visit2";
        otherArgs[1] = "hdfs://localhost:9000/mymapreduce8/out";
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
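To try the whole flow without HDFS, the following in-memory simulation strings the steps together for a single reducer: parse each line as map() does, sort by the composite key, group by the first field, and emit the separator and key/value lines. LocalSecondarySort is a hypothetical class, the input lines in main are made up for illustration, and the sketch skips partitioning, serialization, and everything else Hadoop does between map and reduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.StringTokenizer;

final class LocalSecondarySort {
    static final String SEPARATOR = "------------------------------------------------";

    // Runs map -> sort -> group -> reduce in memory, as if there were one reducer
    static List<String> run(List<String> inputLines) {
        // Map: "left right" becomes {keyFirst = right, keySecond = left, value = left}
        List<int[]> records = new ArrayList<>();
        for (String line : inputLines) {
            StringTokenizer t = new StringTokenizer(line);
            if (!t.hasMoreTokens()) {
                continue;
            }
            int left = Integer.parseInt(t.nextToken());
            int right = t.hasMoreTokens() ? Integer.parseInt(t.nextToken()) : 0;
            records.add(new int[] {right, left, left});
        }
        // Sort: first field descending, then second ascending (as IntPair.compareTo)
        records.sort(Comparator.comparingInt((int[] r) -> r[0]).reversed()
                .thenComparingInt(r -> r[1]));
        // Group by first field and "reduce": a separator, then first<TAB>value lines
        List<String> out = new ArrayList<>();
        Integer currentFirst = null;
        for (int[] r : records) {
            if (currentFirst == null || currentFirst != r[0]) {
                out.add(SEPARATOR);
                currentFirst = r[0];
            }
            out.add(r[0] + "\t" + r[2]);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical input: two whitespace-separated integer columns per line
        List<String> input = Arrays.asList("1010 3", "1011 1", "1012 3", "1013 2");
        run(input).forEach(System.out::println);
    }
}
```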
-------------- end ----------------