首页 文章

重复写入stdin并从python中读取进程的stdout

提问于
浏览
8

我有一段fortran代码从STDIN读取一些数字并将结果写入STDOUT . 例如:

do
  read (*,*) x
  y = x*x
  write (*,*) y
enddo

所以我可以从shell启动程序并获得以下输入序列/ outputs

5
25.0
2.5
6.25

现在我需要在python中执行此操作 . 在与subprocess.Popen徒手摔跤并查看本网站上的旧问题之后,我决定使用pexpect.spawn:

import pexpect, os
p = pexpect.spawn('squarer')
p.setecho(False)
p.write("2.5" + os.linesep)
res = p.readline()

它的工作原理 . 问题是,我需要在python和我的fortran程序之间传递 real 数据是一个100,000(或更多)双精度浮点数组 . 如果它们包含在名为 x 的数组中,那么

p.write(' '.join(["%.10f"%k for k in x]) + os.linesep)

从pexpect发出以下错误消息超时:

buffer (last 100 chars):   
before (last 100 chars):   
after: <class 'pexpect.TIMEOUT'>  
match: None  
match_index: None  
exitstatus: None
flag_eof: False
pid: 8574
child_fd: 3
closed: False
timeout: 30
delimiter: <class 'pexpect.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 2000
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1

除非 x 少于303个元素 . 有没有办法将大量数据传入/传出另一个程序的STDIN / STDOUT?

我试过将数据分成更小的块,但后来我失去了 lot 的速度 .

提前致谢 .

5 回答

  • 0

    使用子进程模块找到了一个解决方案,所以我在这里发布它以供参考,如果有人需要做同样的事情 .

    import subprocess as sbp
    
    class ExternalProg:
    
        def __init__(self, arg_list):
            self.opt = sbp.Popen(arg_list, stdin=sbp.PIPE, stdout=sbp.PIPE, shell=True, close_fds=True)
    
        def toString(self,x):
            return ' '.join(["%.12f"%k for k in x])
    
        def toFloat(self,x):
            return float64(x.strip().split())
    
        def sendString(self,string):
            if not string.endswith('\n'):
                string = string + '\n'
            self.opt.stdin.write(string)
    
        def sendArray(self,x):
            self.opt.stdin.write(self.toString(x)+'\n')
    
        def readInt(self):
            return int(self.opt.stdout.readline().strip())
    
        def sendScalar(self,x):
            if type(x) == int:
                self.opt.stdin.write("%i\n"%x)
            elif type(x) == float:
                self.opt.stdin.write("%.12f\n"%x)
    
        def readArray(self):
            return self.toFloat(self.opt.stdout.readline())
    
        def close(self):
            self.opt.kill()
    

    使用名为“optimizer”的外部程序调用该类,如下所示:

    optim = ExternalProg(['./optimizer'])
    optim.sendScalar(500) # send the optimizer the length of the state vector, for example
    optim.sendArray(init_x) # the initial guess for x
    optim.sendArray(init_g) # the initial gradient g
    next_x = optim.readArray() # get the next estimate of x
    next_g = evaluateGradient(next_x) # calculate gradient at next_x from within python
    # repeat until convergence
    

    在fortran方面(程序编译为给出可执行文件'optimizer'),将读入一个500元素的向量:

    read(*,*) input_vector(1:500)
    

    并将被写出来:

    write(*,'(500f18.11)') output_vector(1:500)
    

    就是这样!我用状态向量测试了多达200,000个元素(这是我现在需要的上限) . 希望这能帮助除我以外的其他人 . 这个解决方案适用于ifort和xlf90,但由于某种原因我不理解gfortran .

  • 0

    示例squarer.py程序(它恰好在Python中,使用您的Fortran可执行文件):

    #!/usr/bin/python
    import sys
    data= sys.stdin.readline() # expecting lots of data in one line
    processed_data= data[-2::-1] # reverse without the newline
    sys.stdout.write(processed_data+'\n')
    

    示例target.py程序:

    import thread, Queue
    import subprocess as sbp
    
    class Companion(object):
        "A companion process manager"
        def __init__(self, cmdline):
            "Start the companion process"
            self.companion= sbp.Popen(
                cmdline, shell=False,
                stdin=sbp.PIPE,
                stdout=sbp.PIPE)
            self.putque= Queue.Queue()
            self.getque= Queue.Queue()
            thread.start_new_thread(self._sender, (self.putque,))
            thread.start_new_thread(self._receiver, (self.getque,))
    
        def _sender(self, que):
            "Actually sends the data to the companion process"
            while 1:
                datum= que.get()
                if datum is Ellipsis:
                    break
                self.companion.stdin.write(datum)
                if not datum.endswith('\n'):
                    self.companion.stdin.write('\n')
    
        def _receiver(self, que):
            "Actually receives data from the companion process"
            while 1:
                datum= self.companion.stdout.readline()
                que.put(datum)
    
        def close(self):
            self.putque.put(Ellipsis)
    
        def send(self, data):
            "Schedule a long line to be sent to the companion process"
            self.putque.put(data)
    
        def recv(self):
            "Get a long line of output from the companion process"
            return self.getque.get()
    
    def main():
        my_data= '12345678 ' * 5000
        my_companion= Companion(("/usr/bin/python", "squarer.py"))
    
        my_companion.send(my_data)
        my_answer= my_companion.recv()
        print my_answer[:20] # don't print the long stuff
        # rinse, repeat
    
        my_companion.close()
    
    if __name__ == "__main__":
        main()
    

    main 函数包含您将使用的代码:设置 Companion 对象, companion.send 一长行数据, companion.recv 一行 . 根据需要重复 .

  • 1

    我认为你只在这里添加一个换行符:

    p.write(' '.join(["%.10f"%k for k in x]) + os.linesep)
    

    而不是每行添加一个 .

  • 5

    看起来你超时(默认超时,我相信,30秒),因为准备,发送,接收和处理大量数据需要花费大量时间 . 对于the docstimeout=expect 方法的可选命名参数,您可以在初始化程序中设置默认超时,这可以通过对源进行粗略查找(或者,最坏的情况,通过黑客攻击来创建) ) .

    如果Fortran程序一次读取并保存(比方说)100个项目,并且有提示,则同步将变得非常容易 . 您可以为此目的修改Fortran代码,还是宁愿选择无证/黑客方法?

  • 0

    这是一个巨大的简化:将Python分解为两件事 .

    python source.py | squarer | python sink.py
    

    squarer 应用程序是您的Fortran代码 . 从stdin读取,写入stdout .

    你的 source.py 就是你的Python

    import sys
    sys.stdout.write(' '.join(["%.10f"%k for k in x]) + os.linesep)
    

    或者,也许更简单一些,即

    from __future__ import print_function
    print( ' '.join(["{0:.10f}".format(k) for k in x]) )
    

    你的 sink.py 是这样的 .

    import fileinput
    for line in fileinput.input():
        # process the line
    

    分离源,平方和接收器可以获得3个独立的进程(而不是2个),并将使用更多内核 . 更多核心==更多并发==更有趣 .

相关问题