首页 文章

处理长文件行的最佳多线程场景是什么?

提问于
浏览
1

我有一个大文件,我想阅读并处理多线程文件的所有行(甚至行) .

一个建议读取整个文件并将其分解为多个文件(与线程相同),然后让每个线程处理一个特定的文件 . 因为这个想法将读取整个文件,再次写入并读取多个文件似乎很慢(3倍I / O),我认为必须有更好的方案,

我自己虽然这可能是一个更好的场景:

一个线程将读取文件并将数据放在全局变量上,其他线程将读取该变量和进程中的数据 . 更详细:

一个线程将通过运行 func1 函数读取主文件并将每个偶数行放在缓冲区: line1Buffer 的最大大小 MAX_BUFFER_SIZE ,其他线程将从缓冲区弹出其数据并使用运行 func2 函数处理它 . 在代码中:

全局变量:

#define MAX_BUFFER_SIZE 100
vector<string> line1Buffer;
bool continue = true;// to end thread 2 to last thread by setting to false
string file = "reads.fq";

功能 func1 :(线程1)

void func1(){
 ifstream ifstr(file.c_str());
 for (long long i = 0; i < numberOfReads; i++) { // 2 lines per read
    getline(ifstr,ReadSeq);
    getline(ifstr,ReadSeq);// reading even lines
    while( line1Buffer.size() == MAX_BUFFER_SIZE )
        ; // to delay when the buffer is full
    line1Buffer.push_back(ReadSeq);
 }
 continue = false;
 return;
}

和功能 func2 :(其他主题)

void func2(){
 string ReadSeq;
 while(continue){
    if(line2Buffer.size() > 0 ){
        ReadSeq  = line1Buffer.pop_back();
        // do the proccessing....
    }
 }
}

关于速度:

如果读取部分较慢,那么总时间将等于仅读取文件一次(并且缓冲区每次可能只包含1个文件,因此只有1个另一个线程可以使用线程1) . 如果处理部分较慢,那么总时间将等于 numberOfThreads - 1 线程整个处理的时间 . 这两种情况都比读取文件和用1个线程写入多个文件更快,然后用多线程读取文件并处理...

所以有2个问题:

1-如何通过线程调用函数,线程1运行的方式 func1 和其他运行 func2

2-有没有更快的情况?

3- [删除]任何人都可以将这个想法扩展到M个线程用于读取和N个线程进行处理?显然我们知道: M+N==umberOfThreads 是真的

编辑:第3个问题不正确,因为多个线程无法帮助读取单个文件

谢谢大家

4 回答

  • 0

    另一种方法可以是交叉线程 . 读取由每个线程完成,但一次只有1个 . 由于在第一次迭代中等待,所以线程将被交错 .

    但这只是一个可扩展的选项,如果 work() 是瓶颈(那么每个非并行执行会更好)

    线:

    while (!end) {
        // should be fair!
        lock();
        read();
        unlock();
    
        work();
    }
    

    基本示例:(您应该添加一些错误处理)

    void thread_exec(ifstream* file,std::mutex* mutex,int* global_line_counter) {
        std::string line;
        std::vector<std::string> data;
        int i;
        do {
            i = 0;
            // only 1 concurrent reader
            mutex->lock();
            // try to read the maximum number of lines
            while(i < MAX_NUMBER_OF_LINES_PER_ITERATION && getline(*file,line)) {
                // only the even lines we want to process
                if (*global_line_counter % 2 == 0) {
                    data.push_back(line);
                    i++;
                }
                (*global_line_counter)++;
    
            }
            mutex->unlock();
    
            // execute work for every line
            for (int j=0; j < data.size(); j++) {
                work(data[j]);
            }
    
            // free old data
            data.clear();
         //until EOF was not reached
       } while(i == MAX_NUMBER_OF_LINES_PER_ITERATION);
    
    }
    
    void process_data(std::string file) {
         // counter for checking if line is even
         int global_line_counter = 0;
         // open file
         ifstream ifstr(file.c_str());
         // mutex for synchronization
         // maybe a fair-lock would be a better solution
         std::mutex mutex;
         // create threads and start them with thread_exec(&ifstr, &mutex, &global_line_counter);
         std::vector<std::thread> threads(NUM_THREADS);
         for (int i=0; i < NUM_THREADS; i++) {
             threads[i] = std::thread(thread_exec, &ifstr, &mutex, &global_line_counter);
         }
         // wait until all threads have finished
         for (int i=0; i < NUM_THREADS; i++) {
             threads[i].join();
         }
    }
    
  • 1

    你的瓶颈是什么?硬盘还是处理时间?

    如果它是硬盘,那么当你达到硬件的极限时,你可能不会再获得更多的性能 . 并发读取比尝试跳转文件要快得多 . 让多个线程尝试读取您的文件几乎肯定会降低整体速度,因为它会增加磁盘抖动 .

    读取文件的单个线程和一个线程池(或者只有一个其他线程)来处理内容可能就像你能得到的一样好 .

    全局变量:

    这是一个坏习惯 .

  • 1

    假设有 #p 踏板,帖子中提到的两个场景和答案:

    1)使用'a'线程进行读取并使用其他线程进行处理,在这种情况下, #p-1 线程将与仅一个线程读取进行比较 . 假设完全操作的时间是jobTime,用n个线程处理的时间是pTime(n),所以:

    最糟糕的情况发生在读取时间比处理速度慢的情况下,最好的情况是处理速度慢于 jobTime 等于 pTime(#p-1)+readTime 的读取时间

    2)使用所有 #p 线程读取和处理 . 在这种情况下,每个线程都需要执行两个步骤 . 第一步是读取文件的一部分,其大小为 MAX_BUFFER_SIZE ,这是顺序的;意味着没有两个线程可以一次读取 . 但第二部分是处理可以并行的读数据 . 这种方式在最坏的情况下 jobTime 与之前一样是 pTime(1)+readTime (但*),但最佳优化的情况是pTime(#p)readTime,它比以前更好 .

    *:在第二种方法的最坏情况下,但是读取速度较慢但你可以找到一个优化的 MAX_BUFFER_SIZE ,其中(在最坏的情况下)一个线程的某些读取将与另一个线程的某些处理重叠 . 使用此优化 MAX_BUFFER_SIZE jobTime 将小于 pTime(1)+readTime 并且可能会分叉到 readTime

  • 1

    首先,读取文件是一个缓慢的操作,因此除非您正在进行一些超重处理,否则文件读取将受到限制 .

    如果您决定采用多线程路由,则队列是正确的方法 . 只要确保你向后推出一个弹出窗口 . stl :: deque应该运行良好 . 此外,您还需要使用互斥锁来锁定队列,并将其与条件变量同步 .

    最后一件事是,如果我们推动的场景的队列比我们弹出的更快,你将需要限制大小 .

相关问题