首页 文章

在这种情况下如何使用龙卷风协同程序?

提问于
浏览
2

我创建了龙卷风服务器,它接受python和matlab代码并执行它 . 这是服务器代码 .

from zmq.eventloop.zmqstream import ZMQStream
from zmq.eventloop import ioloop
ioloop.install()

from functools import partial
from tornado import web, gen, escape
from tornado import options, httpserver
from tornado.concurrent import Future

settings = dict()
settings['autoreload'] = True 
settings['debug'] = True

from jupyter_client import MultiKernelManager

reply_futures = {}
kids = []

class MainHandler(web.RequestHandler):
    def get(self):
        self.write("Hello")

class ExecuteHandler(web.RequestHandler):
    def get(self):
        self.write("approaching execute")

    def post(self):
        data = escape.json_decode(self.request.body)
        print data
        self.application.execute_code(data)

class Application(web.Application):

    def __init__(self):

        handlers = []
        handlers.append((r"/", MainHandler))
        handlers.append((r"/execute", ExecuteHandler))
        web.Application.__init__(self, handlers, **settings)
        self.km = MultiKernelManager()
        self.setup_kernels()

    def setup_kernels(self):

        matlab_kernel_id = self.km.start_kernel(kernel_name="matlab")
        python_kernel_id = self.km.start_kernel(kernel_name="python")

        self.matlab_kernel_id = matlab_kernel_id
        self.python_kernel_id = python_kernel_id

        matkab_kernel_client = self.km.get_kernel(matlab_kernel_id).client()
        matkab_kernel_client.start_channels()

        python_kernel_client = self.km.get_kernel(python_kernel_id).client()
        python_kernel_client.start_channels()

        self.matkab_kernel_client = matkab_kernel_client
        self.python_kernel_client = python_kernel_client

        matlab_iopub_stream = ZMQStream(matkab_kernel_client.iopub_channel.socket)
        matlab_shell_stream = ZMQStream(matkab_kernel_client.shell_channel.socket)

        python_iopub_stream = ZMQStream(python_kernel_client.iopub_channel.socket)
        python_shell_stream = ZMQStream(python_kernel_client.shell_channel.socket)

        matlab_iopub_stream.on_recv_stream(partial(self.reply_callback, matkab_kernel_client.session))
        matlab_shell_stream.on_recv_stream(partial(self.reply_callback, matkab_kernel_client.session))

        python_iopub_stream.on_recv_stream(partial(self.reply_callback, python_kernel_client.session))
        python_shell_stream.on_recv_stream(partial(self.reply_callback, python_kernel_client.session))

    def reply_callback(self, session, stream, msg_list):
        idents, msg_parts = session.feed_identities(msg_list)
        reply = session.deserialize(msg_parts)

        if "stream" == reply["msg_type"]:
            print reply["content"]["text"]
        parent_id = reply['parent_header'].get('msg_id')
        reply_future = reply_futures.get(parent_id)
        if reply_future:
            reply_future.set_result(reply)

    def execute_code(self, data):

        matlab_code = data['matlab_code']
        python_code = data['python_code']

        self.execute_matlab_then_python(matlab_code, python_code)

    @gen.coroutine
    def execute_matlab_then_python(self, matlab_code, python_code):

        print "executing matlab code"
        parent_id1 = self.matkab_kernel_client.execute(matlab_code)
        f1 = reply_futures[parent_id1] = Future()
        yield f1    

        print "executing python code"
        parent_id2 = self.python_kernel_client.execute(python_code)
        f2 = reply_futures[parent_id2] = Future()
        yield f2

    def shutdown_kernels(self):
        self.km.get_kernel(self.matlab_kernel_id).cleanup_connection_file()
        self.km.shutdown_kernel(self.matlab_kernel_id)

        self.km.get_kernel(self.python_kernel_id).cleanup_connection_file()
        self.km.shutdown_kernel(self.python_kernel_id)

if __name__ == '__main__':

    options.parse_command_line()
    app = Application()
    server = httpserver.HTTPServer(app)
    server.listen(8888)

    try:
        ioloop.IOLoop.current().start()
    except KeyboardInterrupt:
        print 'going down'
    finally:    
        app.shutdown_kernels()

我用来访问的客户端代码就在这里

import json
import requests

matlab_code = "a=magic(3);disp(a)"
python_code = "print 'Hello World!!'"

data = {}
data['matlab_code'] = matlab_code
data['python_code'] = python_code

r = requests.post('http://0.0.0.0:8888/execute', json.dumps(data))

我关心的是维持执行的顺序,这样只有在matlab完成后才能执行python代码 . 我正在使用jupyter_client来执行matlab / python代码 . 我在这里使用python27 . 问题是,当我提交代码时,它会抛出 TypeError: 'NoneType' object is not iterable . 这是一个堆栈跟踪它 .

[E 160809 15:17:51 ioloop:633] Exception in callback None
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 887, in start
        handler_func(fd_obj, events)
      File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 275, in null_wrapper
        return fn(*args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 440, in _handle_events
        self._handle_recv()
      File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 472, in _handle_recv
        self._run_callback(callback, msg)
      File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 414, in _run_callback
        callback(*args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 275, in null_wrapper
        return fn(*args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 191, in 
        self.on_recv(lambda msg: callback(self, msg), copy=copy)
      File "tornado_test.py", line 86, in reply_callback
        reply_future.set_result(reply)
      File "/usr/local/lib/python2.7/dist-packages/tornado/concurrent.py", line 276, in set_result
        self._set_done()
      File "/usr/local/lib/python2.7/dist-packages/tornado/concurrent.py", line 320, in _set_done
        for cb in self._callbacks:
    TypeError: 'NoneType' object is not iterable

我不明白这里有什么问题?

1 回答

  • 2

    不幸的是,神秘的错误消息"'NoneType' object is not iterable"表示您在同一个 Future 对象上多次调用 set_result . 我不太了解zmq或jupyter内核接口,但我的猜测是两个不同内核的 execute 方法返回的ID重叠,或者每个执行调用得到多个响应(来自两个不同的流?) .

相关问题