我有一种情况需要使用以下格式处理非常大的文本文件:
ID \t time \t duration \t Description \t status
我想利用MapReduce来帮助我处理这个文件 . 我知道MapReduce基于键值对工作 . Mapper将输出键和一些值,MapReduce将确保所有相同的键最终在1个reducer中 .
我想在减速器中最终得到的行是时间在1小时之内 . 然后在reducer中,我想访问所有其他信息,如ID,持续时间,状态等做其他事情 . 所以我猜输出的值是列表还是什么?
我有一些Python代码来处理输入数据 . mapper.py
#!/usr/bin/env python
import sys
import re
for line in sys.stdin:
line=line.strip()
portions=re.split(r'\t+',line)
time=portions[1]
#output key,value by print to stdout for reducer.py to read in.
请注意,我的数据集中的时间已经是POSIX时间格式 .
我怎样才能在Mapper中输出键值对呢?
我对MapReduce / Hadoop仍然很新,并感谢所有的帮助 . 先感谢您!
1 回答
这是一个策略:
来自Mapper的
((复合键),值)=
((消息小时 - 一小时,当前消息的精确时间),消息)
((消息小时,消息的准确时间),消息)
((消息小时1小时,消息的准确时间),消息)
现在:您需要标准的二级排序:
setPartitioner只到密钥的前半部分(消息的小时)
setGroupingComparator只到键的前半部分(消息的小时)
setSortingComparator to(消息小时,消息的准确时间)
在reducer中:每个reducer组接收消息精确时间内/ - 60到120分钟内的所有消息 . reducer以排序顺序查看"precise time of message"的所有内容 . 因此,您可以在每个减速器中保留过去60分钟内查看的所有消息的滑动窗口
NOTE 以上假设60分钟消息的数据可以放在单个reducer任务的内存中 . 否则,您将需要求助于将数据写入磁盘作为窗口函数的一部分 .
Update OP要求进一步澄清窗口,所以我们走了 .
从Mapper发出的密钥的角度考虑:每个输入记录有三个密钥 . 现在在Reducer上,这意味着每个输入记录都出现在三个不同的组中 . 原因是我们需要针对每个输入记录考虑前导和滞后记录 . 因此,现在我们让每个组都可以访问所有输入记录,这些记录可能在最早记录的60分钟内以及最新记录的60分钟内 . 由于记录按每小时最早的秒数分组:这意味着-60(分钟)到120(最大)对比属于给定小时组的任何记录 .