数据流
虽然 Hadoop 是一个基于 Java 的框架,但是其有可能在 Java 语言以外的语言中编写 msp 和 reduce 应用程序。Hadoop 内的 流 实用工具实现了一种数据流胶的类型。通过 流 实用工具,您可以定义您自己的可执行 map 和 reduce(使用每一个从标准输入 [stdin] 提取的输入和每一个通过标准输出 [stdout] 提供的输出),且 流 实用工具可适当地读取和写入数据,根据需要调用您的应用程序(请参考清单 3)。
清单 3. 使用 Hadoop 流实用工具
hadoop jar $HADOOP_HOME/hadoop-流.jar -input inputData -output outputData -mapper map_exec -reducer reduce_exec |
清单 3 说明如何在 Hadoop 内使用 流 实用工具,图 3 图形化地显示了如何定义流。请注意这是一个流使用的简单示例。大量的选项可用于制定如何解析数据、制定如何调用图像、为分区器和合成器指定替换图像以及调整其他配置。
图 3. 图形流示例
Ruby 示例
通过已经获得的在 流 实用工具基本理解上的经验,您已经准备编写一个简单的 Ruby map 和 reduce 应用程序并查看如何在 Hadoop 框架中使用过程。虽然此处的示例伴随着规范的 MapReduce 应用程序,但是稍后您将看到其他的应用程序(取决于您将如何用 map 和 reduce 格式实现它们)。
首选是 mapper。此脚本从 stdin 提取文本输入,首先标记它,然后将一系列键值对发送到 stdout。像大多数面向对象的脚本语言一样,这个任务几乎太简单了。如清单 4 中所示的 mapper 脚本(通过一些注释和空白区域可给与其大一点的大小)。此程序使用一个迭代器来从 stdin 中读取一行,同时另一个迭代器将该行分割成单个的标记。使用为 1 的相关值(通过选项卡分隔)将每一个标记(单词)发送到 stdout。
清单 4. Ruby map 脚本(map.rb)
#!/usr/bin/env ruby # Our input comes from STDIN STDIN.each_line do |line| # Iterate over the line, splitting the words from the line and emitting # as the word with a count of 1. line.split.each do |word| puts "#{word}t1" end end |
下一步,查看 reduce 应用程序。虽然此应用程序稍微有些复杂,但是使用 Ruby hash(关联阵列)可简化 reduce 操作(请参考清单 5)。此脚本可通过来自 stdin (通过 流 实用工具传递)的输入数据再次工作且将该行分割成一个单词或值。而后该 hash 会检查该单词;如果发现,则将计数添加到元素。否则,您需要在该单词的 hash 中创建新的条目,然后加载计数(应该是来自 mapper 过程的 1)。在所有输入都被处理以后,通过 hash 可简单迭代且将键值对发送到 stdout。
清单 5. Ruby reduce 脚本(reduce.rb)
#!/usr/bin/env ruby # Create an empty word hash wordhash = {} # Our input comes from STDIN, operating on each line STDIN.each_line do |line| # Each line will represent a word and count word, count = line.strip.split # If we have the word in the hash, add the count to it, otherwise # create a new one. if wordhash.has_key?(word) wordhash[word] += count.to_i else wordhash[word] = count.to_i end end # Iterate through and emit the word counters wordhash.each {|record, count| puts "#{record}t#{count}"} |
随着 map 和 reduce 脚本的完成,需从命令行测试它们。记得要使用 chmod +x 将这些文件更改为可执行。通过生成输入文件来启动,如清单 6 所示。
清单 6. 生成输入文件
# echo "Hadoop is an implementation of the map reduce framework for " "distributed processing of large data sets." > input # |