MapReduce implements matrix multiplication - implementation code
2022-07-03 13:22:00 【Brother Xing plays with the clouds】
I previously wrote an article analyzing the MapReduce matrix multiplication algorithm: http://www.linuxidc.com/Linux/2014-09/106646.htm
To give you a more intuitive picture of how the program executes, I have now written the implementation code for your reference.
Programming environment:

- java version "1.7.0_40"
- Eclipse Kepler
- Windows 7 x64
- Ubuntu 12.04 LTS
- Hadoop 2.2.0
- VMware 9.0.0 build-812388
Input data:

Matrix A storage path: hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA/matrixa

Matrix A content (2 rows × 3 columns, one row per line):

```
3 4 6
4 0 8
```

Matrix B storage path: hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB/matrixb

Matrix B content (3 rows × 2 columns, one row per line):

```
2 3
3 0
4 1
```
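For reference, multiplying these two matrices gives a 2 × 2 result; for example C(0,0) = 3×2 + 4×3 + 6×4 = 42. Assuming the row layout above, the part file the job writes to matrixC should look like this (the default TextOutputFormat separates key and value with a tab):

```
0,0	42
0,1	15
1,0	40
1,1	20
```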
Implementation code:

There are three classes:

- Driver class MMDriver
- Map class MMMapper
- Reduce class MMReducer

You can merge them into a single class if you prefer.
MMDriver.java
```java
package dataguru.matrixmultiply;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MMDriver {

    public static void main(String[] args) throws Exception {
        // set configuration
        Configuration conf = new Configuration();

        // create job (Job.getInstance replaces the constructor deprecated in Hadoop 2.x)
        Job job = Job.getInstance(conf, "MatrixMultiply");
        job.setJarByClass(dataguru.matrixmultiply.MMDriver.class);

        // specify Mapper & Reducer
        job.setMapperClass(dataguru.matrixmultiply.MMMapper.class);
        job.setReducerClass(dataguru.matrixmultiply.MMReducer.class);

        // specify output types of mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // specify input and output directories
        Path inPathA = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA");
        Path inPathB = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB");
        Path outPath = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixC");
        FileInputFormat.addInputPath(job, inPathA);
        FileInputFormat.addInputPath(job, inPathB);
        FileOutputFormat.setOutputPath(job, outPath);

        // delete the output directory if it exists, so the job can be rerun;
        // do NOT close the FileSystem here -- it is cached and the job still needs it
        try {
            FileSystem hdfs = outPath.getFileSystem(conf);
            if (hdfs.exists(outPath)) {
                hdfs.delete(outPath, true); // recursive delete
            }
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        // run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
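To run the job, one option is to package the three classes into a jar and submit it with the standard hadoop launcher. This is just a sketch; the jar name is hypothetical and depends on your build:

```
hadoop jar matrixmultiply.jar dataguru.matrixmultiply.MMDriver
```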
MMMapper.java
```java
package dataguru.matrixmultiply;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MMMapper extends Mapper<Object, Text, Text, Text> {
    private String tag;           // which matrix the current split comes from
    private int crow = 2;         // number of rows of matrix A
    private int ccol = 2;         // number of columns of matrix B
    // NOTE: the static row counters assume each matrix file is read
    // start-to-finish by a single mapper, i.e. the file is not split
    private static int arow = 0;  // current row of matrix A
    private static int brow = 0;  // current row of matrix B

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // derive the tag ("matrixA" or "matrixB") from the parent directory of the input split
        FileSplit fs = (FileSplit) context.getInputSplit();
        tag = fs.getPath().getParent().getName();
    }

    /**
     * The input consists of two matrix files; each map() call processes one row.
     */
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer str = new StringTokenizer(value.toString());
        if ("matrixA".equals(tag)) {
            // left matrix: element A(arow,col) contributes to C(arow,i) for every column i of B
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken();
                for (int i = 0; i < ccol; i++) {
                    Text outkey = new Text(arow + "," + i);
                    Text outvalue = new Text("a," + col + "," + item);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue);
                }
                col++;
            }
            arow++;
        } else if ("matrixB".equals(tag)) {
            // right matrix: element B(brow,col) contributes to C(i,col) for every row i of A
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken();
                for (int i = 0; i < crow; i++) {
                    Text outkey = new Text(i + "," + col);
                    Text outvalue = new Text("b," + brow + "," + item);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue);
                }
                col++;
            }
            brow++;
        }
    }
}
```
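To make the replication concrete: for the first line of matrixa (3 4 6), with arow = 0 and ccol = 2, the println above shows the six key/value pairs the mapper emits, one copy of each element per column of B:

```
0,0 | a,0,3
0,1 | a,0,3
0,0 | a,1,4
0,1 | a,1,4
0,0 | a,2,6
0,1 | a,2,6
```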
MMReducer.java
```java
package dataguru.matrixmultiply;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MMReducer extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // each value looks like "a,k,elem" or "b,k,elem":
        // source matrix, shared index k, element value
        Map<String, String> matrixa = new HashMap<String, String>();
        Map<String, String> matrixb = new HashMap<String, String>();

        for (Text val : values) { // values example: b,0,2 or a,0,4
            StringTokenizer str = new StringTokenizer(val.toString(), ",");
            String sourceMatrix = str.nextToken();
            if ("a".equals(sourceMatrix)) {
                matrixa.put(str.nextToken(), str.nextToken()); // (k, A(row,k))
            }
            if ("b".equals(sourceMatrix)) {
                matrixb.put(str.nextToken(), str.nextToken()); // (k, B(k,col))
            }
        }

        // C(row,col) = sum over k of A(row,k) * B(k,col)
        int result = 0;
        for (Map.Entry<String, String> entry : matrixa.entrySet()) {
            result += Integer.parseInt(entry.getValue())
                    * Integer.parseInt(matrixb.get(entry.getKey()));
        }

        context.write(key, new Text(String.valueOf(result)));
    }
}
```
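For example, the reduce call for key 0,0 receives a,0,3 a,1,4 a,2,6 from matrix A and b,0,2 b,1,3 b,2,4 from matrix B. The two maps pair up on the shared index k, and the loop accumulates the dot product:

```
matrixa = {0=3, 1=4, 2=6}
matrixb = {0=2, 1=3, 2=4}
C(0,0)  = 3*2 + 4*3 + 6*4 = 42
```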