我编写了一个MapReduce代码来计算每个键的一组值的平均值 . 当我只使用mapper和reducer时,它工作正常 . 但是当我在它们之间引入一个组合器来减少减速器的负载时,关键是在结果中重复 . TIA . 代码如下 .

mapper.py:

#!/usr/bin/env python
import sys
from datetime import datetime

for line in sys.stdin:
    data = line.strip().split("\t")
    if(len(data) < 5):
    continue
    date, time, store, item, cost, payment = data


    print("{0}\t{1}".format(datetime.strptime(date, "%Y-%m-%d").weekday(),cost))

combiner.py:

#!/usr/bin/env python
import sys

from datetime import datetime

oldKey = None

salesList = ""

for line in sys.stdin:
    data = line.rstrip().split('\t')

    thisKey, thisSale = data

    if(oldKey and oldKey != thisKey):
        print("{0}\t{1}".format(oldKey,salesList))
        salesList = ""

    else:
        if(oldKey == thisKey):
            if(salesList != ""):
                salesList = salesList + ',' + thisSale
            else:
                salesList = thisSale

    oldKey = thisKey

if(oldKey):
    salesList = salesList + ',' + thisSale
    print("{0}\t{1}".format(oldKey,salesList))
    salesList = ""

reducer.py

#!/usr/bin/env python
import sys
from datetime import datetime

oldKey = None
meanSales = None
salesTotal = 0
count = 0
for line in sys.stdin:
    data = line.rstrip().split('\t')

    thisKey, thisSale = data
    thisSaleList = thisSale.split(',')
    thisSaleListFloat = list(map(float, thisSaleList))

    meanSales = sum(thisSaleListFloat)/len(thisSaleListFloat)

    print("{0}\t{1}".format(thisKey, meanSales))

Output

0   249.91917747419384
0   250.09807318775844
1   249.87984898663836
1   249.59593170284487
2   249.95321425419965
2   249.75339205149234
3   249.54634982922747
3   250.19731461573994
4   250.3129656082639
4   250.13323419720658
5   250.13367036331366
5   250.03468060131152
6   250.207532163134
6   249.67593639719652