我有一个使用pika和rabbitmq在python中编写的简单消费者 .

消费者连接到rabbitmq并侦听队列 . 当消息到达时,它会转换消息并将其发布到另一个队列中 .

在这里展示:https://bitbucket.org/snippets/fbanke/8e7zbX

我想制作测试用例来测试使用者和队列之间的交互 . 例如,我想确保在消息消费时调用“basic_ack”函数让rabbitmq知道消息已被处理 .

另一个测试用例是,如果连接断开,消费者会重新连接到rabbitmq .

等等 . 我想测试使用者和队列之间的交互,而不是消费者中的实际业务逻辑 .

如果我嘲笑pika对象,它需要我100%理解API的行为方式,任何对API的误解都会导致错误的代码 . 通过测试的代码,但实际上不起作用 .

我宁愿使用实时队列测试消费者,并从测试中操作它以查看消费者是否按预期行事 .

例如1.设置队列2.启动使用者3.将有效消息发布到队列4.断言消息已由工作者使用

要么

  • 设置队列

  • 启动消费者

  • 杀死队列

  • 断言 Worker 按预期终止

有没有关于如何做到这一点的最佳实践?我可以在数据库中找到许多类似测试的例子,但不能找到队列 . 似乎我需要在一个单独的线程中启动使用者并使用它,但似乎没有基础设施来支持它 .