6. Implementing a Custom MapReduce Partitioner
2022-07-28 10:16:00 【数据分析师虾米】
The partitioner MapReduce ships with by default is HashPartitioner.
Principle: it hashes the key emitted by the map, then takes that hash modulo the number of reduce tasks; the result determines which reduce task fetches that output key-value pair.
A custom partitioner must extend Partitioner and override the getPartition() method.
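For reference, the built-in HashPartitioner boils down to the logic just described; a minimal sketch of it (masking with Integer.MAX_VALUE clears the sign bit so the index is never negative):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Hash the key, clear the sign bit, then take it modulo the reduce task count
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}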
For reference, the sample text being processed:
Dear Dear Bear Bear River Car Dear Dear Bear Rive
Dear Dear Bear Bear River Car Dear Dear Bear Rive
The custom partitioner class must also be registered on the job in the main function (see setPartitionerClass below).
Custom partitioner class:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import java.util.HashMap;

// Text is the key type emitted by the map phase; IntWritable is the value type
public class CustomPartitioner extends Partitioner<Text, IntWritable> {
    // Fixed mapping from word to partition number
    public static HashMap<String, Integer> dict = new HashMap<String, Integer>();
    static {
        dict.put("Dear", 0);
        dict.put("Bear", 1);
        dict.put("River", 2);
        dict.put("Car", 3);
    }

    @Override
    public int getPartition(Text text, IntWritable intWritable, int numReduceTasks) {
        // Look up the partition number for this key
        Integer partitionIndex = dict.get(text.toString());
        if (partitionIndex == null) {
            // Words not in the dict (e.g. "Rive" in the sample text) would otherwise
            // cause a NullPointerException; fall back to hash partitioning for them
            return (text.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
        return partitionIndex;
    }
}
Note: the map output is a <K, V> key-value pair. In int partitionIndex = dict.get(text.toString());, partitionIndex is not the value of K itself but the partition number that the key K is mapped to.
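A quick way to sanity-check the mapping outside of a job (PartitionerCheck is a hypothetical helper, not part of the original post; the third argument is the number of reduce tasks, 4 here):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class PartitionerCheck {
    public static void main(String[] args) {
        CustomPartitioner partitioner = new CustomPartitioner();
        // "Dear" is in the dict, so it goes to partition 0
        System.out.println(partitioner.getPartition(new Text("Dear"), new IntWritable(1), 4));
        // "Rive" is not in the dict, so the hash fallback decides its partition
        System.out.println(partitioner.getPartition(new Text("Rive"), new IntWritable(1), 4));
    }
}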
Mapper class:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The sample text is whitespace-separated; adjust the delimiter to match your input
        String[] words = value.toString().split("\\s+");
        for (String word : words) {
            // Emit each word with a count of 1 as the intermediate result
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
Reducer class (the standard word-count sum reducer that main registers as WordCountReduce):
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts of all occurrences of this word
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
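Because this reduce is a simple associative sum, the same class can also serve as a combiner (the commented-out job.setCombinerClass(WordCountReduce.class) line in main below) to pre-aggregate map output locally and cut shuffle traffic.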
main function:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class WordCountMain {
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        // Check for null before reading args.length
        if (args == null || args.length != 2) {
            System.out.println("please input Path!");
            System.exit(1);
        }
        Configuration configuration = new Configuration();
        configuration.set("mapreduce.job.jar", "/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName());
        // Bundle the job classes into the jar
        job.setJarByClass(WordCountMain.class);
        // Set the input/output formats (TextInputFormat/TextOutputFormat are the defaults)
        //job.setInputFormatClass(TextInputFormat.class);
        //job.setOutputFormatClass(TextOutputFormat.class);
        // Set the input/output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Set the classes that handle the map and reduce phases
        job.setMapperClass(WordCountMap.class);
        // Map-side combine (optional)
        //job.setCombinerClass(WordCountReduce.class);
        job.setReducerClass(WordCountReduce.class);
        // If the map and reduce output kv types match, setting the reduce output types is enough;
        // otherwise, also set the map output types explicitly:
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(IntWritable.class);
        // Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Register the custom partitioner and match the reduce task count to its four partitions
        job.setPartitionerClass(CustomPartitioner.class);
        job.setNumReduceTasks(4);
        // Submit the job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
main function argument settings: pass the input path and the output path as the two program arguments (args[0] and args[1]).
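With four reduce tasks and the dict above, each dictionary word should land in its own output file. For the sample text, the counts work out as follows (Rive is not in the dict, so the hash fallback decides which of the four files it ends up in):
part-r-00000: Dear 8
part-r-00001: Bear 6
part-r-00002: River 2
part-r-00003: Car 2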