使用Spark求解大图的连通组件(百度面试题)
题目
在百度网站上有很多作弊者,现在百度已经使用某种技术将他们标记起来,比如:
cookieA和cookieB的标签都为1,他们是代表同一个用户,而cookieB可能又有一个标签2与cookieC相同,此时cookieB和cookieC也是同一个用户,这样其实cookieA,cookieB,cookieC就是一个用户而已,但是百度的数据量很大,现需要找出这批大量数据中的所有作弊者,请使用分布式的方法来求解此问题(可以使用Hadoop
,Spark
或者其他的任何一种平台)
当时面试官也说了这个其实就是找连通子图问题
采样+多源最短路径(失败)
我最初以为这个图里每个连图子图规模较大,但是个数却不是很多,所以一下子想到的就是:
- 先对大图采样
- 以采样顶点为起始顶点,利用
Spark
的Pregel
模块来跑多源最短路径 - 每个源跑完之后标记形成的图就是连通子图
但是面试官一直问我该采多少样,你这么多是不是非常不精确之类的,(我当初就在想,这个就是要靠实验来定啊。。。)实在无法给面试官满意的答案,我就转移了个话题,反问:
q:请问假如这个图有2亿条边,那么最终形成的连通子图会超过2kw吗?
w:他说应该会超过并且可能会多于2kw
突然意识到我最初想到的采样方法这么做还真不合适。。。
标签传播版PageRank法
又思索了片刻,想到了PageRank+标签传播法,该方法使用PageRank
的随机游走来求解,与之不同的这里传播的是标签,每次迭代都将自己的标签告诉邻居,邻居收到消息后更新自己的标签(进行union操作),直至迭代再消息合并之后自己的标签无变化时停止迭代。
列如cookieB的标签其实是(1,2)在迭代的时候会向cookieA和cookieC发送自己的标签,它们俩收到消息之后进行合并,最终其标签也是(1,2),此时就可以求出cookieA,cookieB,cookieC其实就是一个人了,最重要的时候PageRank
很容易使用分布式实现。
好,接下来从代码的角度来讲解此方法:
建立图
由于输入的肯定是文本数据,所以现在需要将输入的建立形成图1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34val dataSource=List("1 1","2 1","2 2","3 2",
"4 3","5 3","5 4","6 4","6 5","7 5",
"8 7")
val rdd=sc.parallelize(dataSource).map { x=>{
val data=x.split(" ")
(data(0).toLong,data(1).toInt)
}}.cache()
//提取顶点
val vertexRdd=rdd.groupBy(_._1).map(x=>{
val vlabel=Map[Int,Int]() //这里其实存储的就是一个int 只是用map的位图法来存
x._2.foreach(a=>add2Map(a._2,vlabel))
(x._1,vlabel)
})
//提取边
val edgeRdd=rdd.groupBy(_._2).flatMap(x=>{
val vertexList=x._2.toList.unzip._1
val ret=ListBuffer[Edge[Option[Int]]]()
for(i<- 0 until vertexList.size;
j<-i+1 until vertexList.size;
if j<vertexList.size)
{
ret.append(Edge(vertexList(i),vertexList(j),None))
}
ret
})
//构成图
val graph=Graph(vertexRdd,edgeRdd)
这个数据源这里为了方便直接写死了,并且为了顶点id的处理方法暂时使用数字来代替题目中的cookie,从数据源中肉眼可以看到最终的连通子图应该是{1,2,3}和{4,5,6,7}
调用pregel接口
1 | val newG=graph.pregel(Map[Int,Int](), 10000, EdgeDirection.Out)(vprog, sendMsg, mergeMsg) |
Spark
里面的pregel
是基于BSP
进行实现的,其实就是各种Join
,Map
调用接口的时候还需要额外编写三个方法(更新、发送、合并)
更新方法
1 | /** |
其实就是将两个标签集合进行并集而已
发送方法
1 | /** |
这里的发送方法很重要,当一条边上两端顶点的标签集合相同时就停止发送消息,否则就分别想两端发送自己的标签
合并方法
1 | /** |
合并方法其实和更新方法很类似,其实就是做了两个标签集合的合并,以减少网络开销
运行结果
最终将代码打成jar包在Spark
上进行运行,其结果如下:
从结果图种可以发现在跑了pregel
之后相连子图的标签都成为一致了,还有最后打印的连通组件(就是子图)也与预期的一致。
备注:这里的标签使用map版本的位图来存储,减少存储量,增加算法的能力。
添加完成源码
下面是做实验的完整源码:scala.2.10.4
,spark.1.3+hadoop.2.6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131//这里使用位图的方法可以减少标签的存储空间
private val SHIFT=0x5 //位移
private val MASK=0x1F
def main(args:Array[String])
{
val conf=new SparkConf().setAppName("connect component")
val sc=new SparkContext(conf)
val dataSource=List("1 1","2 1","2 2","3 2",
"4 3","5 3","5 4","6 4","6 5","7 5",
"8 7")
val rdd=sc.parallelize(dataSource).map { x=>{
val data=x.split(" ")
(data(0).toLong,data(1).toInt)
}}.cache()
//提取顶点
val vertexRdd=rdd.groupBy(_._1).map(x=>{
val vlabel=Map[Int,Int]() //这里其实存储的就是一个int 只是用map的位图法来存
x._2.foreach(a=>add2Map(a._2,vlabel))
(x._1,vlabel)
})
//提取边
val edgeRdd=rdd.groupBy(_._2).flatMap(x=>{
val vertexList=x._2.toList.unzip._1
val ret=ListBuffer[Edge[Option[Int]]]()
for(i<- 0 until vertexList.size;
j<-i+1 until vertexList.size;
if j<vertexList.size)
{
ret.append(Edge(vertexList(i),vertexList(j),None))
}
ret
})
//构成图
val graph=Graph(vertexRdd,edgeRdd)
println("init graph")
graph.triplets.collect().foreach(println(_))
//进行pregel计算
val newG=graph.pregel(Map[Int,Int](), 10000, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
println("after pregel")
newG.triplets.collect().foreach(println(_))
println("connect component")
newG.vertices.groupBy(_._2.hashCode()).map(_._2.unzip._1).collect().foreach(println(_))
//这里暂时用hashCode分组吧-_- 实际情况下大数据量很容易存在hash冲突
}
/**
* 节点数据的更新 就是集合的union
*/
def vprog(vid:VertexId,vdata:Map[Int,Int],message:Map[Int,Int]):Map[Int,Int]={
val ret=vdata.clone()
for((k,v)<-message)
{
if(!ret.contains(k))
ret.put(k, v)
else
ret.put(k, ret(k)|message(k))
}
ret
}
/**
* 发送消息
*/
def sendMsg(e:EdgeTriplet[Map[Int, Int], Option[Int]])={
if(equalMap(e.srcAttr,e.dstAttr))
Iterator.empty//迭代停止
else{
//哎,EdgeDirection.Either好像根本没效果,只能在这里发送双向来模拟无向图
Iterator((e.dstId,e.srcAttr.clone()),
(e.srcId,e.dstAttr.clone()))//将自己发送给邻接顶点
}
}
/**
* 合并消息
*/
def mergeMsg(map1:Map[Int,Int],map2:Map[Int,Int]):Map[Int,Int]={
for((k,v)<-map2)
{
if(!map1.contains(k))
map1.put(k, v)
else
map1.put(k, map1(k)|map2(k))
}
map1
}
def add2Map(v:Int,map:Map[Int,Int])
{
val key=v>>SHIFT //取索引位置
val value=1<<(v&MASK) //取32的模
if(!map.contains(key))
map.put(key, value)
else
map.put(key, map(key)|value) //使用位图法进行存储
}
/**
* 比较两个map是否相等
*/
def equalMap(map1:Map[Int,Int],map2:Map[Int,Int]):Boolean=
{
var ret=true
if(map1.size==map2.size)
{
for((k,v)<-map1;if ret)
ret=map2.contains(k) && map1(k)==map2(k)
}else{
ret=false
}
ret
}
本作品采用[知识共享署名-非商业性使用-相同方式共享 2.5]中国大陆许可协议进行许可,我的博客欢迎复制共享,但在同时,希望保留我的署名权kubiCode,并且,不得用于商业用途。如您有任何疑问或者授权方面的协商,请给我留言。