首页 文章

python:读取线程中的子进程输出

提问于
浏览
9

我有一个可执行文件,我使用subprocess.Popen调用 . 然后,我打算通过stdin使用一个线程从stdin中提取一些数据,该线程从队列中读取其值,稍后将在另一个线程中填充 . 应该使用另一个线程中的stdout管道读取输出,并再次在队列中进行排序 .

据我之前的研究所理解,使用带队列的线程是一种很好的做法 .

遗憾的是,外部可执行文件不会快速给出每个管道输入的答案,因此简单的写入,读取线周期不是一个选项 . 可执行文件实现了一些内部多线程,我希望输出一旦可用,就要附加读者线程 .

作为测试可执行文件的示例,只需对每行进行随机播放(shuffleline.py):

#!/usr/bin/python -u
import sys
from random import shuffle

for line in sys.stdin:
    line = line.strip()

    # shuffle line
    line = list(line)
    shuffle(line)
    line = "".join(line)

    sys.stdout.write("%s\n"%(line))
    sys.stdout.flush() # avoid buffers

请注意,这已经尽可能没有缓冲 . 或者不是吗?这是我的精简测试程序:

#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess

class WriteThread(threading.Thread):
    def __init__(self, p_in, source_queue):
        threading.Thread.__init__(self)
        self.pipe = p_in
        self.source_queue = source_queue

    def run(self):
        while True:
            source = self.source_queue.get()
            print "writing to process: ", repr(source)
            self.pipe.write(source)
            self.pipe.flush()
            self.source_queue.task_done()

class ReadThread(threading.Thread):
    def __init__(self, p_out, target_queue):
        threading.Thread.__init__(self)
        self.pipe = p_out
        self.target_queue = target_queue

    def run(self):
        while True:
            line = self.pipe.readline() # blocking read
            if line == '':
                break
            print "reader read: ", line.rstrip()
            self.target_queue.put(line)

if __name__ == "__main__":

    cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
    proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    source_queue = Queue.Queue()
    target_queue = Queue.Queue()

    writer = WriteThread(proc.stdin, source_queue)
    writer.setDaemon(True)
    writer.start()

    reader = ReadThread(proc.stdout, target_queue)
    reader.setDaemon(True)
    reader.start()

    # populate queue
    for i in range(10):
        source_queue.put("string %s\n" %i)
    source_queue.put("")

    print "source_queue empty: ", source_queue.empty()
    print "target_queue empty: ", target_queue.empty()

    import time
    time.sleep(2) # expect some output from reader thread

    source_queue.join() # wait until all items in source_queue are processed
    proc.stdin.close()  # should end the subprocess
    proc.wait()

这给出了以下输出(python2.7):

writing to process:  'string 0\n'
writing to process:  'string 1\n'
writing to process:  'string 2\n'
writing to process:  'string 3\n'
writing to process:  'string 4\n'
writing to process:  'string 5\n'
writing to process:  'string 6\n'
source_queue empty: writing to process:  'string 7\n'
writing to process:  'string 8\n'
writing to process:  'string 9\n'
writing to process:  ''
 True
target_queue empty:  True

然后没有2秒......

reader read:  rgsn0i t
reader read:  nrg1sti
reader read:  tis n2rg
reader read:  snt gri3
reader read:  nsri4 tg
reader read:  stir5 gn
reader read:   gnri6ts
reader read:   ngrits7
reader read:  8nsrt ig
reader read:  sg9 nitr

预期开头的交错 . 但是,子进程的输出直到子进程结束后才会出现 . 随着更多的管道输入我得到一些输出,因此我假设stdout管道中的缓存问题 . 根据此处发布的其他问题,flushing stdout(在子进程中)应该可以工作,至少在Linux上是这样 .

1 回答

  • 8

    你的问题与 subprocess 模块无关,或线程(因为它们是有问题的),甚至是混合子进程和线程(一个非常糟糕的想法,甚至比使用线程开始更糟糕,除非你可以获得的子进程模块)来自code.google.com/p/python-subprocess32)或从多个线程访问相同的东西(如 print 语句那样) .

    会发生什么是你的 shuffleline.py 程序缓冲 . 不是输出,而是输入 . 虽然不是很明显,但是当你遍历文件对象时,Python会读取块,通常是8k字节 . 由于 sys.stdin 是一个文件对象,因此 for 循环将缓冲到EOF或完整块:

    for line in sys.stdin:
        line = line.strip()
        ....
    

    如果你不想这样做,可以使用while循环来调用 sys.stdin.readline() (为EOF返回 '' ):

    while True:
        line = sys.stdin.readline()
        if not line:
            break
        line = line.strip()
        ...
    

    或使用 iter() 的双参数形式,它创建一个迭代器,调用第一个参数,直到返回第二个参数("sentinel"):

    for line in iter(sys.stdin.readline, ''):
        line = line.strip()
        ...
    

    如果我不建议不为此使用线程,而是在子进程的管道上使用非阻塞I / O,或者甚至像 twisted.reactor.spawnProcess 那样有很多方法可以将进程和其他东西连接在一起作为消费者和 生产环境 者,我也会失职 .

相关问题