首页 文章

Grand Central Dispatch中线程限制的解决方法?

提问于
浏览
32

使用Grand Central Dispatch,可以轻松地在非主线程上执行耗时的任务,避免阻塞主要主题并保持UI响应 . 只需使用 dispatch_async 并在全局并发队列上执行任务即可 .

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
    // code
});

然而,听起来好得令人难以置信,因为这个通常有它们的缺点 . 在我们的iOS应用程序项目中使用了很多之后,最近我们发现它有64个线程限制 . 一旦我们达到限制,应用程序将冻结/挂起 . 通过暂停应用程序与Xcode,我们可以看到主线程由 semaphore_wait_trap 持有 .

在网上搜索确认其他人也遇到了这个问题,但到目前为止还没有找到解决方案 .

达到调度线程硬限制:64(同步操作中阻塞的调度线程太多)

Another stackoverflow question确认使用 dispatch_syncdispatch_barrier_async 时也会出现此问题 .

Question:
由于Grand Central Dispatch有64个线程限制,有没有解决方法呢?

提前致谢!

1 回答

  • 64

    好吧,如果你受到束缚和决定,你可以摆脱GCD的束缚,然后使用pthreads直接攻击操作系统的每个进程线程限制,但最重要的是:如果你正在击中在GCD中的队列宽度限制,您可能需要考虑重新评估并发方法 .

    在极端情况下,有两种方法可以达到极限:

    • 您可以通过阻塞系统调用在某个OS原语上阻止64个线程 . (I / O界限)

    • 你可以合法地拥有64个可运行的任务,所有这些任务都准备好同时摇摆 . (CPU绑定)

    如果您等待I / O,GCD将在您的文件描述符(或机器端口)上的数据可用时排队您的块(或函数) . 请参阅dispatch_io_create and friends的文档 .

    如果有帮助,这里是使用GCD I / O机制实现的TCP echo服务器的一个小例子(无保修):

    in_port_t port = 10000;
    void DieWithError(char *errorMessage);
    
    // Returns a block you can call later to shut down the server -- caller owns block.
    dispatch_block_t CreateCleanupBlockForLaunchedServer()
    {
        // Create the socket
        int servSock = -1;
        if ((servSock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
            DieWithError("socket() failed");
        }
    
        // Bind the socket - if the port we want is in use, increment until we find one that isn't
        struct sockaddr_in echoServAddr;
        memset(&echoServAddr, 0, sizeof(echoServAddr));
        echoServAddr.sin_family = AF_INET;
        echoServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        do {
            printf("server attempting to bind to port %d\n", (int)port);
            echoServAddr.sin_port = htons(port);
        } while (bind(servSock, (struct sockaddr *) &echoServAddr, sizeof(echoServAddr)) < 0 && ++port);
    
        // Make the socket non-blocking
        if (fcntl(servSock, F_SETFL, O_NONBLOCK) < 0) {
            shutdown(servSock, SHUT_RDWR);
            close(servSock);
            DieWithError("fcntl() failed");
        }
    
        // Set up the dispatch source that will alert us to new incoming connections
        dispatch_queue_t q = dispatch_queue_create("server_queue", DISPATCH_QUEUE_CONCURRENT);
        dispatch_source_t acceptSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, servSock, 0, q);
        dispatch_source_set_event_handler(acceptSource, ^{
            const unsigned long numPendingConnections = dispatch_source_get_data(acceptSource);
            for (unsigned long i = 0; i < numPendingConnections; i++) {
                int clntSock = -1;
                struct sockaddr_in echoClntAddr;
                unsigned int clntLen = sizeof(echoClntAddr);
    
                // Wait for a client to connect
                if ((clntSock = accept(servSock, (struct sockaddr *) &echoClntAddr, &clntLen)) >= 0)
                {
                    printf("server sock: %d accepted\n", clntSock);
    
                    dispatch_io_t channel = dispatch_io_create(DISPATCH_IO_STREAM, clntSock, q, ^(int error) {
                        if (error) {
                            fprintf(stderr, "Error: %s", strerror(error));
                        }
                        printf("server sock: %d closing\n", clntSock);
                        close(clntSock);
                    });
    
                    // Configure the channel...
                    dispatch_io_set_low_water(channel, 1);
                    dispatch_io_set_high_water(channel, SIZE_MAX);
    
                    // Setup read handler
                    dispatch_io_read(channel, 0, SIZE_MAX, q, ^(bool done, dispatch_data_t data, int error) {
                        BOOL close = NO;
                        if (error) {
                            fprintf(stderr, "Error: %s", strerror(error));
                            close = YES;
                        }
    
                        const size_t rxd = data ? dispatch_data_get_size(data) : 0;
                        if (rxd) {
                            // echo...
                            printf("server sock: %d received: %ld bytes\n", clntSock, (long)rxd);
                            // write it back out; echo!
                            dispatch_io_write(channel, 0, data, q, ^(bool done, dispatch_data_t data, int error) {});
                        }
                        else {
                            close = YES;
                        }
    
                        if (close) {
                            dispatch_io_close(channel, DISPATCH_IO_STOP);
                            dispatch_release(channel);
                        }
                    });
                }
                else {
                    printf("accept() failed;\n");
                }
            }
        });
    
        // Resume the source so we're ready to accept once we listen()
        dispatch_resume(acceptSource);
    
        // Listen() on the socket
        if (listen(servSock, SOMAXCONN) < 0) {
            shutdown(servSock, SHUT_RDWR);
            close(servSock);
            DieWithError("listen() failed");
        }
    
        // Make cleanup block for the server queue
        dispatch_block_t cleanupBlock = ^{
            dispatch_async(q, ^{
                shutdown(servSock, SHUT_RDWR);
                close(servSock);
                dispatch_release(acceptSource);
                dispatch_release(q);
            });
        };
    
        return Block_copy(cleanupBlock);
    }
    

    无论如何......回到手头的话题:

    如果你处于第2阶段,你应该问自己:“我通过这种方法获得了什么?”假设你拥有最专业的MacPro - 12个内核,24个超线程/虚拟内核 . 有64个线程,你有一个约 . 3:1线程与虚拟核心比率 . 上下文切换和缓存未命中不是免费的 . 请记住,我们假设您不是因为这种情况而受到I / O限制,因此您通过执行比核心更多的任务所做的就是浪费CPU时间和上下文切换以及缓存抖动 .

    实际上,如果您的应用程序挂起,因为您的队列已经饿死了 . 最常见的是,当没有线程剩余时,多个互锁线程在同一队列上尝试 dispatch_sync . 这总是失败的 .

    原因如下:队列宽度是一个实现细节 . GCD的64线程宽度限制没有记录,因为精心设计的并发体系结构不应该依赖于队列宽度 . 您应该始终设计并发体系结构,使得2线程宽的队列最终将作为1000线程宽队列完成相同结果(如果较慢)的作业 . 如果不这样做,您的队列将永远存在饥饿的可能性 . 将工作负载划分为可并行化的单元应该是为了优化的可能性,而不是基本功能的要求 . 在开发期间强制执行此规则的一种方法是尝试在使用并发队列的位置使用串行队列,但期望非互锁行为 . 执行这样的检查可以帮助您更早地捕获一些(但不是全部)这些错误 .

    另外,对于原始问题的精确点:IIUC,64个线程限制是每个顶级并发队列64个线程,所以如果你真的觉得需要,你可以使用所有三个顶级并发队列(默认,高和低)优先级)总共实现64个以上的线程 . 请不要嫉妒自己 . 你_6442070_重新挨饿64个线程的队列,你最终可能只填充所有三个顶级队列和/或进入每个进程的线程限制并且也这样饿死自己 .

相关问题