有没有人有关于测试Flask Content Streaming资源的经验/指针?我的应用程序使用Redis Pub / Sub,当在通道中接收消息时,它会流式传输文本'data: '实现在Flask docs:docs之后,我的单元测试也在Flask docs之后完成 . 发往Redis pub / sub的消息由第二个资源(POST)发送 .

我正在创建一个线程来监听流,同时我在主应用程序上发布到发布到Redis的资源 .

虽然连接断言通过(我收到OK 200和mimetype'text / event-stream'),但数据对象是空的 .

我的单元测试是这样的:

def test_04_receiving_events(self):
    headers = [('Content-Type', 'application/json')]
    data = json.dumps({"value": 42})
    headers.append(('Content-Length', len(data)))

    def get_stream():
        rv_stream = self.app.get('stream/data')
        rv_stream_object = json.loads(rv_stream.data) #error (empty)
        self.assertEqual(rv_stream.status_code, 200)
        self.assertEqual(rv_stream.mimetype, 'text/event-stream')
        self.assertEqual(rv_stream_object, "data: {'value': 42}")
        t.stop()

    threads = []
    t = Thread(target=get_stream)
    threads.append(t)
    t.start()
    time.sleep(1)
    rv_post = self.app.post('/sensor', headers=headers, data=data)

    threads_done = False
    while not threads_done:
        threads_done = True
        for t in threads:
            if t.is_alive():
                threads_done = False
                time.sleep(1)

应用资源是:

@app.route('/stream/data')
def stream():
    def generate():
        pubsub = db.pubsub()
        pubsub.subscribe('interesting')

        for event in pubsub.listen():

            if event['type'] == 'message':
                yield 'data: {"value":%s}\n\n' % event['data']
    return Response(stream_with_context(generate()),
                direct_passthrough=True,
                mimetype='text/event-stream')

有关如何在Flask中测试内容流的任何指针或示例?除非我在搜索错误的关键字,否则谷歌似乎对此没什么帮助 .

提前致谢 .