首页 文章

输出键,值对如何使1小时内的时间在使用Python的MapReduce中的reducer中结束?

提问于
浏览
1

我有一种情况需要使用以下格式处理非常大的文本文件:

ID \t time \t duration \t Description \t status

我想利用MapReduce来帮助我处理这个文件 . 我知道MapReduce基于键值对工作 . Mapper将输出键和一些值,MapReduce将确保所有相同的键最终在1个reducer中 .

我想在减速器中最终得到的行是时间在1小时之内 . 然后在reducer中,我想访问所有其他信息,如ID,持续时间,状态等做其他事情 . 所以我猜输出的值是列表还是什么?

我有一些Python代码来处理输入数据 . mapper.py

#!/usr/bin/env python
import sys
import re
for line in sys.stdin:
   line=line.strip()
   portions=re.split(r'\t+',line)
   time=portions[1]
#output key,value by print to stdout for reducer.py to read in.

请注意,我的数据集中的时间已经是POSIX时间格式 .

我怎样才能在Mapper中输出键值对呢?

我对MapReduce / Hadoop仍然很新,并感谢所有的帮助 . 先感谢您!

1 回答

  • 1

    这是一个策略:

    来自Mapper的

    • :发出每个记录的三个副本并使用二级排序:

    ((复合键),值)=

    • ((消息小时 - 一小时,当前消息的精确时间),消息)

    • ((消息小时,消息的准确时间),消息)

    • ((消息小时1小时,消息的准确时间),消息)

    现在:您需要标准的二级排序:

    • setPartitioner只到密钥的前半部分(消息的小时)

    • setGroupingComparator只到键的前半部分(消息的小时)

    • setSortingComparator to(消息小时,消息的准确时间)

    在reducer中:每个reducer组接收消息精确时间内/ - 60到120分钟内的所有消息 . reducer以排序顺序查看"precise time of message"的所有内容 . 因此,您可以在每个减速器中保留过去60分钟内查看的所有消息的滑动窗口

    NOTE 以上假设60分钟消息的数据可以放在单个reducer任务的内存中 . 否则,您将需要求助于将数据写入磁盘作为窗口函数的一部分 .

    Update OP要求进一步澄清窗口,所以我们走了 .

    从Mapper发出的密钥的角度考虑:每个输入记录有三个密钥 . 现在在Reducer上,这意味着每个输入记录都出现在三个不同的组中 . 原因是我们需要针对每个输入记录考虑前导和滞后记录 . 因此,现在我们让每个组都可以访问所有输入记录,这些记录可能在最早记录的60分钟内以及最新记录的60分钟内 . 由于记录按每小时最早的秒数分组:这意味着-60(分钟)到120(最大)对比属于给定小时组的任何记录 .

相关问题