19. Join operations: map side join and reduce side join

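Contents

  • I. Join
    • 1. Product information
    • 2. Order information
  • II. Reduce side join
    • 1. Main steps
    • 2. Drawbacks
    • 3. Implementation notes
    • 4. Implementation (unsorted)
      • 1) Mapper
      • 2) Reducer
      • 3) Driver
      • 4) Verification
    • 5. Implementation (sorted)
      • 1) Implementation
      • 2) Verification
  • III. Map side join
    • 1. Advantages
    • 2. Implementation notes
    • 3. Implementation
      • 1) Mapper
      • 2) Reducer
      • 3) Driver
      • 4) Verification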

This article introduces join operations in MapReduce.
It assumes a working Hadoop installation.
It has three parts: an introduction to joins, reduce side join, and map side join.

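I. Join

Data processing with the MapReduce framework often involves reading from several data sets and joining them. The join logic then has to be implemented within the MapReduce programming model.
MapReduce joins fall into two categories: map side join and reduce side join.
Both are described below.
The examples use the following data:
There are two data files: goods (product information) and order_goods (order information). The code below reads them as goods.txt and order.txt.
The task is to use MapReduce to find the product name for each product in each order.
For example: 107860 -> AMAZFIT黑色硅胶腕带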

1. Product information

Product ID|Number|Name
100101|155083444927602|四川果冻橙6个约180g/个
100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g

2. Order information

Order ID|Product ID|Sale price
11152|108464|76
11152|109995|1899
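
Both files are plain text with | as the field separator, so every mapper and reducer below starts by splitting a record into fields. A minimal sketch of that step in plain Java, without Hadoop; the class name PipeSplitDemo and the helper fields are made up for illustration:

```java
import java.util.Arrays;

class PipeSplitDemo {
    // Splits one pipe-delimited record into its fields.
    // String.split takes a regular expression, so the pipe is escaped as "\\|".
    static String[] fields(String record) {
        return record.split("\\|");
    }

    public static void main(String[] args) {
        String order = "11152|108464|76"; // order ID | product ID | sale price
        String[] f = fields(order);
        System.out.println(Arrays.toString(f));
        System.out.println("product ID = " + f[1]);
    }
}
```

An unescaped "|" would be read as a regex alternation operator instead of a literal separator, which is why the pattern is written as "\\|" throughout the code in this article.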

II. Reduce side join

The join is performed in the reduce phase. The shuffle brings records with the same key into the same group, and the join is then carried out on each group.

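1. Main steps

  • Mappers read the different data sets
  • Mappers usually emit the join field as the output key
  • After the shuffle, records from different data sets that share a key land in the same group
  • The reducer combines the records of each group according to the business requirement and writes the final output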
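
The steps above can be sketched in plain Java, with a TreeMap standing in for the shuffle. This is only an illustration under stated assumptions: ReduceSideJoinSketch and its methods are hypothetical names, the product record is shortened sample data, and orders without a matching product are dropped (an inner join), which is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReduceSideJoinSketch {
    // "Map" + "shuffle": tag each record with its source and group by product ID.
    static Map<String, List<String>> mapAndShuffle(List<String> goods, List<String> orders) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String g : goods) {
            String productId = g.split("\\|")[0];
            groups.computeIfAbsent(productId, k -> new ArrayList<>()).add("G#" + g);
        }
        for (String o : orders) {
            String productId = o.split("\\|")[1];
            groups.computeIfAbsent(productId, k -> new ArrayList<>()).add("O#" + o);
        }
        return groups;
    }

    // "Reduce": inside one key group, pair every order with the product record.
    static List<String> reduce(List<String> values) {
        String product = null;
        List<String> orders = new ArrayList<>();
        for (String v : values) {
            if (v.startsWith("G#")) {
                product = v.substring(2);
            } else if (v.startsWith("O#")) {
                orders.add(v.substring(2));
            }
        }
        List<String> joined = new ArrayList<>();
        if (product == null) {
            return joined; // assumption of this sketch: unmatched orders are dropped
        }
        for (String o : orders) {
            String[] f = o.split("\\|");
            joined.add(f[0] + "|" + product + "|" + f[2]);
        }
        return joined;
    }

    static List<String> join(List<String> goods, List<String> orders) {
        List<String> out = new ArrayList<>();
        for (List<String> group : mapAndShuffle(goods, orders).values()) {
            out.addAll(reduce(group));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> goods = Arrays.asList("100101|155083444927602|sample product");
        List<String> orders = Arrays.asList("1|100101|76", "2|100101|80", "3|999999|5");
        join(goods, orders).forEach(System.out::println);
    }
}
```

In a real job the grouping is done by the framework across machines; the sketch only shows why keying both data sets by the join field is enough to bring matching records together.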

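2. Drawbacks

  • The whole join is done in the reduce phase, but reduce parallelism in MapReduce is usually low (the default is a single reduce task), so all the data piles up in the reduce phase and puts it under heavy load
  • Moving data from mappers to reducers goes through the shuffle, which involves sorting, spilling to disk and network transfer; with large data sets this is very expensive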
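
To make the parallelism point concrete, here is a sketch of how map output keys are assigned to reduce tasks. It uses the same arithmetic as Hadoop's default HashPartitioner, but with String.hashCode() as a stand-in for the key's real hash, so the partition numbers are illustrative only. PartitionSketch is a hypothetical name.

```java
class PartitionSketch {
    // Same formula as the default hash partitioner:
    // (hashCode & Integer.MAX_VALUE) % numReduceTasks.
    // Assumption: String.hashCode() replaces Text.hashCode(), so the concrete
    // partition numbers differ from those of a real job.
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] productIds = {"100101", "100102", "108464", "109995"};
        for (int reducers : new int[] {1, 4}) {
            for (String id : productIds) {
                System.out.println(reducers + " reducer(s): " + id
                        + " -> partition " + partition(id, reducers));
            }
        }
    }
}
```

With one reduce task every key maps to partition 0, so a single reducer handles the whole join. Raising the count with job.setNumReduceTasks(n) spreads the keys over n partitions, although the shuffle cost itself remains.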

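3. Implementation notes

  • The mapper handles each record according to the name of its input file. This example prefixes each record with G# or O# to mark it as product data or order data
  • The join field, here the product ID, is used as the map output key, which guarantees that records with the same key reach the reducer in the same group
  • The reducer separates the records that share a key (product ID) into goods and orders and stores them in different data structures: orders go into a List and product information into a HashMap. When writing output, it loops over the List and looks up the product's number and name by product ID
  • If the output has to be sorted, the result of the previous step must be re-keyed by the sort field. This example sorts by order ID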

4. Implementation (unsorted)

1) Mapper

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ReducerSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
	// Name of the file the current split belongs to: goods.txt or order.txt
	String sourceFileName = "";
	Text outValue = new Text();
	Text outKey = new Text();

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		// Get the name of the file that the current input split belongs to
		FileSplit inputSplit = (FileSplit) context.getInputSplit();
		sourceFileName = inputSplit.getPath().getName();
		System.out.println("Processing file: " + sourceFileName);
	}

//	Product information
//	Product ID|Number         |Name
//	100101|155083444927602|四川果冻橙6个约180g/个
//	100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g
//	Order information
//	Order ID|Product ID|Sale price
//	 11152|108464|76
//	 11152|109995|1899
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// "|" is a regex metacharacter, so it has to be escaped
		String[] line = value.toString().split("\\|");

		if ("goods.txt".equals(sourceFileName)) {// product information
			Counter goodsCounter = context.getCounter("goods_records_counters", "goods Records");
			goodsCounter.increment(1);

			outKey.set(line[0]); // join key: product ID
			outValue.set("G#" + value.toString());
		} else if ("order.txt".equals(sourceFileName)) {// order information
			Counter orderCounter = context.getCounter("order_records_counters", "order Records");
			orderCounter.increment(1);

			outKey.set(line[1]); // join key: product ID
			outValue.set("O#" + value.toString());
		} else {
			// Skip records from any other file instead of re-emitting the previous key/value
			return;
		}
		context.write(outKey, outValue);

	}

}

2) Reducer

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReducerSideJoinReducer extends Reducer<Text, Text, Text, NullWritable> {

	List<String> orderList = new ArrayList<String>();
	Map<String, String> goodsMap = new HashMap<String, String>();
	StringBuilder outValue = new StringBuilder();
	Text outKey = new Text();

	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//		Product information
//		Product ID|Number|Name
//		100101|155083444927602|四川果冻橙6个约180g/个
//		100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g
//		Order information
//		Order ID|Product ID|Sale price
//		 11152|108464|76
//		 11152|109995|1899

		// Format of the tagged values:
		// Product information
		// G#100101|155083444927602|四川果冻橙6个约180g/个
		// Order information
		// O#11152|108464|76

		for (Text value : values) {
			String temp = value.toString();
			// Strip the two-character tag; substring stays correct even if the record itself contains "G#" or "O#"
			if (temp.startsWith("G#")) {
				goodsMap.put(key.toString(), temp.substring(2));
			} else if (temp.startsWith("O#")) {
				orderList.add(temp.substring(2));
			}
		}

//		O#11152|108464|76
		// Ascending sort. Records arrive grouped by product ID, so sorting by order ID here does not order the overall output
//		Collections.sort(orderList, new Comparator<String>() {
//			@Override
//			public int compare(String orderId, String orderIdNew) {
//
//				int distance = Integer.parseInt(orderId.split("\\|")[0]) - Integer.parseInt(orderIdNew.split("\\|")[0]);
//				System.out.println(orderId + "   orderIdNew=" + orderIdNew + "   " + distance);
//				if (distance > 0) {
//					return 1;
//				} else if (distance < 0) {
//					return -1;
//				}
//				return 0;
//			}
//		});

		// Output format
		// Order ID|Product ID|Product number|Product name|Sale price
		for (int i = 0; i < orderList.size(); i++) {
			String order[] = orderList.get(i).split("\\|");
//			if (order[0].equals("1") || order[0].equals("2") || order[0].equals("3")) { // check correctness on a small sample
			// Reset the buffer for every order; otherwise earlier output lines are repeated inside later ones
			outValue.setLength(0);
			// If no product record arrived for this key, goodsMap.get returns null and "null" is written
			outValue.append(order[0]).append("|").append(goodsMap.get(key.toString())).append("|").append(order[2]);
			outKey.set(outValue.toString());
			context.write(outKey, NullWritable.get());
//			}
		}
		outValue.setLength(0);
		goodsMap.clear();
		orderList.clear();
	}

/**
* Local check of the reducer's sorting logic
**/
	public static void main(String[] args) {

		List<String> orderList = new ArrayList<String>();
		orderList.add("O#1|101524|891");
		orderList.add("O#1|111835|10080");
		orderList.add("O#1|107848|1734");
		orderList.add("O#2|113561|11192");
		orderList.add("O#11152|108464|76");

//		String temp = "O#11152|108464|76";
//		System.out.println(temp.split("\\|")[0].split("O#")[1]);
		Collections.sort(orderList, new Comparator<String>() {

			@Override
			public int compare(String orderId, String orderIdNew) {

				int distance = Integer.parseInt(orderId.split("\\|")[0].split("O#")[1])
						- Integer.parseInt(orderIdNew.split("\\|")[0].split("O#")[1]);
				if (distance > 0) {
					return 1;
				} else if (distance < 0) {
					return -1;
				}
				return 0;
			}
		});
		System.out.print(orderList);
	}
}

3) Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReducerSideJoinDriver extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/in/join";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/join";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new ReducerSideJoinDriver(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();

		// Create the job instance
		Job job = Job.getInstance(conf, "Reduce Side Join Testing  ");
		// Set the driver class of the job
		job.setJarByClass(ReducerSideJoinDriver.class);

		// Mapper settings: LongWritable, Text, Text, Text
		job.setMapperClass(ReducerSideJoinMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		// Reducer settings: Text, Text, Text, NullWritable
		job.setReducerClass(ReducerSideJoinReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// Set the input path; delete the output path first if it already exists
		FileInputFormat.setInputPaths(job, new Path(in));
		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(out));

		return job.waitForCompletion(true) ? 0 : 1;
	}

}

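4) Verification

The output of the job is shown in the following screenshot.
[Screenshot: output of the unsorted reduce side join job]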

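5. Implementation (sorted)

Sort by order ID in ascending order.
This example builds on the unsorted one: it takes the unsorted job's output as its input.

1) Implementation

The mapper, reducer and driver are in one file.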

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceSideSort extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/join";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/joinsort";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new ReduceSideSort(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();

		// Create the job instance
		Job job = Job.getInstance(conf, "Reduce Side Join Sort Testing  ");
		// Set the driver class of the job
		job.setJarByClass(ReduceSideSort.class);

		// Mapper settings: LongWritable, Text, LongWritable, Text
		// The map output key is a LongWritable so that order IDs sort numerically;
		// Text keys would sort byte by byte ("522" before "6")
		job.setMapperClass(ReduceSideSortMapper.class);
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);

		// Reducer settings: LongWritable, Text, Text, NullWritable
		// A single reduce task (the default) is what makes the output globally ordered
		job.setReducerClass(ReduceSideSortReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// Set the input path; delete the output path first if it already exists
		FileInputFormat.setInputPaths(job, new Path(in));
		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(out));

		return job.waitForCompletion(true) ? 0 : 1;

	}

	static class ReduceSideSortMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
// Unsorted input: order ID, product ID, product number, product name, price paid
//		1|101524|100006391000|指爱花盒|891
//		6|101626|100006879264|爱科技N200NC|7995
//		522|101658|100007019896|花花公子休闲鞋|536

// Sorted output, ascending by order ID: order ID, product ID, product number, product name, price paid
		LongWritable outKey = new LongWritable();
		Text outValue = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String[] line = value.toString().split("\\|");
			// Parse the order ID so that the shuffle compares numbers, not strings
			outKey.set(Long.parseLong(line[0].trim()));
			outValue.set(line[0] + "|" + line[1] + "|" + line[2] + "|" + line[3] + "|" + line[4]);
			context.write(outKey, outValue);
		}
	}

	static class ReduceSideSortReducer extends Reducer<LongWritable, Text, Text, NullWritable> {
		@Override
		protected void reduce(LongWritable key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// Keys arrive in sorted order; write the records through unchanged
			for (Text value : values) {
				context.write(value, NullWritable.get());
			}
		}
	}
}

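2) Verification

The output is now sorted by order ID in ascending order. This relies on MapReduce's default behavior of sorting map output keys during the shuffle. The result is a total order only when there is a single reduce task, and Text keys compare byte by byte, so a numeric order needs a numeric key type such as LongWritable.
[Screenshot: output of the sorted job]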
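
The difference between string order and numeric order is easy to see outside Hadoop. The sketch below sorts the same order IDs once as strings, which is how Text keys behave for ASCII digits, and once as numbers, which is how LongWritable keys behave. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class KeyOrderSketch {
    // Lexicographic order, comparable to sorting on a Text key.
    static List<String> sortAsText(List<String> ids) {
        List<String> copy = new ArrayList<>(ids);
        copy.sort(Comparator.naturalOrder());
        return copy;
    }

    // Numeric order, comparable to sorting on a LongWritable key.
    static List<String> sortAsNumbers(List<String> ids) {
        List<String> copy = new ArrayList<>(ids);
        copy.sort(Comparator.comparingLong((String s) -> Long.parseLong(s)));
        return copy;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("6", "522", "1");
        System.out.println("as text:    " + sortAsText(ids));    // [1, 522, 6]
        System.out.println("as numbers: " + sortAsNumbers(ids)); // [1, 6, 522]
    }
}
```

If order IDs all had the same number of digits the two orders would coincide, which is one reason the problem can go unnoticed on small test data.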

III. Map side join

A map side join performs the join in the map phase, and the job usually has no reduce phase, which avoids the cost of the shuffle. The key to a map side join is MapReduce's distributed cache.

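1. Advantages

  • The join involves no shuffle and no reducer, which removes the cost of shuffle data transfer. Mapper parallelism also adjusts automatically to the amount of input data, so the job takes full advantage of distributed computing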

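2. Implementation notes

  • Analyze the data sets to be processed and put the small one into the distributed cache
  • When the job runs, the MapReduce framework automatically distributes the cached data to every machine that runs a map task
  • During mapper initialization, read the small data set from the distributed cache, then join it with the records of the large data set that the mapper reads, and write the final result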
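
The idea in these steps can be sketched in plain Java: load the small table into a HashMap once, then join each record of the large data set with a single lookup. MapSideJoinSketch, loadGoods and joinOrder are hypothetical names, and returning null for an order without a matching product is a choice made for this sketch only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MapSideJoinSketch {
    // Plays the role of setup(): build an in-memory table from the small data set.
    // Each line has the form: product ID|number|name
    static Map<String, String> loadGoods(List<String> lines) {
        Map<String, String> goods = new HashMap<>();
        for (String line : lines) {
            String[] f = line.split("\\|");
            goods.put(f[0], f[1] + "|" + f[2]);
        }
        return goods;
    }

    // Plays the role of map(): join one order record against the in-memory table.
    // Each order has the form: order ID|product ID|sale price
    static String joinOrder(String orderLine, Map<String, String> goods) {
        String[] f = orderLine.split("\\|");
        String product = goods.get(f[1]);
        if (product == null) {
            return null; // assumption of this sketch: no match, no output
        }
        return f[0] + "|" + f[1] + "|" + product + "|" + f[2];
    }

    public static void main(String[] args) {
        Map<String, String> goods = loadGoods(
                Arrays.asList("100101|155083444927602|sample product"));
        System.out.println(joinOrder("11152|100101|76", goods));
    }
}
```

The approach only works when the small data set fits in the memory of every map task, since each task holds its own full copy of the table.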

3. Implementation

This implementation produces the same result as the reduce side join and uses the same data, so neither is described again.

1) Mapper

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
	Map<String, String> goodsMap = new HashMap<String, String>();
	Text outKey = new Text();

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		// Read the cached file. Do not write "/goods.txt": the file is opened by its bare name,
		// relative to the task's working directory, and an absolute path gives a file-not-found error.
		// Each line has the form: product ID|number|name
		// try-with-resources closes the reader once the file has been loaded
		try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("goods.txt")))) {
			String line = null;
			while ((line = br.readLine()) != null) {
				String[] fields = line.split("\\|");
				goodsMap.put(fields[0], fields[1] + "|" + fields[2]);
			}
		}
	}

//	Product information
//	Product ID|Number|Name
//	100101|155083444927602|四川果冻橙6个约180g/个
//	100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g
//	Order information
//	Order ID|Product ID|Sale price
//	 11152|108464|76
//	 11152|109995|1899
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String[] line = value.toString().split("\\|");
		// Output: order ID|product ID|product number|product name|sale price
		outKey.set(line[0] + "|" + line[1] + "|" + goodsMap.get(line[1]) + "|" + line[2]);
		context.write(outKey, NullWritable.get());
	}
}

2) Reducer

None. The job is map-only: the driver below sets the number of reduce tasks to 0.

3) Driver

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DistributedCacheDriver extends Configured implements Tool {
//Command to run:
//yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.distributedcache.DistributedCacheDriver /distributedcache/in /distributedcache/out /distributedcache/cachefiles/goods.txt
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new DistributedCacheDriver(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();

		// Create the job instance
		Job job = Job.getInstance(conf, "Map Side Join Testing  DistributedCacheDriver");
		// Set the driver class of the job
		job.setJarByClass(DistributedCacheDriver.class);

		// Mapper settings: LongWritable, Text, Text, NullWritable
		job.setMapperClass(DistributedCacheMapper.class);
		job.setMapOutputKeyClass(Text.class);
		// The mapper emits NullWritable values, so declare that type here
		job.setMapOutputValueClass(NullWritable.class);

		// Map-only job: no reduce tasks, so there is no shuffle
		job.setNumReduceTasks(0);

		// Add the distributed cache file (third command-line argument)
		job.addCacheFile(new URI(args[2]));

		// Set the input path (first command-line argument)
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(args[1]))) {
			fs.delete(new Path(args[1]), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		return job.waitForCompletion(true) ? 0 : 1;
	}

}

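4) Verification

This example has to run in a Hadoop environment.

Create the directories and upload the files first:

  • Input path (files to process): /distributedcache/in
  • Cache file: /distributedcache/cachefiles/goods.txt
  • Output path: /distributedcache/out
# Command to run: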
yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.distributedcache.DistributedCacheDriver /distributedcache/in /distributedcache/out /distributedcache/cachefiles/goods.txt

# Job log
[alanchan@server4 testMR]$ yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.distributedcache.DistributedCacheDriver /distributedcache/in /distributedcache/out /distributedcache/cachefiles/goods.txt
2022-09-22 03:00:27,021 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/alanchan/.staging/job_1663661108338_0025
2022-09-22 03:00:32,826 INFO input.FileInputFormat: Total input files to process : 1
2022-09-22 03:00:33,008 INFO mapreduce.JobSubmitter: number of splits:1
2022-09-22 03:00:33,179 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1663661108338_0025
2022-09-22 03:00:33,180 INFO mapreduce.JobSubmitter: Executing with tokens: []
2022-09-22 03:00:33,325 INFO conf.Configuration: resource-types.xml not found
2022-09-22 03:00:33,325 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2022-09-22 03:00:33,374 INFO impl.YarnClientImpl: Submitted application application_1663661108338_0025
2022-09-22 03:00:33,408 INFO mapreduce.Job: The url to track the job: http://server1:8088/proxy/application_1663661108338_0025/
2022-09-22 03:00:33,409 INFO mapreduce.Job: Running job: job_1663661108338_0025
2022-09-22 03:00:40,480 INFO mapreduce.Job: Job job_1663661108338_0025 running in uber mode : false
2022-09-22 03:00:40,481 INFO mapreduce.Job:  map 0% reduce 0%
2022-09-22 03:00:46,523 INFO mapreduce.Job:  map 100% reduce 0%
2022-09-22 03:00:47,531 INFO mapreduce.Job: Job job_1663661108338_0025 completed successfully
2022-09-22 03:00:47,607 INFO mapreduce.Job: Counters: 32
        File System Counters
                FILE: Number of bytes read=0
                FILE: Number of bytes written=226398
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=1272
                HDFS: Number of bytes written=10882
                HDFS: Number of read operations=7
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=8080
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=4040
                Total vcore-milliseconds taken by all map tasks=4040
                Total megabyte-milliseconds taken by all map tasks=41369600
        Map-Reduce Framework
                Map input records=76
                Map output records=76
                Input split bytes=117
                Spilled Records=0
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=35
                CPU time spent (ms)=650
                Physical memory (bytes) snapshot=205320192
                Virtual memory (bytes) snapshot=7332417536
                Total committed heap usage (bytes)=193462272
                Peak Map Physical memory (bytes)=205320192
                Peak Map Virtual memory (bytes)=7332417536
        File Input Format Counters 
                Bytes Read=1155
        File Output Format Counters 
                Bytes Written=10882

This concludes the introduction to the two ways of implementing a join in MapReduce; the map side join relies on the distributed cache.