Table of Contents
- 1. join
  - 1.1 Goods data
  - 1.2 Order data
- 2. Reduce side join
  - 2.1 Main steps
  - 2.2 Drawbacks
  - 2.3 Implementation notes
  - 2.4 Implementation (unsorted)
    - 1) mapper
    - 2) reducer
    - 3) driver
    - 4) Verification
  - 2.5 Implementation (sorted)
    - 1) Implementation
    - 2) Verification
- 3. Map side join
  - 3.1 Advantages
  - 3.2 Implementation notes
  - 3.3 Implementation
    - 1) mapper
    - 2) reducer
    - 3) driver
    - 4) Verification
This article introduces join operations in MapReduce.
It assumes a working Hadoop environment.
The article has three parts: an introduction to joins, the reduce side join, and the map side join.
1. join
When processing data with the MapReduce framework, it is common to read from multiple data sets and join them, and that join logic has to be expressed within MapReduce's programming model.
MapReduce joins fall into two categories: map side join and reduce side join. Both are covered in detail below.
The examples that follow use this data:
There are two data files: goods (goods data) and order_goods (order data).
The task is to use MapReduce to produce, for every order line, the name of the goods it refers to.
For example: 107860 -> AMAZFIT黑色硅胶腕带
1.1 Goods data
goods ID | serial number | name
100101|155083444927602|四川果冻橙6个约180g/个
100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g
1.2 Order data
order ID | goods ID | price paid
11152|108464|76
11152|109995|1899
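A small parsing note that applies to all of the code below: both files are pipe-delimited, and Java's String.split takes a regular expression, so the "|" delimiter must be escaped. A minimal, self-contained sketch:

```java
public class SplitDemo {
    public static void main(String[] args) {
        // "|" is a regex metacharacter, so it is escaped as "\\|" when splitting
        String[] fields = "11152|108464|76".split("\\|");
        System.out.println(fields[0] + " " + fields[1] + " " + fields[2]); // prints: 11152 108464 76
    }
}
```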
2. Reduce side join
The join is performed in the reduce phase. The shuffle brings records with the same key into the same group, and the join is done on top of that grouping.
2.1 Main steps
- The mappers read the different data sets separately
- In the mapper output, the join field is normally used as the output key
- During shuffle, records from the different data sets that share a key are placed in the same group
- In the reducer, the grouped records are combined according to the business logic and the final result is written out
2.2 Drawbacks
- The biggest problem with a reduce side join is that all of the join work happens in the reduce phase, whose parallelism in MapReduce is usually very low (a single reducer by default; it can be raised, as shown in the sketch after this list). All of the data is squeezed through the reduce phase, which puts it under heavy pressure
- Moving the data from the mappers to the reducers requires a full shuffle, which is expensive for large data sets
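For reference, the reduce parallelism is configurable. A minimal sketch, assuming the line is added to the run() method of the driver shown in section 2.4 (the value 4 is only illustrative):

```java
// Spread the join work across several reducers instead of the default single one.
// Records with the same goods ID still meet in one reducer, because the default
// HashPartitioner partitions on the map output key (the goods ID).
job.setNumReduceTasks(4);
```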
2.3 Implementation notes
- In the mapper, handle each record according to the name of the input file it came from. In this example every record is prefixed with G# or O# to mark it as a goods record or an order record
- Use the join field as the map output key so that records with the same key are guaranteed to arrive in the same group at the reducer. The field the two data sets share, the goods ID, is used as that key
- In the reducer, separate the records that share a key (goods ID) into goods and order records and store them in different structures; in this example the order records go into a List and the goods record into a HashMap. When producing the output, loop over the List and look up the goods name and serial number by goods ID
- If the output must be sorted, a second job is needed that takes the previous output and uses the sort field as its key. In this example the sort key is the order ID
2.4 Implementation (unsorted)
1) mapper
```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ReducerSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    // source files: goods.txt and order.txt
    String sourceFileName = "";
    Text outValue = new Text();
    Text outKey = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // get the name of the file that the current input split belongs to
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        sourceFileName = inputSplit.getPath().getName();
        System.out.println("Current file being processed: " + sourceFileName);
    }

    // goods data
    // goods ID | serial number | name
    // 100101|155083444927602|四川果冻橙6个约180g/个
    // 100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g
    // order data
    // order ID | goods ID | price paid
    // 11152|108464|76
    // 11152|109995|1899
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // "|" is a regex metacharacter, so it must be escaped in split()
        String[] line = value.toString().split("\\|");

        if ("goods.txt".equals(sourceFileName)) { // goods data: key = goods ID (field 0)
            Counter goodsCounter = context.getCounter("goods_records_counters", "goods Records");
            goodsCounter.increment(1);
            outKey.set(line[0]);
            outValue.set("G#" + value.toString());
        } else if ("order.txt".equals(sourceFileName)) { // order data: key = goods ID (field 1)
            Counter orderCounter = context.getCounter("order_records_counters", "order Records");
            orderCounter.increment(1);
            outKey.set(line[1]);
            outValue.set("O#" + value.toString());
        }
        context.write(outKey, outValue);
    }
}
```
2) reducer
```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReducerSideJoinReducer extends Reducer<Text, Text, Text, NullWritable> {
    List<String> orderList = new ArrayList<String>();
    Map<String, String> goodsMap = new HashMap<String, String>();
    StringBuilder outValue = new StringBuilder();
    Text outKey = new Text();

    // goods data
    // goods ID | serial number | name
    // 100101|155083444927602|四川果冻橙6个约180g/个
    // 100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g
    // order data
    // order ID | goods ID | price paid
    // 11152|108464|76
    // 11152|109995|1899
    //
    // incoming value format:
    // goods record: G#100101|155083444927602|四川果冻橙6个约180g/个
    // order record: O#11152|108464|76
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            String temp = value.toString();
            if (temp.startsWith("G#")) {
                goodsMap.put(key.toString(), temp.split("G#")[1]);
            } else if (temp.startsWith("O#")) {
                orderList.add(temp.split("O#")[1]);
            }
        }

        // Ascending sort; it has no effect here because the incoming data is grouped by goods ID, not by order ID
        // Collections.sort(orderList, new Comparator<String>() {
        //     @Override
        //     public int compare(String orderId, String orderIdNew) {
        //         int distance = Integer.parseInt(orderId.split("\\|")[0]) - Integer.parseInt(orderIdNew.split("\\|")[0]);
        //         System.out.println(orderId + " orderIdNew=" + orderIdNew + " " + distance);
        //         if (distance > 0) {
        //             return 1;
        //         } else if (distance < 0) {
        //             return -1;
        //         }
        //         return 0;
        //     }
        // });

        // output format:
        // order ID | goods ID | goods serial number | goods name | price paid
        for (int i = 0; i < orderList.size(); i++) {
            String[] order = orderList.get(i).split("\\|");
            // if (order[0].equals("1") || order[0].equals("2") || order[0].equals("3")) { // check correctness on a small sample
            outValue.append(order[0]).append("|").append(goodsMap.get(key.toString())).append("|").append(order[2]);
            outKey.set(outValue.toString());
            context.write(outKey, NullWritable.get());
            // }
            outValue.setLength(0); // reset the buffer for the next output record
        }
        goodsMap.clear();
        orderList.clear();
    }

    /**
     * Quick local check that the sort comparator behaves as expected
     **/
    public static void main(String[] args) {
        List<String> orderList = new ArrayList<String>();
        orderList.add("O#1|101524|891");
        orderList.add("O#1|111835|10080");
        orderList.add("O#1|107848|1734");
        orderList.add("O#2|113561|11192");
        orderList.add("O#11152|108464|76");

        // String temp = "O#11152|108464|76";
        // System.out.println(temp.split("\\|")[0].split("O#")[1]);

        Collections.sort(orderList, new Comparator<String>() {
            @Override
            public int compare(String orderId, String orderIdNew) {
                int distance = Integer.parseInt(orderId.split("\\|")[0].split("O#")[1])
                        - Integer.parseInt(orderIdNew.split("\\|")[0].split("O#")[1]);
                if (distance > 0) {
                    return 1;
                } else if (distance < 0) {
                    return -1;
                }
                return 0;
            }
        });
        System.out.print(orderList);
    }
}
```
3) driver
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReducerSideJoinDriver extends Configured implements Tool {
    static String in = "D:/workspace/bigdata-component/hadoop/test/in/join";
    static String out = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/join";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new ReducerSideJoinDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // create the job instance
        Job job = Job.getInstance(conf, "Reduce Side Join Testing");
        // set the driver class
        job.setJarByClass(ReducerSideJoinDriver.class);

        // mapper settings: LongWritable, Text, Text, Text
        job.setMapperClass(ReducerSideJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // reducer settings: Text, Text, Text, NullWritable
        job.setReducerClass(ReducerSideJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // input path
        FileInputFormat.setInputPaths(job, new Path(in));

        // delete the output directory if it already exists
        FileSystem fs = FileSystem.get(getConf());
        if (fs.exists(new Path(out))) {
            fs.delete(new Path(out), true);
        }
        FileOutputFormat.setOutputPath(job, new Path(out));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
```
4) Verification
Each output record has the form order ID | goods ID | goods serial number | goods name | price paid, for example 1|101524|100006391000|指爱花盒|891 (these unsorted records are the input to the sort job in the next section).
2.5 Implementation (sorted)
Sort the join output by order ID in ascending order.
This example builds on the unsorted implementation: it takes the unsorted join output as its input.
1) Implementation
The mapper, reducer, and driver are combined in a single class.
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceSideSort extends Configured implements Tool {
    static String in = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/join";
    static String out = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/joinsort";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new ReduceSideSort(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // create the job instance
        Job job = Job.getInstance(conf, "Reduce Side Join Sort Testing");
        // set the driver class
        job.setJarByClass(ReduceSideSort.class);

        // mapper settings: LongWritable, Text, Text, Text
        job.setMapperClass(ReduceSideSortMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // reducer settings: Text, Text, Text, NullWritable
        job.setReducerClass(ReduceSideSortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // input path (the output of the unsorted join job)
        FileInputFormat.setInputPaths(job, new Path(in));

        FileSystem fs = FileSystem.get(getConf());
        if (fs.exists(new Path(out))) {
            fs.delete(new Path(out), true);
        }
        FileOutputFormat.setOutputPath(job, new Path(out));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    static class ReduceSideSortMapper extends Mapper<LongWritable, Text, Text, Text> {
        // unsorted input: order ID | goods ID | goods serial number | goods name | price paid
        // 1|101524|100006391000|指爱花盒|891
        // 6|101626|100006879264|爱科技N200NC|7995
        // 522|101658|100007019896|花花公子休闲鞋|536
        // output sorted by order ID, same record layout
        Text outKey = new Text();
        Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\\|");
            outKey.set(line[0]);
            outValue.set(line[0] + "|" + line[1] + "|" + line[2] + "|" + line[3] + "|" + line[4]);
            context.write(outKey, outValue);
        }
    }

    static class ReduceSideSortReducer extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(value, NullWritable.get());
            }
        }
    }
}
```
2) Verification
The result comes out ordered by order ID; this relies on MapReduce's default sorting of map output keys, so no explicit sorting code is needed.
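Note that the map output key here is Text, which sorts lexicographically, so order IDs of different lengths (for example 2 and 11152) will not come out in numeric order. If strict numeric ordering is required, one option is to emit the order ID as a numeric key instead. A minimal sketch under that assumption (the class name is hypothetical):

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical variant of ReduceSideSortMapper: emit the order ID as an IntWritable
// so the shuffle sorts keys numerically rather than lexicographically.
public class NumericSortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    IntWritable outKey = new IntWritable();
    Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // input: order ID | goods ID | goods serial number | goods name | price paid
        String[] line = value.toString().split("\\|");
        outKey.set(Integer.parseInt(line[0])); // numeric key -> numeric sort order
        outValue.set(value.toString());        // keep the whole record as the value
        context.write(outKey, outValue);
    }
}
```

The reducer stays the same apart from its key type (Reducer<IntWritable, Text, Text, NullWritable>), and the driver would set job.setMapOutputKeyClass(IntWritable.class).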
3. Map side join
A map side join performs the join in the map phase, and the job usually has no reduce phase at all, so the overhead of the shuffle is avoided. The key to implementing a map side join is MapReduce's distributed cache.
3.1 Advantages
- The whole join involves no shuffle and no reducers, which removes the data transfer cost of the shuffle. The mapper parallelism also adjusts automatically to the input size, so the advantages of distributed computation are fully exploited
3.2 Implementation notes
- Examine the data sets to be processed and put the small one into the distributed cache
- When the job runs, the MapReduce framework automatically distributes the cached file to every machine on which a map task runs
- In the mapper's setup, load the small data set from the distributed cache, then join each record of the large data set read by the mapper against it and write the final result
3.3 Implementation
This implements the same functionality on the same data as the reduce side join above, so the requirements are not repeated here.
1) mapper
```java
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Map<String, String> goodsMap = new HashMap<String, String>();
    Text outKey = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Read the cached file. Use the bare name "goods.txt", not "/goods.txt",
        // otherwise the file will not be found: the distributed cache exposes it
        // through a symlink in the task's working directory named after the file.
        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("goods.txt")));
        String line = null;
        // goods data
        // goods ID | serial number | name
        // 100101|155083444927602|四川果冻橙6个约180g/个
        // 100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g
        while ((line = br.readLine()) != null) {
            String[] fields = line.split("\\|");
            goodsMap.put(fields[0], fields[1] + "|" + fields[2]);
        }
        br.close();
    }

    // order data (the large data set read by the mapper)
    // order ID | goods ID | price paid
    // 11152|108464|76
    // 11152|109995|1899
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] line = value.toString().split("\\|");
        // output: order ID | goods ID | goods serial number | goods name | price paid
        outKey.set(line[0] + "|" + line[1] + "|" + goodsMap.get(line[1]) + "|" + line[2]);
        context.write(outKey, NullWritable.get());
    }
}
```
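A usage note on the file name: the mapper can open "goods.txt" by its bare name because Hadoop places a symlink to the localized cache file, named after the file, in the task's working directory. If a different local name is preferred, a URI fragment can be used when registering the file. A small sketch, assuming it replaces the addCacheFile call in the driver below (the #goods alias is only an example):

```java
// The fragment after '#' becomes the symlink name in the task's working directory,
// so the mapper would then open new FileInputStream("goods") instead of "goods.txt".
job.addCacheFile(new URI(args[2] + "#goods"));
```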
2) reducer
None: this is a map-only job.
3) driver
```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DistributedCacheDriver extends Configured implements Tool {
    // Run with:
    // yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.distributedcache.DistributedCacheDriver /distributedcache/in /distributedcache/out /distributedcache/cachefiles/goods.txt
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new DistributedCacheDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // create the job instance
        Job job = Job.getInstance(conf, "Map Side Join Testing DistributedCacheDriver");
        // set the driver class
        job.setJarByClass(DistributedCacheDriver.class);

        // mapper settings: LongWritable, Text, Text, NullWritable
        job.setMapperClass(DistributedCacheMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // map-only job: no reduce phase
        job.setNumReduceTasks(0);

        // register the distributed cache file (the small goods data set)
        job.addCacheFile(new URI(args[2]));

        // input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileSystem fs = FileSystem.get(getConf());
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
```
4) Verification
This example has to be run on a Hadoop cluster.
Create the directories below and upload the files:
- Files to process (input path): /distributedcache/in
- Cache file: /distributedcache/cachefiles/goods.txt
- Output path: /distributedcache/out
```
# Run command:
yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.distributedcache.DistributedCacheDriver /distributedcache/in /distributedcache/out /distributedcache/cachefiles/goods.txt

# Job log
[alanchan@server4 testMR]$ yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.distributedcache.DistributedCacheDriver /distributedcache/in /distributedcache/out /distributedcache/cachefiles/goods.txt
2022-09-22 03:00:27,021 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/alanchan/.staging/job_1663661108338_0025
2022-09-22 03:00:32,826 INFO input.FileInputFormat: Total input files to process : 1
2022-09-22 03:00:33,008 INFO mapreduce.JobSubmitter: number of splits:1
2022-09-22 03:00:33,179 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1663661108338_0025
2022-09-22 03:00:33,180 INFO mapreduce.JobSubmitter: Executing with tokens: []
2022-09-22 03:00:33,325 INFO conf.Configuration: resource-types.xml not found
2022-09-22 03:00:33,325 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2022-09-22 03:00:33,374 INFO impl.YarnClientImpl: Submitted application application_1663661108338_0025
2022-09-22 03:00:33,408 INFO mapreduce.Job: The url to track the job: http://server1:8088/proxy/application_1663661108338_0025/
2022-09-22 03:00:33,409 INFO mapreduce.Job: Running job: job_1663661108338_0025
2022-09-22 03:00:40,480 INFO mapreduce.Job: Job job_1663661108338_0025 running in uber mode : false
2022-09-22 03:00:40,481 INFO mapreduce.Job: map 0% reduce 0%
2022-09-22 03:00:46,523 INFO mapreduce.Job: map 100% reduce 0%
2022-09-22 03:00:47,531 INFO mapreduce.Job: Job job_1663661108338_0025 completed successfully
2022-09-22 03:00:47,607 INFO mapreduce.Job: Counters: 32
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=226398
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1272
		HDFS: Number of bytes written=10882
		HDFS: Number of read operations=7
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=8080
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=4040
		Total vcore-milliseconds taken by all map tasks=4040
		Total megabyte-milliseconds taken by all map tasks=41369600
	Map-Reduce Framework
		Map input records=76
		Map output records=76
		Input split bytes=117
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=35
		CPU time spent (ms)=650
		Physical memory (bytes) snapshot=205320192
		Virtual memory (bytes) snapshot=7332417536
		Total committed heap usage (bytes)=193462272
		Peak Map Physical memory (bytes)=205320192
		Peak Map Virtual memory (bytes)=7332417536
	File Input Format Counters
		Bytes Read=1155
	File Output Format Counters
		Bytes Written=10882
```
This concludes the two ways of implementing a join in MapReduce; the map side join relies on the distributed cache.