Hadoop Map Reduce 案例:好友推荐

创作人 Leo


编辑时间 Wed Jan 1,2020 at 10:13


Hadoop Map Reduce 案例:好友推荐

好友推荐是社交网站的基础模块

当数据量大后好友推荐运算量会非常大

通过分布式计算可以轻松实现低延迟好友推荐系统

好友推荐的mapreduce需要两个job

job1 负责找出用户间的FOF关系(friend of friend)

job2 负责找出各用户间的关系度,并生成最终推荐列表

job1 只需要定义mapper和reducer即可

job2 要相对复杂一些,

由于一组关系数据最终要对应到一个用户上,所以需要自定义分组,

另外,推荐列表需要按照关系度排序,这就要自定义排序

完整代码示例:

定义Fof模型,可以方便管理Fof关系数据

Fof.java

package lx;
import org.apache.hadoop.io.Text;
public class Fof extends Text {
    public Fof(){
        super() ;
    }
     
    /**
     * fof 关系类
     * Text格式为 小明    小红 ,方便直接写入中间结果以便于后续解析
     */
    public Fof(String a, String b){
        super(getFof(a,b));
    }
     
    public static String getFof(String a, String b){
         
        int r = a.compareTo(b) ;
         
        if(r>0){
            return a+" "+b ;
        }else{
            return b+" "+a ;
        }
         
         
    }
     
}

定义一个用户模型,自定义排序和自定义分组都需要根据键来处理,利用这个User模型就可以实现

另外这个用户模型也可以保存推荐用户的一些信息,比如关系度

User.java

package lx;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class User implements WritableComparable<User>{
    private String username;
    private int fofCounter ;
     
    public User(){
        //不定义这个空参数的构造函数会报错
    }
     
    /**
     * 一个代表用户的模型类
     * 
     * @param _username 用户名字(分组用)
     * @param _fofCounter 用户的关系度(排序用)
     */
    public User(String _username, int _fofCounter){
        this.username = _username ;
        this.fofCounter = _fofCounter ;
    }
     
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public int getFofCounter() {
        return fofCounter;
    }
    public void setFofCounter(int fofCounter) {
        this.fofCounter = fofCounter;
    }
     
    //实现接口:反序列化必备
    public void readFields(DataInput arg0) throws IOException {
        // TODO Auto-generated method stub
        this.username = new String(arg0.readUTF());
        this.fofCounter = arg0.readInt();
         
    }
     
    //实现接口:序列化必备
    public void write(DataOutput arg0) throws IOException {
        // TODO Auto-generated method stub
        arg0.writeUTF(this.username);
        arg0.writeInt(this.fofCounter);
         
    }
     
    //实现接口:判断两个对象是不是同一个对象
    public int compareTo(User o) {
        // TODO Auto-generated method stub
         
         
        int r = this.username.compareTo(o.getUsername()) ;
         
        if(r==0){
            return Integer.compare(this.fofCounter,o.getFofCounter());
        }
         
        return r;
         
    }
     
     
     
     
     
}

自定义分组类

UserGroup.java

package lx;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class UserGroup extends WritableComparator {
     
    /**
     * 复写父类函数
     * 通过用户名字判断是否为一组数据
     */
    public int compare(WritableComparable a, WritableComparable b) {
         
        //第二次mapred 需要按照key User的name分组
         
        User k1 = (User) a;
        User k2 = (User) b;
         
         
        int r1 = k1.getUsername().compareTo(k2.getUsername());
         
        return r1 ;
    }
    public UserGroup (){
        //向上层注册需要比对的类
        super(User.class, true) ;
    }
     
}

自定义排序类

UserSort.java

package lx;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class UserSort extends WritableComparator {
    /**
     * 复写父类函数
     * 通过关系度排序
     * (名字也不能忽略,如果名字排序不进行处理,自定义分组会失效)
     */
    public int compare(WritableComparable a, WritableComparable b) {
         
        User k1 = (User) a;
        User k2 = (User) b;
         
        int r = k1.getUsername().compareTo(k2.getUsername());
         
        if (r==0){
            return  -Integer.compare(k1.getFofCounter() , k2.getFofCounter()) ;
        }
         
        return r ;
    }
    public UserSort(){
        super(User.class, true) ;
    }
}

程序主体

包含了两个job的执行和Mapper、Reducer

RunJob.java

package lx;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RunJob {
    public static void main(String[] args){
         
        Configuration config = new Configuration() ;
        //这个程序需要执行两个子任务
        //任务1负责找出所有存在的FOF(friend of friend)关系
        //任务2负责对fof关系进行排序,并生成最终结果,一个用户对应一个按照关系度倒序的推荐列表
        if(RunJob.run1(config)){
            RunJob.run2(config);
        }
    }
     
    public static void run2(Configuration config){
        try{
            FileSystem fs = FileSystem.get(config) ;
             
            Job job = Job.getInstance();
            job.setJarByClass(RunJob.class);
            job.setJobName("fofuser");
             
            //设置MapReduce相关类和键值类型
            job.setMapperClass(UserMapper.class);
            job.setReducerClass(UserReducer.class);
            job.setGroupingComparatorClass(UserGroup.class);
            job.setSortComparatorClass(UserSort.class);
            job.setMapOutputKeyClass(User.class);
            job.setMapOutputValueClass(User.class);
             
            //KeyValueTextInputFormat 默认分隔符是制表符
            //可以通过 config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ","); 修改
            job.setInputFormatClass(KeyValueTextInputFormat.class);
             
            FileInputFormat.addInputPath(job, new Path("/output/friend/"));
            Path outputPath = new Path("/output/friend2");
             
            if(fs.exists(outputPath)){
                fs.delete(outputPath, true) ;
            }
             
            FileOutputFormat.setOutputPath(job, outputPath);
             
            boolean f = job.waitForCompletion(true) ;
             
            if(f){
                System.out.println("run2 job is done");
                 
            }
             
        }catch(Exception e){
            e.printStackTrace();
        }
         
    }
     
    public static boolean run1(Configuration config){
        try{
            FileSystem fs = FileSystem.get(config) ;
             
            Job job = Job.getInstance();
            job.setJarByClass(RunJob.class);
            job.setJobName("fof");
             
            //设置MapReduce相关类和键值类型
            job.setMapperClass(FofMapper.class);
            job.setReducerClass(FofReducer.class);
            job.setMapOutputKeyClass(Fof.class);
            job.setMapOutputValueClass(IntWritable.class);
             
            //KeyValueTextInputFormat 默认分隔符是制表符
            //可以通过 config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ","); 修改
            job.setInputFormatClass(KeyValueTextInputFormat.class);
             
            FileInputFormat.addInputPath(job, new Path("/friend/"));
            Path outputPath = new Path("/output/friend");
             
            if(fs.exists(outputPath)){
                fs.delete(outputPath, true) ;
            }
             
            FileOutputFormat.setOutputPath(job, outputPath);
             
            boolean f = job.waitForCompletion(true) ;
             
            if(f){
                System.out.println("run1 job is done");
                return true ;
            }
             
        }catch(Exception e){
            e.printStackTrace();
        }
        System.out.println("run1 job is fail");
        return false ;
    }
     
    public static class UserMapper extends Mapper<Text, Text, User, User>
    {
        /**
         * job2,生成好友推荐列表的mapper
         * 一个用户对应一个包含关系度的推荐用户
         */
        protected void map(Text key, Text value, Mapper<Text, Text, User, User>.Context context)
                throws IOException, InterruptedException {
             
            String uname1 = key.toString() ;
            String[] valpar = value.toString().split("   ");
            String uname2 = valpar[0];
            int fofCounter = Integer.parseInt(valpar[1]) ;
             
            context.write(new User(uname1, fofCounter), new User(uname2, fofCounter));
            context.write(new User(uname2, fofCounter), new User(uname1, fofCounter)); //由于是双向推荐,这里写两次
        }
         
    }
     
    public static class UserReducer extends Reducer<User, User, Text, Text>
    {
        /**
         * job2,生成好友推荐列表的reducer
         * reducer对数据进行聚合,最终生成
         * 如花 林志玲:3,老王:2,郭美美:1
         * 由于要按照用户进行分组,需要自定义分组
         * 由于要按照推荐度排序,需要自定义排序
         */
        protected void reduce(User arg0, Iterable<User> arg1, Reducer<User, User, Text, Text>.Context arg2)
                throws IOException, InterruptedException {
             
            StringBuilder fofs = new StringBuilder();
            int i=0;
            for (User u : arg1) {
                if(i>0){
                    fofs.append(",") ;
                }
                fofs.append(u.getUsername()+":"+u.getFofCounter()) ;
                i++ ;
            }
            arg2.write(new Text(arg0.getUsername()), new Text(fofs.toString()));
             
        }
         
    }
     
    public static class FofMapper extends Mapper<Text, Text, Fof, IntWritable>
    {
        /**
         * fof 关系的mapper,找出fof关系,每个fof关系记录1度
         * 比如:小明 小芳 小飞
         * 将组成
         * 小芳 小飞 1
         */
        protected void map(Text key, Text value, Mapper<Text, Text, Fof, IntWritable>.Context context)
                throws IOException, InterruptedException {
             
            String[] fofs = value.toString().split(" ") ;
            for(int i=0; i<fofs.length; i++){
                context.write(new Fof(fofs[i], key.toString()), new IntWritable(0)); // 用来判断是否存在直接好友关系
                for(int j=i+1; j<fofs.length; j++){
                    context.write(new Fof(fofs[i], fofs[j]), new IntWritable(1));
                }
            }
             
        }
         
    }
     
    public static class FofReducer extends Reducer<Fof, IntWritable, Fof, IntWritable>
    {
        /**
         * 对fof关系进行聚合,关系度进行累加
         * 最终形成
         * 小芳 小飞 3
         */
        protected void reduce(Fof arg0, Iterable<IntWritable> arg1, Reducer<Fof, IntWritable, Fof, IntWritable>.Context arg2)
                throws IOException, InterruptedException {
             
            boolean flag = true;
            int counter = 0 ;
            for(IntWritable i : arg1) {
                int _i = i.get() ;
                 
                if (_i==0){
                    flag = false ;
                    break;
                }
                 
                counter += _i ;
            }
             
            if(flag) {
                String[] fofs = arg0.toString().split("  ");
                 
                arg2.write(arg0, new IntWritable(counter));
                 
            }
        }
         
    }
}

附上测试数据:

(默认都是以制表符分割,既 \t)

小明  老王  如花  林志玲
老王  小明  凤姐
如花  小明  李刚  凤姐
林志玲 小明  李刚  凤姐  郭美美
李刚  如花  凤姐  林志玲
郭美美 凤姐  林志玲
凤姐  老王  如花  林志玲 郭美美

阅读:1674
搜索
  • Linux 高性能网络编程库 Libevent 简介和示例 2332
  • Mac系统编译PHP7【20190929更新】 2208
  • zksync 和 layer2 1899
  • Hadoop 高可用集群搭建 (Hadoop HA) 1891
  • Linux 常用命令 1879
  • 安徽黄山游 1855
  • Windows 安装Swoole 1815
  • 小白鼠问题 1785
  • Hadoop 高可用YARN 配置 1785
  • 使用 Java+Thrift 实现异步事件处理服务 1684
简介
不定期分享软件开发经验,生活经验