首页 文章

查找十亿个文件中的一百个最大数字

提问于
浏览
36

我今天去接受采访,被问到这个问题:

假设您有10亿个整数在磁盘文件中未分类 . 你如何确定最大的一百个数字?

我甚至不确定从哪里开始这个问题 . 给出正确结果的最有效流程是什么?我是否需要通过磁盘文件一百次获取我的列表中尚未包含的最高数字,或者是否有更好的方法?

14 回答

  • 1

    我相信最快的方法是使用一个非常大的位图来记录哪些数字存在 . 为了表示32位整数,这需要是2 ^ 32/8字节,大约== 536MB . 扫描整数只需设置位图中的相应位 . 然后寻找最高的100个条目 .

    注意:如果您看到差异,则会找到最高的100个数字而不是数字的最高100个实例 .

    这种方法在你的采访者可能阅读的非常好的编程珍珠书中讨论过!

  • 1

    这是我的初始算法:

    create array of size 100 [0..99].
    read first 100 numbers and put into array.
    sort array in ascending order.
    while more numbers in file:
        get next number N.
        if N > array[0]:
            if N > array[99]:
                shift array[1..99] to array[0..98].
                set array[99] to N.
            else
                find, using binary search, first index i where N <= array[i].
                shift array[1..i-1] to array[0..i-2].
                set array[i-1] to N.
            endif
        endif
    endwhile
    

    这有(非常轻微)的优点是前100个元素没有O(n ^ 2)混洗,只有O(n log n)排序,你可以很快识别并丢弃那些太小的元素 . 它还使用二进制搜索(最多7次比较)来找到正确的插入点,而不是50(平均)用于简单的线性搜索(不是我建议其他人提供这样的解决方案,只是它可能会给面试官留下深刻的印象) ) .

    您甚至可以获得奖励积分,建议在C中使用优化 shift 操作,例如 memcpy ,前提是您可以确定重叠不是问题 .


    您可能要考虑的另一种可能性是维护三个列表(每个列表最多100个整数):

    read first hundred numbers into array 1 and sort them descending.
    while more numbers:
        read up to next hundred numbers into array 2 and sort them descending.
        merge-sort lists 1 and 2 into list 3 (only first (largest) 100 numbers).
        if more numbers:
            read up to next hundred numbers into array 2 and sort them descending.
            merge-sort lists 3 and 2 into list 1 (only first (largest) 100 numbers).
        else
            copy list 3 to list 1.
        endif
    endwhile
    

    我不确定,但最终可能比持续改组更有效率 .

    合并排序是一个简单的选择(对于合并排序列表1和2到3):

    list3.clear()
    while list3.size() < 100:
        while list1.peek() >= list2.peek():
            list3.add(list1.pop())
        endwhile
        while list2.peek() >= list1.peek():
            list3.add(list2.pop())
        endwhile
    endwhile
    

    简单地说,由于它们已经按降序排序,因此将前100个值从组合列表中拉出来 . 我没有详细检查是否会更有效,我只是提供它作为一种可能性 .

    我怀疑采访者会对“开箱即用”思维的可能性以及你说应该对其性能进行评估这一事实印象深刻 .

    与大多数采访一样,技术技能是他们所关注的事情之一 .

  • 10

    显然,采访者希望你指出两个关键事实:

    • 您无法将整个整数列表读入内存,因为它太大了 . 所以你必须逐一阅读 .

    • 您需要一个有效的数据结构来容纳100个最大的元素 . 此数据结构必须支持以下操作:

    • Get-Size :获取容器中的值数 .

    • Find-Min :获取最小值 .

    • Delete-Min :删除最小值,将其替换为新的较大值 .

    • Insert :将另一个元素插入容器中 .

    通过评估数据结构的要求,计算机科学教授希望您建议使用Heap(Min-Heap),因为它旨在完全支持我们需要的操作 .

    例如,对于Fibonacci heaps,操作 Get-SizeFind-MinInsert 都是 O(1)Delete-MinO(log n) (在这种情况下为 n <= 100 ) .

    在实践中,您可以使用您喜欢的语言的标准库中的优先级队列(例如,来自C中的 #include <queue>priority_queue ),这通常使用堆来实现 .

  • 0

    创建一个包含100个数字的数组,全部为-2 ^ 31 .

    检查从磁盘读取的第一个数字是否大于列表中的第一个数字 . 如果是将数组复制为1索引并将其更新为新数字 . 如果没有,请检查100中的下一个,依此类推 .

    当你读完所有10亿个数字后,你应该拥有阵列中最高的100个数字 .

    任务完成 .

  • 17

    我按顺序遍历列表 . 在我去的时候,我将元素添加到集合(或多重集,具体取决于重复) . 当集合达到100时,我只会在值大于集合中的最小值时插入(O(log m)) . 然后删除分钟 .

    调用列表中的值数n和要查找的值的数量m:

    这是O(n * log m)

  • 53

    处理算法的速度绝对无关紧要(除非它完全是哑巴) .

    这里的瓶颈是I / O(指定它们在磁盘上) . 因此,请确保使用大缓冲区 .

  • 0

    保持100个整数的固定数组 . 将它们初始化为Int.MinValue . 当您从10亿个整数读取时,将它们与数组的第一个单元格中的数字(索引0)进行比较 . 如果更大,则向上移动到下一个 . 再次,如果更大,然后向上移动直到你达到结束或更小的值 . 然后将值存储在索引中并将前一个单元格中的所有值移动一个单元格...执行此操作你会发现100个最大整数 .

  • 3

    你将不得不检查每个号码,没有办法解决这个问题 .

    就像提供的解决方案略有改进一样

    给出100个数字的列表:

    9595
    8505
    ...
    234
    1
    

    您将检查新找到的值是否为我们数组的最小值,如果是,则插入它 . 但是从底部到顶部进行搜索可能非常昂贵,您可以考虑采用分而治之的方法,例如评估数组中的第50个项目并进行比较,然后您就知道是否需要插入值前50个项目,或最低50个 . 您可以重复此过程以获得更快的搜索,因为我们已经消除了50%的搜索空间 .

    还要考虑整数的数据类型 . 如果它们是32位整数并且您使用的是64位系统,那么如果它们连续存储在内存中,您可能可以执行一些聪明的内存处理和按位操作来同时处理磁盘上的两个数字 .

  • 1

    我想现在有人应该提到priority queue . 您只需要保留当前前100个数字,知道最低数字是什么,并且能够用更高的数字替换它 . 那个's what a priority queue does for you - some implementations may sort the list, but it'不是必需的 .

  • 7
    • 假设1个100亿个数字适合内存,最好的排序算法是堆排序 . 形成一个堆并获得前100个数字 . 复杂度o(nlogn 100(用于获取前100个数字))

    改善解决方案

    将实现划分为两个堆(以便插入不那么复杂),并且在获取前100个元素时执行英制合并算法 .

  • 3

    这里有一些python代码实现了ferdinand beyer上面提出的算法 . 本质上它是一个堆,唯一的区别是删除已经与插入操作合并

    import random
    import math
    
    class myds:
    """ implement a heap to find k greatest numbers out of all that are provided"""
    k = 0
    getnext = None
    heap = []
    
    def __init__(self, k, getnext ):
        """ k is the number of integers to return, getnext is a function that is called to get the next number, it returns a string to signal end of stream """
        assert k>0
        self.k = k
        self.getnext = getnext
    
    
    def housekeeping_bubbleup(self, index):
        if index == 0:
            return()
    
        parent_index = int(math.floor((index-1)/2))
        if self.heap[parent_index] > self.heap[index]:
            self.heap[index], self.heap[parent_index] = self.heap[parent_index], self.heap[index]
        self.housekeeping_bubbleup(parent_index)
        return()
    
    def insertonly_level2(self, n):
        self.heap.append(n)
        #pdb.set_trace()
        self.housekeeping_bubbleup(len(self.heap)-1)
    
    def insertonly_level1(self, n):
        """ runs first k times only, can be as slow as i want """
        if len(self.heap) == 0:
            self.heap.append(n)
            return()
        elif n > self.heap[0]:
            self.insertonly_level2(n)
        else:
            return()
    
    def housekeeping_bubbledown(self, index, length):
        child_index_l = 2*index+1
        child_index_r = 2*index+2
        child_index = None
        if child_index_l >= length and child_index_r >= length: # No child
            return()
        elif child_index_r >= length: #only left child
            if self.heap[child_index_l] < self.heap[index]: # If the child is smaller
                child_index = child_index_l
            else:
                return()
        else: #both child
            if self.heap[ child_index_r] < self.heap[ child_index_l]:
                child_index = child_index_r
            else:
                child_index = child_index_l
    
        self.heap[index], self.heap[ child_index] = self.heap[child_index], self.heap[index]
        self.housekeeping_bubbledown(child_index, length)
        return()
    
    def insertdelete_level1(self, n):
        self.heap[0] = n
        self.housekeeping_bubbledown(0, len(self.heap))
        return()
    
    def insert_to_myds(self,  n ):
        if len(self.heap) < self.k:
            self.insertonly_level1(n)
        elif n > self.heap[0]:
            #pdb.set_trace()
            self.insertdelete_level1(n)
        else:
            return()
    
    def run(self ):
        for n in self.getnext:
            self.insert_to_myds(n)
            print(self.heap)
            #            import pdb; pdb.set_trace()
        return(self.heap)
    
    def createinput(n):
        input_arr = range(n)
        random.shuffle(input_arr)
        f = file('input', 'w')
        for value in input_arr:
            f.write(str(value))
            f.write('\n')
    
    input_arr = []
    with open('input') as f:
        input_arr = [int(x) for x in f]
    myds_object = myds(4, iter(input_arr))
    output = myds_object.run()
    print output
    
  • 8

    如果您使用快速排序找到第100个订单统计信息,它将平均为O(十亿) . 但我怀疑有这样的数字,并且由于这种方法需要随机访问,它将比O(十亿日志(100))更快 .

  • 0

    基于@paxdiablo提供的第二个解决方案,这是另一个解决方案(约会后,我没有羞耻!) . 基本的想法是,只有当它们大于你已经拥有的最小数量并且排序不是真正必要时,你应该读取另一个k数字:

    // your variables
    n = 100
    k = a number > n and << 1 billion
    create array1[n], array2[k]
    
    read first n numbers into array2
    find minimum and maximum of array2 
    while more numbers:
      if number > maximum:
        store in array1
        if array1 is full: // I don't need contents of array2 anymore
           array2 = array1
           array1 = []
      else if number > minimum:
        store in array2
        if array2 is full:
           x = n - array1.count()
           find the x largest numbers of array2 and discard the rest
           find minimum and maximum of array2
      else:
        discard the number
    endwhile
    
    // Finally
    x = n - array1.count()
    find the x largest numbers of array2 and discard the rest
    return merge array1 and array2
    

    关键步骤是在array2中找到最大x数的函数 . 但是你可以使用这个事实,你知道最小值和最大值来加速函数以找到array2中最大的x数 .

    实际上,有很多可能的优化,因为你真的不需要对它进行排序,你只需要x个最大的数字 .

    此外,如果k足够大并且您有足够的内存,您甚至可以将其转换为递归算法以查找n个最大数字 .

    最后,如果数字已经排序(按任何顺序),则算法为O(n) .

    显然,这只是理论上的,因为在实践中你会使用标准的排序算法,而瓶颈可能就是IO .

  • 1

    有许多聪明的方法(如优先级队列解决方案),但您可以做的最简单的事情之一也可以快速有效 .

    如果你想要 n 的顶部 k ,请考虑:

    allocate an array of k ints
    while more input
      perform insertion sort of next value into the array
    

    这可能听起来很荒谬 . 您可能期望这是 O(n^2) ,但它实际上只是 O(k*n) ,并且如果 k 远小于 n (如问题陈述中所假设的那样),则它接近 O(n) .

    您可能会认为常数因子太高,因为平均每次输入的比较和移动都很多 . 但是,在迄今为止看到的第一个最大值的第一次比较中,大多数 Value 将被轻易拒绝 . 如果您有十亿个输入,那么到目前为止,只有一小部分可能大于100 .

    (你可以解释一个最坏情况的输入,其中每个值都大于它前身,因此需要 k 比较和移动每个输入 . 但这本质上是一个排序输入,问题语句表示输入未排序 . )

    即使是二进制搜索改进(找到插入点)也只会将比较减少到 ceil(log_2(k)) ,除非你的特殊情况与 k th-so-far进行额外比较,否则你似乎比连续50次快得多比较和动作 . 这就是为什么许多系统排序放弃Quicksort而支持小尺寸的插入排序 .

    还要考虑这几乎不需要额外的内存,并且该算法对缓存非常友好(对于堆或优先级队列可能也可能不是这样),并且写入没有错误是微不足道的 .

    读取文件的过程可能是主要的瓶颈,因此实际的性能提升可能是通过选择一个简单的解决方案,您可以集中精力寻找一个良好的缓冲策略来最小化i / o .

    如果 k 可以任意大,接近 n ,则考虑优先级队列或其他更智能的数据结构是有意义的 . 另一种选择是将输入分成多个块,并行地对每个块进行排序,然后合并 .

相关问题