文章目录
  1. 1. 原因
  2. 2. 所以
  3. 3. 实现

原因

昨天将此题使用标签传播版PageRank法(标签合并)得到了正确地结果,但是今天想想这个效率也还是蛮低啊:

  1. 虽然关于标签的存储使用了位图法,但是海量数据下这个标签的存储仍然是一个大得问题
  2. 迭代过程中多次对两两标签集合进行合并,还不时的需要clone操作,时间空间开销都是很大
  3. 关于迭代停止是判断两个标签集合里面的内容是否相等,需要O(n)的复杂度
  4. 最后在判断连通子图时,进行Joinkey计算时使用了hashCode,这种方法偷懒的方法很容易造成结果不准确,但是也的确很难用一个唯一的数字来表示一个HashMap了呀

所以

今天想到了这些标签集合完全可以使用集合中标签的最小值的代替,比如(接上文的cookie)cookeiA的最小值是1,cookeB的最小值是1,cookeC的最小值是3,在迭代时cookieB将消息发给cookieC,cookeC将自己的标签由3更新到了1,这样就很判断cookieA,B,C就是一个人了,而且只需要一个int类型就可以了,不需要额外的开销。

实现

今晚修改了一下源码,上面的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
ddef main(args:Array[String])
{
val conf=new SparkConf().setAppName("connect componentV2")
val sc=new SparkContext(conf)

val dataSource=List("1 1","2 1","2 2","3 2",
"4 3","5 3","5 4","6 4","6 5","7 5",
"8 7","9 7","9 8","9 9","10 8","11 9")

val rdd=sc.parallelize(dataSource).map { x=>{
val data=x.split(" ")
(data(0).toLong,data(1).toInt)
}}.cache()

//提取顶点
val vertexRdd=rdd.groupBy(_._1).map(x=>{(x._1,x._2.unzip._2.min)})

//提取边
val edgeRdd=rdd.groupBy(_._2).flatMap(x=>{
val vertexList=x._2.toList.unzip._1
val ret=ListBuffer[Edge[Option[Int]]]()

for(i<- 0 until vertexList.size;
j<-i+1 until vertexList.size;
if j<vertexList.size)
{
ret.append(Edge(vertexList(i),vertexList(j),None))
}

ret
})

//构成图
val graph=Graph(vertexRdd,edgeRdd)
println("init graph")
graph.triplets.collect().foreach(println(_))

//进行pregel计算
val newG=graph.pregel(Int.MaxValue, 10000, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
println("after pregel")
newG.triplets.collect().foreach(println(_))

println("connect component")
newG.vertices.groupBy(_._2).map(_._2.unzip._1).collect().foreach(println(_))
}

/**
* 节点数据的更新 就是取最小值
*/

def vprog(vid:VertexId,vdata:Int,message:Int):Int=Math.min(vdata,message)

/**
* 发送消息
*/

def sendMsg(e:EdgeTriplet[Int, Option[Int]])={
if(e.srcAttr==e.dstAttr)
Iterator.empty//迭代停止
else{
//哎,EdgeDirection.Either好像根本没效果,只能在这里发送双向来模拟无向图
Iterator((e.dstId,e.srcAttr),
(e.srcId,e.dstAttr))//将自己发送给邻接顶点
}
}

/**
* 合并消息
*/

def mergeMsg(a:Int,b:Int):Int=Math.min(a, b)

光光从代码量上就可以看到精简了很多,也更加好理解了,接下来看下结果

文章目录
  1. 1. 原因
  2. 2. 所以
  3. 3. 实现