How to do a Hadoop join when the reducer does not have enough memory (a Baidu interview question)
Problem
The Instance records have the format:
Lineid show clk fea1:slot1 fea2:slot2 …
The Feature records have the format:
Fea1\tweight
Fea2\tweight
The task is to compute, for every instance, the sum of its feature weights. The number of features is huge (tens of millions, possibly hundreds of millions), so the feature table cannot be held in memory.
Analysis
- This is a classic join problem, but since it comes from a Baidu interview it is surely not meant to be that simple.
- If we treat Instance as the large table and Feature as the small table, Feature still cannot fit in memory, so a map-side join is not an option.
- That leaves a reduce-side join. With that approach, the iterator a reducer receives mixes the feature's weight with the LineIds, in no particular order, so the usual move is to traverse the iterator and separate the two. That is exactly where the trouble starts: separating them means buffering the LineIds in a container, which can easily cause an OOM, i.e., the reducer runs out of memory (see the illustration below).
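To make the failure mode concrete, here is a made-up sketch (hypothetical feature name and values) of what a single reducer group looks like in the naive reduce-side join: the key is one feature, and its values mix a single weight with an unbounded number of LineIds in arbitrary order.

```
key:    fea1
values: l:102  l:7  l:986  w:0.53  l:44  l:3001  ...
```

Because the values iterator can only be traversed once, the weight has to be matched against every LineId in that single pass, so the LineIds are usually buffered in a container until the weight shows up, and that buffer is exactly what exhausts the reducer's heap.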
Method 1: emit the key redundantly
Per the analysis above, the OOM only appears because the iterator is copied into a container; if each iterator carried only a modest amount of data, copying it would be harmless and the problem would be solved.
Concretely:
When the mapper emits a feature record, it emits N redundant copies of it, e.g. fea1_1, fea1_2, ..., fea1_N. When it emits an instance record, it appends one of those same N suffixes to the key at random, so that on the reducer side the record still lands in the same iterator as one copy of the feature.
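A made-up example of the emitted (key, value) pairs with N = 3 for brevity (the code below uses DUPLICATE = 10 and appends the suffix digit directly, without an underscore):

```
feature record "fea1<TAB>0.53"        →  (fea10, w:0.53)  (fea11, w:0.53)  (fea12, w:0.53)
instance line 7 that references fea1  →  (fea1 + one random suffix 0..2, l:7), e.g. (fea12, l:7)
```

Each reducer group now receives roughly 1/N of the LineIds that reference a feature, plus one copy of its weight, so the list the reducer builds stays small.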
Illustrated: in figure (1) the red block becomes huge once the shuffle completes, while in figure (2), because the key is emitted redundantly, that red block has been divided and conquered.
Please allow me to paste the complete code:
```java
/**
* The Baidu interview question: a join where neither table fits entirely in memory.
* This version emits the key redundantly.
* @author root
*
*/
public class BaiduJoin {
/**
* The mapper for the join job.
*
* @author root
*
*/
public static class JoinMapper extends Mapper<Object, Text, Text, Text> {
private static final Text outputKey = new Text();
/**
* Number of redundant copies to emit; this lowers the chance of an OOM on the reducer side.
*/
private static final int DUPLICATE=10;
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
InputSplit s = context.getInputSplit();// the split tells us which input file this record came from
if (s.toString().indexOf("instance") > -1) {
// this record comes from the instance file
String[] data = value.toString().split("\\s+");
Text lineId = new Text("l:" + data[0]);
for (int i = 0; i < data.length; i++) {
if (data[i].indexOf(":") >= 0) {
outputKey.set(data[i].split(":")[0]
+(int)(Math.random()*DUPLICATE));//append a random suffix (0..DUPLICATE-1) to spread the key
System.out.println(outputKey.toString());
context.write(outputKey, lineId);// emit (feature+suffix, lineId)
}
}
} else {
// this record comes from the feature file
String[] data = value.toString().split("\\t");
for(int i=0;i<DUPLICATE;i++)
{
outputKey.set(data[0]+i);
context.write(outputKey, new Text("w:" + data[1]));// emit (feature+suffix, weight) once per suffix
}
}
}
}
public static class JoinReducer extends
Reducer<Text, Text, IntWritable, DoubleWritable> {
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
LinkedList<String> lineIdList=new LinkedList<String>();//the place most likely to OOM
DoubleWritable weight=null;
for(Text text:values)
{
String str=text.toString();
if(str.indexOf("l:")>-1){
//a lineId value
lineIdList.add(str.split("l:")[1]);
}else{
weight=new DoubleWritable(Double.parseDouble(str.split("w:")[1]));//the weight of this feature
}
}
for(String lineId:lineIdList)
{
context.write(new IntWritable(Integer.parseInt(lineId)),weight);
}
}
}
/**
* Mapper for the second job: re-emits (lineId, weight) lines.
* @author root
*
*/
public static class LineOutMapper extends
Mapper<Object, Text, IntWritable, DoubleWritable> {
private final static IntWritable lineId = new IntWritable(1);
private final static DoubleWritable weight=new DoubleWritable();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] data=value.toString().split("\\t");
lineId.set(Integer.parseInt(data[0]));
weight.set(Double.parseDouble(data[1]));
context.write(lineId, weight);
}
}
/**
* Reducer for the second job: sums the weights per lineId.
* @author root
*
*/
public static class WeightSumReducer extends
Reducer<IntWritable, DoubleWritable,IntWritable, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();
public void reduce(IntWritable key, Iterable<DoubleWritable> values,
Context context) throws IOException, InterruptedException {
double sum = 0;
for (DoubleWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
if(args.length<2)
{
System.out.println("hello,the arguments length can't less 2");
System.exit(-1);
}
/**
* A temporary directory for the intermediate join output.
*/
final String TEMP_PATH="/output/temp/"+System.currentTimeMillis();
Configuration conf = new Configuration();
FileSystem fs=FileSystem.get(conf);
if(fs.exists(new Path(args[1])))
fs.delete(new Path(args[1]), true);
System.out.println("start baidu join");
//job 1: the join
Job job = Job.getInstance(conf, "baidu join");
job.setJarByClass(BaiduJoin.class);
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(2);
for(String path:args[0].split(";|,"))
FileInputFormat.addInputPath(job, new Path(path));//multiple input directories, separated by ';' or ','
FileOutputFormat.setOutputPath(job, new Path(TEMP_PATH));
job.waitForCompletion(true);
System.out.println("start baidu join sum");
//job 2: sum the weights per lineId
Job jobSum = Job.getInstance(conf, "baidu join sum");
jobSum.setJarByClass(BaiduJoin.class);
jobSum.setMapperClass(LineOutMapper.class);
jobSum.setReducerClass(WeightSumReducer.class);
jobSum.setOutputKeyClass(IntWritable.class);
jobSum.setOutputValueClass(DoubleWritable.class);
jobSum.setNumReduceTasks(2);
FileInputFormat.addInputPath(jobSum, new Path(TEMP_PATH));
FileOutputFormat.setOutputPath(jobSum, new Path(args[1]));
jobSum.waitForCompletion(true);
if(fs.exists(new Path(TEMP_PATH)))
fs.delete(new Path(TEMP_PATH), true);//clean up the temporary directory
System.out.println("ok");
}
}
```
Method 2: secondary sort
As we saw, the biggest OOM risk comes from copying the iterator. That iterator really contains just one feature weight plus a huge number of lineIds, so if we could place the weight at the very front of the iterator, the copy would no longer be necessary. A secondary sort can do exactly that.
In fact, data traveling from the Mapper to the Reducer is already sorted twice by Hadoop itself: once after the map phase and once before the reduce phase. Both sorts, however, operate on the key, so how can they influence the value? Simply by putting the value into the key.
For details, see: MapReduce secondary sort (SecondarySort).
Following that material, we first define our own key type:
```java
/**
* A custom composite key type for a feature.
* @author root
*
*/
public class FeatureWritable implements WritableComparable<FeatureWritable> {
/**
* The feature name (the natural key).
*/
String featureName="";
/**
* The secondary sort field ("w:<weight>" or "l:<lineId>").
*/
String secondName="";
public void set(String featureName,String secondName)
{
this.featureName=featureName;
this.secondName=secondName;
}
public String getFeatureName() {
return featureName;
}
public String getSecondName() {
return secondName;
}
@Override
public void readFields(DataInput in) throws IOException {
// read the two fields back in the same order write() emits them
this.featureName=in.readUTF();
this.secondName=in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
// writeUTF/readUTF keeps write() and readFields() symmetric; the original readLine()-based version wrote no record terminator and could not round-trip reliably
out.writeUTF(this.featureName);
out.writeUTF(this.secondName);
}
/**
* Comparison: primary order by featureName ascending, secondary order by secondName descending.
* @param o
* @return
*/
@Override
public int compareTo(FeatureWritable o) {
int ret=-1;
if(this.featureName.compareTo(o.featureName)==0)
{
ret=-1*this.secondName.compareTo(o.secondName);//the heart of the sort: descending on secondName puts "w:..." before "l:...", but String comparisons are expensive
}else{
ret=this.featureName.compareTo(o.featureName);
}
return ret;
}
@Override
public int hashCode(){
return this.featureName.hashCode()&this.secondName.hashCode();
}
public boolean equals(Object o)
{
if(o == null || !(o instanceof FeatureWritable)){
return false;
}
FeatureWritable fw=(FeatureWritable)o;
return fw.featureName.equals(this.featureName)
&&fw.secondName.equals(this.secondName);
}
public String toString()
{
return String.format("%s-%s",this.featureName,this.secondName);
}
}
```
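To see what this compareTo buys us, here is a hypothetical set of keys for one feature and the order the shuffle sort produces (featureName ascending, secondName descending, so "w:..." sorts ahead of every "l:..."):

```
emitted:         (fea1, l:7)     (fea1, w:0.53)  (fea1, l:102)
after sorting:   (fea1, w:0.53)  (fea1, l:7)     (fea1, l:102)
```

Combined with the grouping comparator shown below, all three keys are handed to a single reduce() call, and the weight is guaranteed to arrive as the first value, so the reducer never needs to buffer the LineIds.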
Next we have to implement a partitioner of our own:
```java
public class FeaturePartition extends Partitioner<FeatureWritable,Text> {
/**
* Partition by the feature name only; the secondary field must not affect the partition.
*/
@Override
public int getPartition(FeatureWritable fw, Text value, int numPartitions) {
return fw.getFeatureName().hashCode()&0xFF*127%numPartitions;
}
}
```
This is necessary because every record for a given featureName must land in the same bucket; with the default HashPartitioner the whole key (feature name plus secondary field) would be hashed, the records would scatter across reducers, and the whole scheme would fall apart.
We also need to implement a grouping comparator:
```java
/**
* Grouping comparator: keys with the same feature name are handed to a single reduce() call.
* @author root
*
*/
public class GroupingComparator extends WritableComparator {
protected GroupingComparator()
{
super(FeatureWritable.class, true);
}
/**
* Keys with the same feature name belong to the same group.
*/
@Override
//Compare two WritableComparables.
public int compare(WritableComparable w1, WritableComparable w2)
{
FeatureWritable fw1 = (FeatureWritable) w1;
FeatureWritable fw2 = (FeatureWritable) w2;
//System.out.println(String.format("~~~~%s,%s,%s", fw1,fw2,fw1.getFeatureName().compareTo(fw2.getFeatureName())));
return fw1.getFeatureName().compareTo(fw2.getFeatureName());
}
}
```
Only with this do all values for the same featureName end up in the same iterator; by default, grouping would compare the full key, so every (featureName, secondName) pair would get its own reduce() call.
Finally, the custom partitioner and grouping comparator must be set explicitly on the job:
```java
/**
* Register the custom partitioner and grouping comparator here.
*/
job.setPartitionerClass(FeaturePartition.class);
job.setGroupingComparatorClass(GroupingComparator.class);
```
With the groundwork in place, the join itself follows the same overall structure as before:
```java
/**
* Solves the join problem with a secondary sort.
* @author root
*
*/
public class BaiduJoinSecondarySort {
/**
* The mapper for the join job.
*
* @author root
*
*/
public static class JoinMapper extends Mapper<Object, Text, FeatureWritable, Text> {
private static final FeatureWritable outputKey = new FeatureWritable();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
InputSplit s = context.getInputSplit();// the split tells us which input file this record came from
if (s.toString().indexOf("instance") > -1) {
// this record comes from the instance file
String[] data = value.toString().split("\\s+");
Text lineId = new Text("l:" + data[0]);
for (int i = 0; i < data.length; i++) {
if (data[i].indexOf(":") >= 0) {
outputKey.set(data[i].split(":")[0], lineId.toString());
context.write(outputKey, lineId);// emit (feature, lineId); the lineId is also embedded in the key as the secondary field
}
}
} else {
// this record comes from the feature file
String[] data = value.toString().split("\\t");
outputKey.set(data[0],"w:" + data[1]);
context.write(outputKey, new Text("w:" + data[1]));// 输出feature,weight
}
}
}
public static class JoinReducer extends
Reducer<FeatureWritable, Text, IntWritable, DoubleWritable> {
private static final IntWritable lineId=new IntWritable();
public void reduce(FeatureWritable key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
System.out.println("--------------------------------");
DoubleWritable weight=null;
for(Text text:values)
{
System.out.println(text);
String str=text.toString();
if(weight==null && str.indexOf("w")>-1)
{
weight=new DoubleWritable(Double.parseDouble(str.split("w:")[1]));//thanks to the secondary sort, the first value of the group is the feature's weight
}else{
lineId.set(Integer.parseInt(str.split("l:")[1]));
context.write(lineId,weight);
}
}
}
}
/**
* Mapper for the second job: re-emits (lineId, weight) lines.
* @author root
*
*/
public static class LineOutMapper extends
Mapper<Object, Text, IntWritable, DoubleWritable> {
private final static IntWritable lineId = new IntWritable(1);
private final static DoubleWritable weight=new DoubleWritable();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] data=value.toString().split("\\t");
lineId.set(Integer.parseInt(data[0]));
weight.set(Double.parseDouble(data[1]));
context.write(lineId, weight);
}
}
/**
* Reducer for the second job: sums the weights per lineId.
* @author root
*
*/
public static class WeightSumReducer extends
Reducer<IntWritable, DoubleWritable,IntWritable, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();
public void reduce(IntWritable key, Iterable<DoubleWritable> values,
Context context) throws IOException, InterruptedException {
double sum = 0;
for (DoubleWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
if(args.length<2)
{
System.out.println("hello,the arguments length can't less 2");
System.exit(-1);
}
/**
* A temporary directory for the intermediate join output.
*/
final String TEMP_PATH="/output/temp/"+System.currentTimeMillis();
Configuration conf = new Configuration();
FileSystem fs=FileSystem.get(conf);
if(fs.exists(new Path(args[1])))
fs.delete(new Path(args[1]), true);
System.out.println("start baidu join");
//job 1: the join
Job job = Job.getInstance(conf, "baidu join");
/**
* Register the custom partitioner and grouping comparator.
*/
job.setPartitionerClass(FeaturePartition.class);
job.setGroupingComparatorClass(GroupingComparator.class);
job.setJarByClass(BaiduJoinSecondarySort.class);
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(FeatureWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(2);
for(String path:args[0].split(";|,"))
FileInputFormat.addInputPath(job, new Path(path));//multiple input directories, separated by ';' or ','
FileOutputFormat.setOutputPath(job, new Path(TEMP_PATH));
job.waitForCompletion(true);
System.out.println("start baidu join sum");
//job 2: sum the weights per lineId
Job jobSum = Job.getInstance(conf, "baidu join sum");
jobSum.setJarByClass(BaiduJoinSecondarySort.class);
jobSum.setMapperClass(LineOutMapper.class);
jobSum.setReducerClass(WeightSumReducer.class);
jobSum.setOutputKeyClass(IntWritable.class);
jobSum.setOutputValueClass(DoubleWritable.class);
jobSum.setNumReduceTasks(2);
FileInputFormat.addInputPath(jobSum, new Path(TEMP_PATH));
FileOutputFormat.setOutputPath(jobSum, new Path(args[1]));
jobSum.waitForCompletion(true);
if(fs.exists(new Path(TEMP_PATH)))
fs.delete(new Path(TEMP_PATH), true);//clean up the temporary directory
System.out.println("ok");
}
}
```
As the JoinReducer class above shows, the secondary-sort version never copies the iterator into a container. Looking at my own debug output (the System.out.println lines in the reducer), the weight value always appears on the first line of each group.
Summary
- The redundant-key approach above may look crude, but it works well.
- The secondary sort spares the reducer from buffering, but it shifts the pressure onto the two sort phases, and the cost of that many String comparisons is still considerable. Use it with care.