
Writing a large amount of data to stdin


I am writing a large amount of data to stdin.

How can I make sure it does not block?

p=subprocess.Popen([path],stdout=subprocess.PIPE,stdin=subprocess.PIPE)
p.stdin.write('A very very very large amount of data')
p.stdin.flush()
output = p.stdout.readline()

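After I read in a large string and write it, the program seems to hang at p.stdin.write().

I have a large batch of files (more than 1,000) that are written to stdin one after another.

So what happens is that I am running a loop: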

# this loop is repeated for all the files
for stri in lines:
    p=subprocess.Popen([path],stdout=subprocess.PIPE,stdin=subprocess.PIPE)
    p.stdin.write(stri)
    output = p.stdout.readline()
    # do some processing

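It somehow hangs at around file number 400. That file is a large one containing long strings.

I suspect it is a blocking issue.

This only happens when I iterate from 0 to 1000. If I start from file 400, however, the error does not occur.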

2 Answers

  • 2

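    You probably need to use Popen.communicate().

    If you write a large amount of data to stdin while the child process is producing output on stdout, the child's stdout pipe buffer may fill up before it has processed all of its stdin data. The child then blocks on its write to stdout (because you are not reading it), and you in turn block on your write to stdin.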
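
The blocking described above comes from the fact that an OS pipe only holds a limited number of bytes. A minimal Python 3 sketch that measures this limit, assuming a POSIX system where os.set_blocking() works on pipe descriptors (the helper name pipe_capacity is made up for illustration):

```python
import os


def pipe_capacity():
    """Return how many bytes fit into an OS pipe that nobody reads from."""
    read_fd, write_fd = os.pipe()
    try:
        # Non-blocking mode makes a full pipe raise instead of hanging.
        os.set_blocking(write_fd, False)
        total = 0
        chunk = b"x" * 1024
        while True:
            try:
                total += os.write(write_fd, chunk)
            except BlockingIOError:
                # The pipe is full: a blocking writer would hang right here.
                return total
    finally:
        os.close(read_fd)
        os.close(write_fd)


if __name__ == "__main__":
    print("pipe capacity in bytes:", pipe_capacity())
```

The exact number depends on the operating system. Once a write exceeds it and the other side is not reading, a blocking write() simply waits.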

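    Popen.communicate() can be used to write to stdin and read stdout/stderr at the same time, which avoids the problem above.

    Note: Popen.communicate() is only suitable when the input and output data fit in memory (they are not too large).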
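
A minimal Python 3 sketch of the communicate() approach. The child here is a hypothetical stand-in (a small inline script that upper-cases each line as it arrives), not the program from the question, and run_with_communicate is an illustrative name:

```python
import subprocess
import sys

# Hypothetical child: streams stdin to stdout line by line, upper-cased.
CHILD_CODE = """
import sys
for line in sys.stdin.buffer:
    sys.stdout.buffer.write(line.upper())
"""


def run_with_communicate(data):
    """Feed all of data to the child and collect all of its output."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    # communicate() writes stdin and reads stdout concurrently,
    # then closes stdin and waits for the child to exit.
    out, _ = proc.communicate(data)
    return out


if __name__ == "__main__":
    payload = b"a very large amount of data\n" * 100000
    result = run_with_communicate(payload)
    print(len(payload), len(result))
```

Both the input and the output are held in memory in full, which is the limitation mentioned in the note above.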

    Update: If you decide to hack around this with threads, here is an example parent and child process implementation that you can tailor to your needs:

    parent.py:

    #!/usr/bin/env python2
    import os
    import sys
    import subprocess
    import threading
    import Queue
    
    
    class MyStreamingSubprocess(object):
        def __init__(self, *argv):
            self.process = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
            self.stdin_queue = Queue.Queue()
            self.stdout_queue = Queue.Queue()
            self.stdin_thread = threading.Thread(target=self._stdin_writer_thread)
            self.stdout_thread = threading.Thread(target=self._stdout_reader_thread)
            self.stdin_thread.start()
            self.stdout_thread.start()
    
        def process_item(self, item):
            self.stdin_queue.put(item)
            return self.stdout_queue.get()
    
        def terminate(self):
            self.stdin_queue.put(None)
            self.process.terminate()
            self.stdin_thread.join()
            self.stdout_thread.join()
            return self.process.wait()
    
        def _stdin_writer_thread(self):
            while 1:
                item = self.stdin_queue.get()
                if item is None:
                    # signaling the child process that the end of the
                    # input has been reached: some console progs handle
                    # the case when reading from stdin returns empty string
                    self.process.stdin.close()
                    break
                try:
                    self.process.stdin.write(item)
                except IOError:
                    # making sure that the current self.process_item()
                    # call doesn't deadlock
                    self.stdout_queue.put(None)
                    break
    
        def _stdout_reader_thread(self):
            while 1:
                try:
                    output = self.process.stdout.readline()
                except IOError:
                    output = None
                self.stdout_queue.put(output)
                # output is empty string if the process has
                # finished or None if an IOError occurred
                if not output:
                    break
    
    
    if __name__ == '__main__':
        child_script_path = os.path.join(os.path.dirname(__file__), 'child.py')
        process = MyStreamingSubprocess(sys.executable, '-u', child_script_path)
        try:
            while 1:
                item = raw_input('Enter an item to process (leave empty and press ENTER to exit): ')
                if not item:
                    break
                result = process.process_item(item + '\n')
                if result:
                    print('Result: ' + result)
                else:
                    print('Error processing item! Exiting.')
                    break
        finally:
            print('Terminating child process...')
            process.terminate()
            print('Finished.')
    

    child.py:

    #!/usr/bin/env python2
    import sys
    
    while 1:
        item = sys.stdin.readline()
        if not item:
            # readline() returns an empty string at EOF (stdin closed)
            break
        sys.stdout.write('Processed: ' + item)
    

    Note: IOError is handled on the reader/writer threads to cover the cases where the child process exits, crashes, or is killed.
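
To illustrate the note above in Python 3 terms: there IOError is an alias of OSError, and writing to a child that has already exited typically raises BrokenPipeError, which is a subclass of OSError. A minimal sketch, where try_write is a hypothetical helper and the child is a stand-in that exits without reading its stdin:

```python
import subprocess
import sys


def try_write(proc, data):
    """Write data to proc.stdin; return False if the child has gone away."""
    try:
        proc.stdin.write(data)
        proc.stdin.flush()
        return True
    except OSError:  # covers BrokenPipeError; IOError is the same class in Python 3
        return False


if __name__ == "__main__":
    # Hypothetical child that exits at once without reading stdin.
    proc = subprocess.Popen([sys.executable, "-c", "pass"], stdin=subprocess.PIPE)
    proc.wait()
    print("write succeeded:", try_write(proc, b"x" * (1 << 20)))
    try:
        proc.stdin.close()
    except OSError:
        pass  # closing may try to flush leftover data to the dead child
```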

  • 1

    To avoid the deadlock in a portable way, write to the child process in a separate thread:

    #!/usr/bin/env python
    from subprocess import Popen, PIPE
    from threading import Thread
    
    def pump_input(pipe, lines):
        with pipe:
            for line in lines:
                pipe.write(line)
    
    p = Popen(path, stdin=PIPE, stdout=PIPE, bufsize=1)
    Thread(target=pump_input, args=[p.stdin, lines]).start()
    with p.stdout:
        for line in iter(p.stdout.readline, b''): # read output
            print line,
    p.wait()
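
The snippet above uses Python 2 syntax. A rough Python 3 sketch of the same writer-thread idea follows. The child is a hypothetical inline script that upper-cases each line, standing in for the real program at path, and run_with_writer_thread is an illustrative name:

```python
import subprocess
import sys
import threading

# Hypothetical child: streams stdin to stdout line by line, upper-cased.
CHILD_CODE = """
import sys
for line in sys.stdin.buffer:
    sys.stdout.buffer.write(line.upper())
"""


def pump_input(pipe, lines):
    """Write every line to the child's stdin, then close it to signal EOF."""
    with pipe:
        for line in lines:
            pipe.write(line)


def run_with_writer_thread(lines):
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    writer = threading.Thread(target=pump_input, args=(proc.stdin, lines))
    writer.start()
    # The main thread keeps draining stdout, so the child never stalls
    # on a full output pipe while the writer thread is still feeding it.
    with proc.stdout:
        output = [line for line in proc.stdout]
    writer.join()
    proc.wait()
    return output


if __name__ == "__main__":
    lines = [b"line %d\n" % i for i in range(100000)]
    print(len(run_with_writer_thread(lines)))
```

Unlike communicate(), the output can be handled line by line as it arrives instead of being collected first. The list is built here only to keep the sketch short.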
    

    Python: read streaming input from subprocess.communicate()
