Hadoop Streaming 实践以及Debug
Hadoop Streaming是一个便于变成Map Reduce程序的工具包,这个工具包可以支持各种可执行/脚本语言来创建Mapper和Reducer,利用Hadoop的优势进行大数据的处理,这些语言仅仅只需要支持*unix的表示输出输入即可(python,c,c++,perl,akw etc.)
Streaming实践
先直接来看一个由python
写的Streaming
程序,还有那个经典的word count,我们的数据集是一篇英语作文,
看来看他的mapper
文件1
2
3
4
5
6
7
8
9
10
11#!/usr/bin/env python
#-*- coding=utf8 -*-
import sys,re
re_english = re.compile(u'[^a-zA-Z0-9\-]+')
for line in sys.stdin: #这里你可以看做是map类中的line输入
words = re_english.sub(' ',line.strip()) #这里只提取英文数字
for word in words.split():
print '%s\t%s' % (word, 1) #这儿就是标准的输出,用tab隔开 默认第一个值为key
其实看上面的mapper
文件还是挺带感的,和标准的mapper
类很类似,这里就不解释了,相信用java
写过标准Map-Reduce
都应该很熟悉
现在再来看reducer
文件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23!/usr/bin/env python
#-*- coding=utf8 -*-
import sys
lastk = None #这里标志最后一个k 用于控制同一个key 到一个组中
count = 0
for line in sys.stdin:
w,c = line.split('\t')
c = int(c) #不转成int会比较麻烦 这是是计数
if lastk == None: #这里是判断是否过来的是第一个key
lastk=w
count += c
elif lastk == w:
count += c
else:
print "%s\t%s"%(lastk,count)
lastk=w
count = c #这里重置计数
if lastk is not None:
print "%s\t%s"%(lastk,count)
陌生感来了把~其实这里是这样的:
都说了是Streaming,他其实是流式进来的,在进来之前还是和标准的mr一样按key进行partation划分到各个桶中,然后每个桶会有若干个key,这里按key分组一次会将记录一条一条的使用*unix的标准输入 读入道sys.stdin中,那么问题了来了,原来mr中的 迭代器的值如何构造?这里主要使用lastk来的变量,每次当输入的key与lastk相等的时候,将当前的值加入到字典或者数组中(因为这个demo是wordcount,所以用累加计数来代替了,第16行),直到key与lastk不等时(第18行),此时的数组或者字典就是原来 值的迭代器里面的东西,和正常的mr一样操作,该输出的输出,完了之后同时得更新lastk以迎接下一组key的到来,同时清空数组或者字典,周而复始,直至全部输入之后,判断我的lastk是否存在值,有的话这个lastk作为最后一组key进行输出(第23行),这样的方式就可以构造出原来的(key,iter[value])模式了
上述的构造看上去些代码可能更加麻烦一点,但是其实这样的方式是应该灵活了
现在mapper
和reducer
两个文件写完了,该如何执行呢?
1 | hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \ |
其实提交的方式很类似原生的jar
包提交,只是这里的jar
是使用了Hadoop
自带的streaming
包,敲火车键进行执行
15/11/07 20:46:47 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
15/11/07 20:46:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [/root/program/hadoop-2.7.0/runjar/pyscript/word_count_mapper.py, /root/program/hadoop-2.7.0/runjar/pyscript/word_count_reducer.py, /tmp/hadoop-unjar1825196483906999229/] [] /tmp/streamjob6480432411236657839.jar tmpDir=null
15/11/07 20:46:51 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.56.2:8032
15/11/07 20:46:51 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.56.2:8032
15/11/07 20:46:53 INFO mapred.FileInputFormat: Total input paths to process : 1
15/11/07 20:46:54 INFO mapreduce.JobSubmitter: number of splits:2
15/11/07 20:46:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1446946409340_0001
15/11/07 20:46:55 INFO impl.YarnClientImpl: Submitted application application_1446946409340_0001
15/11/07 20:46:55 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1446946409340_0001/
15/11/07 20:46:55 INFO mapreduce.Job: Running job: job_1446946409340_0001
15/11/07 20:47:12 INFO mapreduce.Job: Job job_1446946409340_0001 running in uber mode : false
15/11/07 20:47:12 INFO mapreduce.Job: map 0% reduce 0%
15/11/07 20:47:33 INFO mapreduce.Job: map 100% reduce 0%
15/11/07 20:47:46 INFO mapreduce.Job: map 100% reduce 100%
15/11/07 20:47:47 INFO mapreduce.Job: Job job_1446946409340_0001 completed successfully
15/11/07 20:47:47 INFO mapreduce.Job: Counters: 49
File System Counters
----此处和mr一样 省略1w字
File Input Format Counters
Bytes Read=2223
File Output Format Counters
Bytes Written=1208
15/11/07 20:47:47 INFO streaming.StreamJob: Output directory: /yyl/test/ouput/streaming
然后来查看熟悉的word count结果
[root@master pyscript]# hadoop fs -get /yyl/test/ouput/streaming
15/11/07 20:48:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/07 20:48:19 WARN hdfs.DFSClient: DFSInputStream has been closed already
15/11/07 20:48:19 WARN hdfs.DFSClient: DFSInputStream has been closed already
[root@master pyscript]# cd streaming/
[root@master streaming]# sort -nr -k 2 part-00000 | head -n 10
and 11
to 10
people 10
Micro-blog 8
their 6
a 5
other 4
of 4
more 4
has 4
取计数最高的10个,看到了熟悉的字样,好了,这样一次streaming
写执行完了,是不是甚是方便
这里有一个提示,可以再通
hadoop版本hadoop-streaming-*.jar
的位置不一样,你可以使用find命令进行查找具体的位置
到了这里已经可以基本执行streaming
程序了,但是从上面的跑的命令里可以看到有好多配置的样子,还有另外可以发现,在写的mapper
和reducer
中只写了数据的处理逻辑,其他的一些配置参数根本无法写入,那么这些东西都是得在执行的命令里面进行配置的,他可以有的配置参数大致有如下几个
参数名称 | 可选/必选 | 描述 |
---|---|---|
-input |
必选 | 输入文件/目录的位置 |
-output |
必选 | 输出目录 |
-mapper |
必选 | mapper 的执行文件或者JavaClassName |
-reducer |
必选 | reducer 的执行或者JavaClassName |
-file |
必选 | 执行的mapper或者reducer文件以及其依赖文件,一定要写,多个可以写多行,他会共享到各个节点上,也可以是jar包 |
-inputformat |
可选 | 填JavaClassName ,为自定义的输入格式,默认是TextInputFormat |
-outputformat |
可选 | 填JavaClassName ,为自定义的输出格式,默认是TextOutputformat |
-partitioner |
可选 | 填JavaClassName ,为自定义的分区函数 |
-combiner |
可选 | mapper 输出之后的合并类,是JavaClassName |
-cmdenv |
可选 | name=value 为输入到流命令里面的环境变量 |
-inputreader |
可选 | 貌似可以代替-inputformat 这个东西 |
-verbose |
可选 | 启用java 的verbose 输出 |
-lazyOutput |
可选 | 当输出格式为FileOutputFormat 时,可以配置为懒输出-_- |
-numReduceTasks |
可选 | 指定的reducer 的数目 |
-mapdebug |
可选 | 指定一个脚本当mapper 失败的时候进行调用 |
-reducedebug |
可选 | 指定一个脚本当reducer 失败的时候进行调用 |
-conf |
可选 | 指定配置文件 |
-D |
可选 | property=value 可以配置Hadoop 原生的配置项 实用^_^ |
这么看来,
streaming
还是很强大以及很灵活的
Streaming调试
从上述的配置中可以看到 可以配置mapdebug
和reducedebug
来追踪streaming
中的错误信息来进行调试,除了这种方式,streaming
调试还有一种更加方便的方式,
先来看streaming
的执行过程mapper->shuffle->reducer
,数据以流的方式进行传递的,在Linux
中可以配合自带的命令以及官道来完成这一过程,现在可以看模拟刚刚的demo执行1
[root@master pyscript]# cat ~/data/words.txt | python word_count_mapper.py | sort | python word_count_reducer.py | sort -nr -k 2 |head -n 10
可以看到其输出
and 11
to 10
people 10
Micro-blog 8
their 6
a 5
other 4
of 4
more 4
has 4
与上述demo中的结果一模一样,现在大致来分解一下上述命令
cat ~/data/words.txt
:输出文件内容| python word_count_mapper.py
:管道命令 将上一步输出的内容输出到要执行的mapper
中| sort
:管道命令 直接将mapper
中输出的内容按第一列进行排序,排序完了之后其实就是达到了分组的效应| python word_count_reducer.py
:管道 将排序后的值一次输入到reducer
中进行执行,其实到了这一步已经完成了streaming
的模拟| sort -nr -k 2 |head -n 10
将最后的结果排个序,再取top
使用上述方式来进行调试我感觉有两大优势
- 快,不需要提交到服务器上 慢悠悠的取执行
- 准,可以直接看到
python
抛出来的错误
Streaming常见错误
- Caused by: java.io.IOException: error=2, No such file or directory
这个往往是由于没有指定mapper,redue=cer
的-file
引起的,也可以使用通配符*
- 另一未知的错误 估计就是写的脚本执行出了问题,使用上述方式先在本地调试完了再跑
总结
Hadoop streaming
写起来很灵活,并且由于跨语言,迁移起来很很快,熟悉不同语言的开发人员也非常容易合作,如果Hadoop streaming
程序由多个Map-Reduce
构成,那么用Shell
来组织整个程序也就会非常的方便快捷
参考
本作品采用[知识共享署名-非商业性使用-相同方式共享 2.5]中国大陆许可协议进行许可,我的博客欢迎复制共享,但在同时,希望保留我的署名权