首页 文章

同时读取子进程stdout和stderr

提问于
浏览
2

我试图在Python中运行一个冗长的命令,输出到stdout和stderr . 我想轮询子进程并将输出写入单独的文件 .

基于这个答案,我尝试了以下内容Non-blocking read on a subprocess.PIPE in python

import subprocess

from Queue import Queue, Empty
from threading import Thread

def send_cmd(cmd, shell=False):
    """
    Send cmd to the shell
    """
    if not isinstance(cmd, list): cmd = shlex.split(cmd)

    params = {'args'   : cmd,
              'stdout' : subprocess.PIPE,
              'stderr' : subprocess.PIPE,
              'shell'  : shell}

    proc = subprocess.Popen(**params)

    return proc

def monitor_command(process, stdout_log=os.devnull, stderr_log=os.devnull):
    """
    Monitor the process that is running, and log it if desired
    """
    def enqueue_output(out, queue):
        for line in iter(out.readline, b''):
            queue.put(line)

    def setup_process(log_name, proc):
        FID = open(log_name, 'w')
        queue = Queue()
        thread = Thread(target=enqueue_output, args=(proc, queue))
        thread.daemon = True # Thread dies with program
        thread.start()

        return (queue, FID)

    def check_queues(queue_list, errors):
        for queue, FID in queue_list:
            try:
                line = queue.get_nowait()
                if 'error' in line.lower() or 'failed' in line.lower():
                    errors.append(line)
            except Empty:
                pass
            else:
                FID.write(line)

    errors = []
    queue_list = []

    for log, proc in [(stdout_log, process.stdout), (stderr_log, process.stderr)]:
        queue_list.append(setup_process(log, proc)

    while process.poll() is None:
        check_queues(queue_list, errors)

    while not queue_list[0][0].empty() or queue_list[1][0].empty():
        check_queues(queue_list, errors)

    for queue, FID in queue_list:
        FID.close()

return errors

process = send_cmd('long_program.exe')
errors  = monitor_command(process, stdout_log='stdout.log', stderr_log='stderr.log')

但是stdout的输出文件是空的,stderr的输出文件只有几行,而两者都应该很大 .

我错过了什么?

2 回答

  • 0

    我做过一次..这是我写的一些旧代码

    class Process_Communicator():
    
        def join(self):
            self.te.join()
            self.to.join()
            self.running = False
            self.aggregator.join()
            self.ti.join()
    
        def enqueue_in(self):
            while self.running and self.p.stdin is not None:
                while not self.stdin_queue.empty():
                    s = self.stdin_queue.get()
                    self.p.stdin.write(str(s) + '\n\r')
                pass
    
        def enqueue_output(self):
            if not self.p.stdout or self.p.stdout.closed:
                return
            out = self.p.stdout
            for line in iter(out.readline, b''):
                self.qo.put(line)
            #    out.flush()
    
        def enqueue_err(self):
            if not self.p.stderr or self.p.stderr.closed:
                return
            err = self.p.stderr
            for line in iter(err.readline, b''):
                self.qe.put(line)
    
        def aggregate(self):
            while (self.running):
                self.update()
            self.update()
    
        def update(self):
            line = ""
            try:
                while self.qe.not_empty:
                    line = self.qe.get_nowait()  # or q.get(timeout=.1)
                    self.unbblocked_err += line
            except Queue.Empty:
                pass
    
            line = ""
            try:
                while self.qo.not_empty:
                    line = self.qo.get_nowait()  # or q.get(timeout=.1)
                    self.unbblocked_out += line
            except Queue.Empty:
                pass
    
            while not self.stdin_queue.empty():
                    s = self.stdin_queue.get()
                    self.p.stdin.write(str(s))
    
        def get_stdout(self, clear=True):
            ret = self.unbblocked_out
            if clear:
                self.unbblocked_out = ""
            return ret
    
        def has_stdout(self):
            ret = self.get_stdout(False)
            if ret == '':
                return None
            else:
                return ret
    
        def get_stderr(self, clear=True):
            ret = self.unbblocked_out
            if clear:
                self.unbblocked_out = ""
            return ret
    
        def has_stderr(self):
            ret = self.get_stdout(False)
            if ret == '':
                return None
            else:
                return ret
    
        def __init__(self, subp):
            '''This is a simple class that collects and aggregates the
            output from a subprocess so that you can more reliably use
            the class without having to block for subprocess.communicate.'''
            self.p = subp
            self.unbblocked_out = ""
            self.unbblocked_err = ""
            self.running = True
            self.qo = Queue.Queue()
            self.to = threading.Thread(name="out_read",
                                        target=self.enqueue_output,
                                        args=())
            self.to.daemon = True  # thread dies with the program
            self.to.start()
    
            self.qe = Queue.Queue()
            self.te = threading.Thread(name="err_read",
                                       target=self.enqueue_err,
                                       args=())
            self.te.daemon = True  # thread dies with the program
            self.te.start()
    
            self.stdin_queue = Queue.Queue()
            self.aggregator = threading.Thread(name="aggregate",
                                               target=self.aggregate,
                                               args=())
            self.aggregator.daemon = True  # thread dies with the program
            self.aggregator.start()
            pass
    

    您可能不需要整个示例,但可以随意剪切复制并粘贴您需要的内容 . 展示我如何进行线程化也很重要 .

  • 0

    代码看起来比任务需要的更复杂 . 我不明白为什么你需要在这里调用 process.poll()queue.get_nowait() . 将子进程'stdout / stderr传递给多个接收器;你可以从teed_call() that accepts arbitrary file-like objects开始:你可以传递日志文件和特殊的文件类对象,它们在 .write() 方法中累积 errors .

    以最小的更改来修复代码;你应该在阅读器线程上调用 .join() (即使 process.poll() 不是 None ,即子进程退出;可能有一些待处理的输出 . 加入阅读器的线程确保读取所有输出) .

相关问题