MapReduce advanced
2022-07-23 10:43:00 【S_ ng】
1. HDFS data format details
1.1 File format
Row-oriented: .txt (splittable), .seq (splittable)
Column-oriented: .rc (splittable), .orc (splittable)
1.2 Compressed format
Splittable: .lzo (native), .bz2 (native)
Non-splittable: .gz (native), .snappy (not native)
1.3 Set the output format to gzip
-Dmapred.output.compress=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
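The same effect can be achieved programmatically in the driver instead of via -D options. The lines below are a minimal sketch (the job and conf variables are placeholders, not code from this article) using the new-API FileOutputFormat helpers:
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

Job job = Job.getInstance(conf, "gzip output example");
// Equivalent to -Dmapred.output.compress=true
FileOutputFormat.setCompressOutput(job, true);
// Equivalent to -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);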
2. Custom Partitioner
2.1 Customize the number of reduce tasks
yarn jar TlHadoopCore-jar-with-dependencies.jar \
com.examples.WordCountV2 \
-Dmapred.output.compress=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-Dmapred.reduce.tasks=2 \
/tmp/tianliangedu/input /tmp/output
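Alternatively, the reduce count can be fixed inside the driver rather than on the command line; a one-line sketch (assuming a Job object named job, as in the other examples) is:
// Equivalent to -Dmapred.reduce.tasks=2
job.setNumReduceTasks(2);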
2.2 Custom Partitioner implementation
public static class MyHashPartitioner<K, V> extends Partitioner<K, V> {
    /** Partition by the key's first character: keys starting before 'q' go to reducer 0, the rest to reducer 1. */
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.toString().charAt(0) < 'q' ? 0 : 1) % numReduceTasks;
    }
}
Specify the partitioner class:
job.setPartitionerClass(MyHashPartitioner.class);
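For comparison, Hadoop's default HashPartitioner spreads keys by hash code rather than by first character; its logic is essentially the following (shown here only as a reference sketch):
// Default behaviour when no custom partitioner is set (HashPartitioner)
public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}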
Script invocation:
yarn jar TlHadoopCore-jar-with-dependencies.jar \
com.examples.SelfDefinePartitioner \
-Dmapred.output.compress=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-Dmapred.reduce.tasks=2 \
/tmp/input /tmp/output
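With two reduce tasks and the partitioner above, the job should write two output files, one per partition. Assuming the /tmp/output path from the command above, they can be inspected as follows (hadoop fs -text also decompresses the gzip output):
hadoop fs -ls /tmp/output
hadoop fs -text /tmp/output/part-r-00000*
hadoop fs -text /tmp/output/part-r-00001*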
3. Reading an external configuration file in an MR application: passing values via Configuration
Step breakdown
- Implement a sort over the file data in the input_filter directory, i.e., the read-in by Map and the reduction by Reduce.
- Pass the local file whitelist.txt to the Driver class and read its contents into the string txtContent.
- Pass txtContent to the map and reduce tasks through the Configuration object's set method.
- In the map task, retrieve the passed value txtContent through the Configuration object's get method.
- Parse txtContent into a Set object and use it in the map task's map method to filter the output.
- Because filtering already happens on the map side, the reduce side needs no changes.
package com.tianliangedu.core.readconfig;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;

// Driver class that launches the MR job
public class ConfigSetTransferDriver {
    public static Logger logger = Logger.getLogger(ConfigSetTransferDriver.class);

    // Mapper class implementing the map function
    public static class LineProcessMapper extends Mapper<Object, Text, Text, IntWritable> {
        // Reusable output objects, to avoid allocating them for every record
        private Text outputKey = new Text();
        private IntWritable outputValue = new IntWritable();
        // Set of whitelisted names used for filtering
        private Set<String> whiteNameSet = new HashSet<String>();

        // setup runs exactly once per map task; initialize whatever the map function needs
        @Override
        protected void setup(Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            String whitelistString = conf.get("whitelist");
            String[] whiteNameArray = whitelistString.split("\\s");
            whiteNameSet.addAll(Arrays.asList(whiteNameArray));
        }

        // Core map implementation: process one <key, value> pair at a time
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the map output record by record through the context object
            String tempLine = value.toString();
            if (tempLine != null && tempLine.trim().length() > 0) {
                String[] columnArray = tempLine.split("\\s");
                if (whiteNameSet.contains(columnArray[0])) {
                    outputKey.set(columnArray[0]);
                    outputValue.set(Integer.parseInt(columnArray[1]));
                    context.write(outputKey, outputValue);
                }
            }
        }
    }

    // Reducer class implementing the reduce function
    public static class SortReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Core reduce implementation: process one <key, List(v1, v2)> group at a time
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Enhanced for loop: read each value from the iterator in turn
            for (IntWritable val : values) {
                // Write out each result
                context.write(key, val);
            }
        }
    }

    // Read a local file at the given path with the given encoding and return its contents as a string
    public static String readFile(String filePath, String fileEncoding) {
        if (fileEncoding == null) {
            fileEncoding = System.getProperty("file.encoding");
        }
        File file = new File(filePath);
        BufferedReader br = null;
        String line = null;
        StringBuilder stringBuilder = new StringBuilder();
        int lineCounter = 0;
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream(file), fileEncoding));
            while ((line = br.readLine()) != null) {
                if (lineCounter > 0) {
                    stringBuilder.append("\n");
                }
                stringBuilder.append(line);
                lineCounter++;
            }
            return stringBuilder.toString();
        } catch (Exception e) {
            logger.info(e.getLocalizedMessage());
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    logger.info(e.getLocalizedMessage());
                    logger.info("An error occurred while closing the stream!");
                }
            }
        }
        return null;
    }

    // Read the configuration file and pass its value to the tasks
    public static void readConfigAndTransfer(Configuration conf, String filePath) {
        // Read the local configuration file
        String source = readFile(filePath, "utf-8");
        // Pass the value from the configuration file to the compute nodes via conf.set
        conf.set("whitelist", source);
        // Log the value that was read; this line can be removed if logging is not needed
        logger.info("whitelist=" + source);
    }

    // Driver method that launches the MR job
    public static void main(String[] args) throws Exception {
        // Get the cluster configuration parameters
        Configuration conf = new Configuration();
        // Parse the generic options
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
        if ((remainingArgs.length < 3)) {
            System.err.println("Usage: yarn jar jar_path main_class_path -D parameter list <in> <out> <whitelist_file>");
            System.exit(2);
        }
        // Read and pass on the configuration parameter
        readConfigAndTransfer(conf, remainingArgs[2]);
        // Set up the Job instance
        Job job = Job.getInstance(conf, "tianliang conf direct transfer");
        // Specify the main class of this job
        job.setJarByClass(ConfigSetTransferDriver.class);
        // Specify the map class
        job.setMapperClass(LineProcessMapper.class);
        // Specify the reducer class
        job.setReducerClass(SortReducer.class);
        // Specify the job's output key and value types; if the map and reduce output types differ,
        // the map output key/value class types must be set separately
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Specify the input data path
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        // Specify the output path, which must not already exist
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        // Submit the job and wait for completion; the submitting client exits when the job finishes
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
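The driver expects three positional arguments after any -D options: the HDFS input path, the HDFS output path, and the local path of whitelist.txt, which readFile reads on the submitting client. A possible invocation, following the same pattern as the earlier examples (the jar name and paths here are assumptions, not from the article), would be:
yarn jar TlHadoopCore-jar-with-dependencies.jar \
com.tianliangedu.core.readconfig.ConfigSetTransferDriver \
/tmp/input /tmp/output ./whitelist.txt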