创作人 Leo
编辑时间 Wed Jan 1,2020 at 09:28
好友推荐是社交网站的基础模块
当数据量变大后,好友推荐的计算量会非常大
借助 MapReduce 分布式计算,可以把好友推荐的计算扩展到海量数据上(MapReduce 属于离线批处理,适合定期生成推荐列表,而不是低延迟的实时推荐)
好友推荐的mapreduce需要两个job
job1 负责找出用户间的FOF关系(friend of friend),并累加出每对用户的关系度(共同好友数),已经是直接好友的用户对会被排除
job2 负责按关系度对FOF关系排序,并为每个用户生成最终推荐列表
job1 只需要定义mapper和reducer即可
job2 要相对复杂一些,
由于一组关系数据最终要对应到一个用户上,所以需要自定义分组,
另外,推荐列表需要按照关系度排序,这就要自定义排序
定义Fof模型,可以方便管理Fof关系数据
Fof.java
package lx;
import org.apache.hadoop.io.Text;
public class Fof extends Text {

    /**
     * Creates an empty FOF key.
     * Hadoop instantiates map-output keys reflectively, so a no-arg constructor is needed.
     */
    public Fof() {
        super();
    }

    /**
     * Creates the FOF key for the unordered user pair (a, b).
     * The text has the form "小明 小红" (two names joined by one space). It can therefore be
     * written straight into the intermediate output and parsed again by a later job.
     *
     * @param a one user name
     * @param b the other user name
     */
    public Fof(String a, String b) {
        super(getFof(a, b));
    }

    /**
     * Builds the canonical text for a user pair.
     * The lexicographically larger name always comes first, so (a, b) and (b, a) produce the
     * same key and end up in the same reduce group.
     *
     * @param a one user name
     * @param b the other user name
     * @return the two names separated by a single space, larger name first
     */
    public static String getFof(String a, String b) {
        return a.compareTo(b) > 0 ? a + " " + b : b + " " + a;
    }
}
定义一个用户模型,自定义排序和自定义分组都需要根据键来处理,利用这个User模型就可以实现
另外这个用户模型也可以保存推荐用户的一些信息,比如关系度
User.java
package lx;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class User implements WritableComparable<User>{
private String username;
private int fofCounter ;
public User(){
//不定义这个空参数的构造函数会报错
}
/**
* 一个代表用户的模型类
*
* @param _username 用户名字(分组用)
* @param _fofCounter 用户的关系度(排序用)
*/
public User(String _username, int _fofCounter){
this.username = _username ;
this.fofCounter = _fofCounter ;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public int getFofCounter() {
return fofCounter;
}
public void setFofCounter(int fofCounter) {
this.fofCounter = fofCounter;
}
//实现接口:反序列化必备
public void readFields(DataInput arg0) throws IOException {
// TODO Auto-generated method stub
this.username = new String(arg0.readUTF());
this.fofCounter = arg0.readInt();
}
//实现接口:序列化必备
public void write(DataOutput arg0) throws IOException {
// TODO Auto-generated method stub
arg0.writeUTF(this.username);
arg0.writeInt(this.fofCounter);
}
//实现接口:判断两个对象是不是同一个对象
public int compareTo(User o) {
// TODO Auto-generated method stub
int r = this.username.compareTo(o.getUsername()) ;
if(r==0){
return Integer.compare(this.fofCounter,o.getFofCounter());
}
return r;
}
}
自定义分组类
UserGroup.java
package lx;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class UserGroup extends WritableComparator {
/**
* 复写父类函数
* 通过用户名字判断是否为一组数据
*/
public int compare(WritableComparable a, WritableComparable b) {
//第二次mapred 需要按照key User的name分组
User k1 = (User) a;
User k2 = (User) b;
int r1 = k1.getUsername().compareTo(k2.getUsername());
return r1 ;
}
public UserGroup (){
//向上层注册需要比对的类
super(User.class, true) ;
}
}
自定义排序类
UserSort.java
package lx;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class UserSort extends WritableComparator {
/**
* 复写父类函数
* 通过关系度排序
* (名字也不能忽略,如果名字排序不进行处理,自定义分组会失效)
*/
public int compare(WritableComparable a, WritableComparable b) {
User k1 = (User) a;
User k2 = (User) b;
int r = k1.getUsername().compareTo(k2.getUsername());
if (r==0){
return -Integer.compare(k1.getFofCounter() , k2.getFofCounter()) ;
}
return r ;
}
public UserSort(){
super(User.class, true) ;
}
}
程序主体
包含了两个job的执行和Mapper、Reducer
RunJob.java
package lx;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RunJob {
    /** Input of job 1: one line per user, "user<TAB>friend1 friend2 ...". */
    private static final String FRIEND_INPUT = "/friend/";
    /** Output of job 1 and input of job 2: one line per FOF pair, "nameA nameB<TAB>degree". */
    private static final String FOF_OUTPUT = "/output/friend";
    /** Output of job 2: one line per user, "user<TAB>name:degree,name:degree,...". */
    private static final String RECOMMEND_OUTPUT = "/output/friend2";

    /**
     * Runs the two jobs in sequence.
     * Job 1 finds every existing FOF (friend of friend) relation and its degree. Job 2 sorts the
     * relations and builds, for each user, a recommendation list ordered by degree descending.
     * Job 2 runs only if job 1 succeeded. The process exits with 0 on success and 1 otherwise,
     * so calling scripts can detect failure.
     */
    public static void main(String[] args) {
        Configuration config = new Configuration();
        boolean ok = RunJob.run1(config) && RunJob.run2(config);
        System.exit(ok ? 0 : 1);
    }

    /**
     * Job 2: turns the FOF pairs from job 1 into per-user recommendation lists.
     * Uses a secondary sort: partition and group by user name, and sort by name and then
     * degree descending.
     *
     * @param config Hadoop configuration. It is passed to the Job, so settings such as
     *               "mapreduce.input.keyvaluelinerecordreader.key.value.separator"
     *               (KeyValueTextInputFormat's separator, a tab by default) take effect.
     * @return true if the job completed successfully
     */
    public static boolean run2(Configuration config) {
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("fofuser");
            job.setMapperClass(UserMapper.class);
            job.setReducerClass(UserReducer.class);
            // All three must agree on "user name" as the grouping unit, or one user's list splits.
            job.setPartitionerClass(UserPartitioner.class);
            job.setGroupingComparatorClass(UserGroup.class);
            job.setSortComparatorClass(UserSort.class);
            job.setMapOutputKeyClass(User.class);
            job.setMapOutputValueClass(User.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            FileInputFormat.addInputPath(job, new Path(FOF_OUTPUT));
            Path outputPath = new Path(RECOMMEND_OUTPUT);
            deleteIfExists(fs, outputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
            if (job.waitForCompletion(true)) {
                System.out.println("run2 job is done");
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            e.printStackTrace();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
        System.out.println("run2 job is fail");
        return false;
    }

    /**
     * Job 1: finds every FOF pair that is not already a direct friendship, together with its
     * degree (number of common friends).
     *
     * @param config Hadoop configuration, passed to the Job so its settings take effect
     * @return true if the job completed successfully
     */
    public static boolean run1(Configuration config) {
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("fof");
            job.setMapperClass(FofMapper.class);
            job.setReducerClass(FofReducer.class);
            job.setMapOutputKeyClass(Fof.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Fof.class);
            job.setOutputValueClass(IntWritable.class);
            // KeyValueTextInputFormat splits each line at the first tab by default.
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            FileInputFormat.addInputPath(job, new Path(FRIEND_INPUT));
            Path outputPath = new Path(FOF_OUTPUT);
            deleteIfExists(fs, outputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
            if (job.waitForCompletion(true)) {
                System.out.println("run1 job is done");
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            e.printStackTrace();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
        System.out.println("run1 job is fail");
        return false;
    }

    /** Removes the output directory of a previous run so the job can be re-run. */
    private static void deleteIfExists(FileSystem fs, Path path) throws IOException {
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
    }

    /**
     * Job 2 partitioner: routes records by user name only.
     * All of a user's records reach the same reducer regardless of degree and regardless of
     * User.hashCode(). Without this, the name-based grouping comparator could see a user's
     * records split across reducers.
     */
    public static class UserPartitioner extends Partitioner<User, User> {
        @Override
        public int getPartition(User key, User value, int numPartitions) {
            // Mask the sign bit so the modulo result is never negative.
            return (key.getUsername().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    public static class UserMapper extends Mapper<Text, Text, User, User> {
        /**
         * Job 2 mapper: emits one recommendation in each direction for a FOF pair.
         * Job 1 writes "nameA nameB<TAB>degree". With the default separator,
         * KeyValueTextInputFormat yields key="nameA nameB" and value="degree".
         * Key and value are re-joined and split on whitespace, so the line parses the same way
         * even if a space separator is configured. Lines that do not contain exactly two names
         * and an integer degree are skipped and counted under fof/MALFORMED_RECORDS.
         */
        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, User, User>.Context context)
                throws IOException, InterruptedException {
            String[] fields = (key.toString() + " " + value.toString()).trim().split("\\s+");
            if (fields.length != 3) {
                context.getCounter("fof", "MALFORMED_RECORDS").increment(1);
                return;
            }
            String uname1 = fields[0];
            String uname2 = fields[1];
            int fofCounter;
            try {
                fofCounter = Integer.parseInt(fields[2]);
            } catch (NumberFormatException e) {
                context.getCounter("fof", "MALFORMED_RECORDS").increment(1);
                return;
            }
            // Recommendations are symmetric, so each user is recommended to the other.
            context.write(new User(uname1, fofCounter), new User(uname2, fofCounter));
            context.write(new User(uname2, fofCounter), new User(uname1, fofCounter));
        }
    }

    public static class UserReducer extends Reducer<User, User, Text, Text> {
        /**
         * Job 2 reducer: concatenates one user's recommendations, e.g.
         * "如花	林志玲:3,老王:2,郭美美:1".
         * Custom grouping puts all of a user's records in one call. The custom sort delivers
         * them in descending degree order.
         */
        @Override
        protected void reduce(User arg0, Iterable<User> arg1, Reducer<User, User, Text, Text>.Context arg2)
                throws IOException, InterruptedException {
            StringBuilder fofs = new StringBuilder();
            for (User u : arg1) {
                if (fofs.length() > 0) {
                    fofs.append(',');
                }
                fofs.append(u.getUsername()).append(':').append(u.getFofCounter());
            }
            arg2.write(new Text(arg0.getUsername()), new Text(fofs.toString()));
        }
    }

    public static class FofMapper extends Mapper<Text, Text, Fof, IntWritable> {
        /** Marker value: the two users are already direct friends. */
        private static final IntWritable DIRECT_FRIEND = new IntWritable(0);
        /** One shared friend contributes one degree to the pair. */
        private static final IntWritable ONE_COMMON_FRIEND = new IntWritable(1);

        /**
         * Job 1 mapper: for the line "小明<TAB>小芳 小飞" it emits
         * (小芳 小飞, 1), since the two share the friend 小明. It also emits
         * (小芳 小明, 0) and (小飞 小明, 0) to mark the direct friendships.
         * Friends are split on runs of whitespace; an empty friend list emits nothing.
         */
        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, Fof, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String user = key.toString();
            String friendList = value.toString().trim();
            if (friendList.isEmpty()) {
                return;
            }
            String[] friends = friendList.split("\\s+");
            for (int i = 0; i < friends.length; i++) {
                context.write(new Fof(friends[i], user), DIRECT_FRIEND);
                for (int j = i + 1; j < friends.length; j++) {
                    context.write(new Fof(friends[i], friends[j]), ONE_COMMON_FRIEND);
                }
            }
        }
    }

    public static class FofReducer extends Reducer<Fof, IntWritable, Fof, IntWritable> {
        /**
         * Job 1 reducer: sums the degree of one FOF pair, e.g. "小芳 小飞	3".
         * If any value is 0, the pair are already direct friends and nothing is written.
         */
        @Override
        protected void reduce(Fof arg0, Iterable<IntWritable> arg1, Reducer<Fof, IntWritable, Fof, IntWritable>.Context arg2)
                throws IOException, InterruptedException {
            int counter = 0;
            for (IntWritable i : arg1) {
                int degree = i.get();
                if (degree == 0) {
                    return; // direct friends are never recommended to each other
                }
                counter += degree;
            }
            arg2.write(arg0, new IntWritable(counter));
        }
    }
}
附上测试数据:
(每行第一个字段是用户名,它与好友列表之间以制表符 \t 分隔,即 KeyValueTextInputFormat 的默认分隔符;好友之间以空格分隔)
小明 老王 如花 林志玲
老王 小明 凤姐
如花 小明 李刚 凤姐
林志玲 小明 李刚 凤姐 郭美美
李刚 如花 凤姐 林志玲
郭美美 凤姐 林志玲
凤姐 老王 如花 林志玲 郭美美