大数据处理MapReduce案例实践

相关文章

【大数据处理】 WordCount案例实践

源数据:Child--Parent

 根据Child--Parent表推断grandchild和grandparent

                           左表                                                                             右表

               

将一张表分解为两张表的连接:从图中可以找出Tom的grandparent为Marry和Ben,同理可以找出其他的人的grandparent

思路与步骤:

只有连接 左表的parent列和右表的child列,才能得到grandchild和grandparent的信息。

因此需要将源数据的一张表拆分成两张表,且左表和右表是同一个表,如上图。

  • 所以在map阶段将读入数据分割成child和parent之后,将parent设置成key,child设置成value进行输出,并作为左表;
  • 再将同一对child和parent中的child设置成key,parent设置成value进行输出,作为右表。
  • 为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。
  • 这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。
  • reduce接收到连接的结果,其中每个key的value-list就包含了"grandchild--grandparent"关系。
  • 取出每个key的value-list进行解析,将左表中的child放入一个数组,右表中的parent放入一个数组,
  • 最后对两个数组求笛卡尔积得到最后的结果

代码:

package Mapreduce;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SingleTableLink {
    private static int time = 0;

    public static void main(String[] args) throws Exception {
        //必须要传递的是自定的mapper和reducer的类,输入输出的路径必须指定,输出的类型<k3,v3>必须指定
        //2将自定义的MyMapper和MyReducer组装在一起
        Configuration conf=new Configuration();
        String jobName=SingleTableLink.class.getSimpleName();
        //1首先寫job,知道需要conf和jobname在去創建即可
        Job job = Job.getInstance(conf, jobName);

        //*13最后,如果要打包运行改程序,则需要调用如下行
        job.setJarByClass(SingleTableLink.class);

        //3读取HDFS內容:FileInputFormat在mapreduce.lib包下
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //4指定解析<k1,v1>的类(谁来解析键值对)
        //*指定解析的类可以省略不写,因为设置解析类默认的就是TextInputFormat.class
        job.setInputFormatClass(TextInputFormat.class);
        //5指定自定义mapper类
        job.setMapperClass(MyMapper.class);
        //6指定map输出的key2的类型和value2的类型  <k2,v2>
        //*下面两步可以省略,当<k3,v3>和<k2,v2>类型一致的时候,<k2,v2>类型可以不指定
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //7分区(默认1个),排序,分组,规约 采用 默认

        //接下来采用reduce步骤
        //8指定自定义的reduce类
        job.setReducerClass(MyReducer.class);
        //9指定输出的<k3,v3>类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //10指定输出<K3,V3>的类
        //*下面这一步可以省
        job.setOutputFormatClass(TextOutputFormat.class);
        //11指定输出路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //12写的mapreduce程序要交给resource manager运行
        job.waitForCompletion(true);
    }

    private static class MyMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object k1, Text v1,
                Mapper<Object, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String childName = new String();
            String parentName = new String();
            String relationType = new String();
            Text k2 = new Text();
            Text v2 = new Text();
            // 輸入一行预处理的文本
            StringTokenizer items = new StringTokenizer(v1.toString());
            String[] values = new String[2];
            int i = 0;
            while (items.hasMoreTokens()) {
                values[i] = items.nextToken();
                i++;
            }
            if (values[0].compareTo("child") != 0) {
                childName = values[0];
                parentName = values[1];
                // 输出左表,左表加1的标识
                relationType = "1";
                k2 = new Text(values[1]); // parent作为key,作为表1的key
                v2 = new Text(relationType + "+" + childName + "+" + parentName);//<1+Lucy+Tom>
                context.write(k2, v2);
                // 输出右表,右表加2的标识
                relationType = "2";
                k2 = new Text(values[0]);// child作为key,作为表2的key
                v2 = new Text(relationType + "+" + childName + "+" + parentName);//<2+Jone+Lucy>
                context.write(k2, v2);
            }
        }
    }

    private static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text k3 = new Text();
        Text v3 = new Text();

        @Override
        protected void reduce(Text k2, Iterable<Text> v2s,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            if (0 == time) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                time++;
            }
            int grandchildnum = 0;
            String[] grandchild = new String[10];//孙子
            int grandparentnum = 0;
            String[] grandparent = new String[10];//爷爷
            Iterator items = v2s.iterator();//["1 Tom","2 Mary","2 Ben"]
            while (items.hasNext()) {
                String record = items.next().toString();
                int len = record.length();
                int i = 2;
                if (0 == len) {
                    continue;
                }

                // 取得左右表的标识
                char relationType = record.charAt(0);
                // 定义孩子和父母变量
                String childname = new String();
                String parentname = new String();
                // 获取value列表中value的child
                while (record.charAt(i) != '+') {
                    childname += record.charAt(i);
                    i++;
                }
                i = i + 1; //越过名字之间的“+”加号
                // 获取value列表中value的parent
                while (i < len) {
                    parentname += record.charAt(i);
                    i++;
                }
                // 左表,取出child放入grandchildren
                if ('1' == relationType) {
                    grandchild[grandchildnum] = childname;
                    grandchildnum++;
                }
                // 右表,取出parent放入grandparent
                if ('2' == relationType) {
                    grandparent[grandparentnum] = parentname;
                    grandparentnum++;
                }
            }
            // grandchild和grandparentnum数组求笛卡尔积
            if (0 != grandchildnum && 0 != grandparentnum) {
                for (int i = 0; i < grandchildnum; i++) {
                    for (int j = 0; j < grandparentnum; j++) {
                        k3 = new Text(grandchild[i]);
                        v3 = new Text(grandparent[j]);
                        context.write(k3, v3);
                    }
                }
            }
        }
    }

}

代码运行:

(1)准备数据

# vi child_parent 

Tom Lucy
Tom Jack
Jone Lucy
Jone ack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesses
Terry Alice
Terry Jesses
Philip Terry
Philip Alma
Mark Terry
Mark Alma

将数据child_parent上传到hdfs的新建data1目录中

# hdfs dfs -put child_parent /data1

(2)执行jar包

# hadoop jar SingleTableLink.jar SingleTableLink  /data1/child_parent  /out1345

(3)查看运行结果是否正确

[root@neusoft-master filecontent]# hdfs dfs –text /out1345/part-r-00000