在了解到Hadoop的生态环境以及Hadoop单机模式和伪分布式模式安装配置之后,我们可以使用自己熟悉的语言来编写Hadoop MapReduce程序,进一步了解MapReduce编程模型。
本教程将使用Python语言为Hadoop编写一个简单的MapReduce程序:单词计数
尽管Hadoop框架是用Java编写的,但是为Hadoop编写的程序不必非要Java写,还可以使用其他语言开发,比如Python,Ruby,C++等
编写完成的MapReduce程序可以直接在你已经搭建好的伪分布式程序中调试运行。
MapReduce的Python代码
我们将使用Hadoop流API通过STDIN和STDOUT在Map和Reduce代码间传递数据。我们只需要使用Python的sys.stdin读取输入数据和打印输出到sys.stdout。这就是我们需要做的,因为Hadoop流会处理好其他的一切。
mapper.py
将下面的代码保存在文件 /home/hadoop/workspace/mapper.py
中。它将从STDIN读取数据,拆分为单词并输出一组映射单词和它们数量(中间值)的行到STDOUT。尽管这个Map脚本不会计算出单词出现次数的总和(中间值)。相反,它会立即输出<word> 1
元组的形式——即使某个特定的单词可能会在输入中出现多次。在我们的例子中,我们让后续的Reduce做最终的总和计数。当然,你可以按照你的想法在你自己的脚本中修改这段代码。
需要给mapper.py文件赋予可执行权限:
/home/hadoop/workspace/mapper.py
代码如下
reducer.py
将下面的代码保存在文件 /home/hadoop/workspace/reducer.py
中。它将从STDIN读取mapper.py的结果(因此mapper.py的输出格式和reducer.py预期的输入格式必须匹配),然后统计每个单词出现的次数,最后将结果输出到STDOUT中。
需要给reducer.py文件赋予可执行权限:
/home/hadoop/workspace/reducer.py
代码如下
代码测试
在MapReduce作业中正式使用mapper.py和reducer.py之前,最好先在本地测试mapper.py和reducer.py脚本。否则,作业可能成功完成了但没有得到作业结果数据或者得到了不是你想要的结果。
这里有一些想法,关于如何测试这个Map和Reduce脚本的功能。
使用cat data | map | sort | reduce
这样的顺序。具体测试如下:
其中/home/hadoop/workspace/file/input1.txt
示例输入文件的内容如下:
在Hadoop上运行Python代码
下载示例输入数据
对于这个示例,我们将使用的三个文本来自Gutenberg项目:
- The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson
- The Notebooks of Leonardo Da Vinci
- Ulysses by James Joyce
下载对应链接下的Plain Text UTF-8
,三个文本对应的地址分别为:
- https://www.gutenberg.org/cache/epub/20417/pg20417.txt
- https://www.gutenberg.org/files/5000/5000-8.txt
- https://www.gutenberg.org/files/4300/4300-0.txt
下载每个文件为纯文本文件,以UTF-8编译并且将这些文件存储在一个临时目录中,如/tmp/gutenberg。
将本地示例数据拷贝到HDFS
首先在HDFS中创建一个子目录,然后拷贝文件过来(如果input已存在先删除再创建,以免影响测试结果)。
运行MapReduce作业
运行MapReduce作业,敲入如下命令:
查看output-first
目录确保程序执行正常:
将文件从HDFS中拷入到你本地文件系统中
一般情况下,Hadoop对每个reducer产生一个输出文件;在我们的示例中,然而它将只创建单个文件,因为输入的文件都很小。
如果你想要在运行的时候修改Hadoop参数,如增加Reduce任务的数量,你可以使用-D选项:
只能指定reduce的task数量不能指定map的task数量。
改进Mapper和Reducer代码
上面的Mapper和Reducer例子应该给你提供了一种思路,关于如何创建第一个MapReduce程序。重点是代码简洁和易于理解,特别是对于Python语言的初学者。在现实程序中,你可能想要通过Python的迭代器和生成器来优化你的代码。
一般来说,迭代器和生成器有一个优点:序列中的元素在你需要它的时候才会生成。计算资源昂贵或内存紧缺的时候很有用。
注意:下面的Map和Reduce脚本只有运行在Hadoop环境中才会正常工作,即在 MapReduce任务中作为Mapper和Reducer。这表示在本地运行的测试命令”cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py”不会正常工作,因为一些功能是由Hadoop来完成的。
准确地说,我们计算了一个单词出现的次数,例如(“foo”, 4),只有恰巧相同的单词(foo)相继出现多次。然而,在大多数情况下,我们让Hadoop在Map和Reduce过程时自动分组(key, value)对这样的形式,因为Hadoop在这方面比我们简单的Python脚本效率更高。
advanced_mapper.py
advanced_mapper.py是改进之后的mapper代码:
advanced_reducer.py
advanced_reducer.py是改进之后的reducer代码:
代码改进结束。
参考: