6. Implementing a Custom Partitioner in MapReduce
2022-07-28 10:16:00 【数据分析师虾米】
MapReduce's built-in partitioner is HashPartitioner.
How it works: it hashes the map output key, then takes the hash modulo the number of reduce tasks; the result determines which reduce task pulls that output <K, V> pair.
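For reference, a minimal sketch of this default behavior (it mirrors the logic of Hadoop's built-in HashPartitioner; the class name here is illustrative):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the result is non-negative, then take it
        // modulo the number of reduce tasks to choose a partition.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}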
A custom partitioner must extend Partitioner and override the getPartition() method.
The custom partitioner below looks up a partition index for each map output key K in a fixed dictionary.
Appendix: the input text to be processed:
Dear Dear Bear Bear River Car Dear Dear Bear Rive
Dear Dear Bear Bear River Car Dear Dear Bear Rive
The partitioner class must also be registered on the job in the main function.
Custom partitioner class:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import java.util.HashMap;

// Text is the map output key type, IntWritable the map output value type
public class CustomPartitioner extends Partitioner<Text, IntWritable> {
    // Fixed mapping from word to partition (reduce task) index
    public static HashMap<String, Integer> dict = new HashMap<String, Integer>();
    static {
        dict.put("Dear", 0);
        dict.put("Bear", 1);
        dict.put("River", 2);
        dict.put("Car", 3);
    }

    @Override
    public int getPartition(Text text, IntWritable intWritable, int numReduceTasks) {
        // Look up the partition index by the key; words missing from dict
        // (e.g. "Rive" in the sample text) default to partition 0, which
        // avoids a NullPointerException from unboxing null.
        int partitionIndex = dict.getOrDefault(text.toString(), 0);
        return partitionIndex;
    }
}
Note: the map output is a <K, V> key-value pair. In int partitionIndex = dict.getOrDefault(text.toString(), 0);, the map output key K (the word) is looked up in dict, and partitionIndex is the partition number it maps to: Dear goes to reduce task 0, Bear to 1, River to 2, and Car to 3.
Mapper class:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split on any whitespace so both tab- and space-separated input work
        // (the sample text above is space-separated)
        String[] words = value.toString().split("\\s+");
        for (String word : words) {
            // Emit (word, 1) as the intermediate result
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
Reducer class:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the 1s emitted by the map side for this word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
main function:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class WordCountMain {
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        // Check for null before reading args.length
        if (args == null || args.length != 2) {
            System.out.println("Usage: WordCountMain <input path> <output path>");
            System.exit(1);
        }
        Configuration configuration = new Configuration();
        // Local path of the job jar, used when submitting from the IDE
        configuration.set("mapreduce.job.jar", "/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName());
        // Package the job into a jar
        job.setJarByClass(WordCountMain.class);
        // Optionally set the input/output formats on the job
        //job.setInputFormatClass(TextInputFormat.class);
        //job.setOutputFormatClass(TextOutputFormat.class);
        // Set the input/output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Set the classes handling the map/reduce phases
        job.setMapperClass(WordCountMap.class);
        //map-side combine
        //job.setCombinerClass(WordCountReduce.class);
        job.setReducerClass(WordCountReduce.class);
        // If the map and reduce output KV types are identical, setting the reduce
        // output types is enough; otherwise set the map output KV types separately
        //job.setMapOutputKeyClass(.class)
        // Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Use the custom partitioner, with one reduce task per partition
        job.setPartitionerClass(CustomPartitioner.class);
        job.setNumReduceTasks(4);
        // Submit the job and exit with its status
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
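If the map output KV types ever differed from the final output types, they would be set explicitly with the standard Job API before submission, e.g.:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

With four reduce tasks and the dict above, the output directory should contain four files, part-r-00000 through part-r-00003, each holding the counts for the word(s) routed to that partition; with the getOrDefault fallback, an unrecognized word such as "Rive" also lands in part-r-00000.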
main function argument settings: pass two program arguments, the input path (args[0]) and the output path (args[1]).
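For example, a hypothetical command-line submission (the HDFS paths are placeholders, not from the original post; use the fully qualified class name if WordCountMain is in a package):

hadoop jar com.kaikeba.hadoop-1.0-SNAPSHOT.jar WordCountMain /wordcount/input /wordcount/output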