版本:

$ python --version
Python 3.5.2

$ ll /somepath/python3.5/site-packages/ | grep zmq
drwxrwxr-x   2 xxx xxx   4096 Mar 11  2017 pyzmq-16.0.2.dist-info
drwxrwxr-x  15 xxx xxx   4096 Mar 11  2017 zmq

我正在尝试封装 pyzmq,以便可以方便地发送带有标题(header)的 Python 对象,并按标题进行过滤。据我了解,pyzmq 可以发送 Python 对象,但它的过滤只支持按字节前缀匹配。问题是,当我测试我的发布/订阅对时,订阅者有时会漏掉最开始的几条消息。我不确定这是怎么发生的。

在以下代码中,我有两个发布者和一个订阅者。每个发布者发送一个标题(用于区分两个发布者的字符串)和数字 0~99。订阅者期望从两个发布者各收到 0~99,收齐后正常终止。我认为我已经确保在让发布者开始发送数字之前订阅者已初始化完毕。我已经(单独)测试过:订阅者可以在发布者上线之前初始化并连接到端点,并在发布者上线后收到它的消息。当订阅者没有从两个发布者都收到 0~99 时,测试失败。大多数情况下测试都能成功运行,但在某些情况下(<10%),订阅者从其中一个发布者收到的第一个数字不是 0。从日志中可以看到,在这种情况下,出问题的发布者通常在另一个发布者之前、并且在订阅者开始报告收到的数字之前,就已经开始发送一长串消息。我猜这可能与消息队列长度有关,所以我显式地把套接字缓冲区大小设置为一个我认为足够大的值,但测试有时仍然会失败。

主包装 zmq.py

import zmq
import json

from .logger import Logger


class ZmqPublisher:
    """Thin wrapper around a ZeroMQ PUB socket that publishes titled objects.

    Every message is the UTF-8 encoded JSON array ``[title, obj]``, so the
    title is located at the very start of the payload.
    """

    def __init__(self, port: int, ip: str = None, buffer_size: int = None, logger: Logger = None):
        """Create a PUB socket and bind it to ``tcp://<ip>:<port>``.

        Args:
            port: TCP port to bind.
            ip: Interface to bind; ``None`` means ``'*'``.
            buffer_size: Value for the ``zmq.SNDBUF`` socket option; ``None``
                leaves the option untouched.
            logger: Optional logger used to report the bound address.
        """
        if ip is None:
            ip = '*'

        context = zmq.Context()
        self.socket = context.socket(zmq.PUB)

        # SNDBUF only applies to binds/connects made after it is set, so it
        # has to be configured before bind() to have any effect.
        if buffer_size is not None:
            self.socket.setsockopt(zmq.SNDBUF, buffer_size)

        address = "tcp://%s:%s" % (ip, port)
        if logger is not None:
            logger.info('Publishing to: %s', address)
        self.socket.bind(address)

    def send(self, title: str, obj, logger: Logger = None):
        """Publish ``obj`` under ``title``.

        Args:
            title: Message title; it becomes the first element of the JSON array.
            obj: JSON-serialisable payload.
            logger: Optional logger for debug output.

        Returns:
            Whatever the underlying ``socket.send`` call returns.
        """
        if logger is not None:
            logger.debug('Sending: %s %s', title, obj)

        payload = json.dumps([title, obj]).encode('utf-8')
        if logger is not None:
            logger.debug('Sending bytes: %s', payload)
        return self.socket.send(payload)


class ZmqSubscriber:
    """Thin wrapper around a ZeroMQ SUB socket that receives titled objects.

    Messages are expected to be UTF-8 encoded JSON arrays ``[title, obj]``.
    Filtering is done on the serialised title: ZeroMQ subscriptions match a
    byte prefix of the message, so each filter is turned into the bytes that
    a message whose title starts with that filter begins with.

    Note: ``connect`` is asynchronous, so a publisher may drop messages it
    sends before this socket's subscriptions have reached it.
    """

    def __init__(self, ports, ip: str = None, filters: list = None, buffer_size: int = None, logger: Logger = None):
        """Create a SUB socket, connect it to every port and install the filters.

        Args:
            ports: A single port or a list/tuple of ports to connect to.
            ip: Host to connect to; ``None`` means ``'localhost'``.
            filters: Title prefixes to subscribe to; ``None`` subscribes to
                every title (a single empty prefix).
            buffer_size: Value for the ``zmq.RCVBUF`` socket option; ``None``
                leaves the option untouched.
            logger: Optional logger for connection and filter details.
        """
        if not isinstance(ports, (list, tuple)):
            ports = [ports]

        if ip is None:
            ip = 'localhost'

        if filters is None:
            filters = ['']

        context = zmq.Context()
        self.socket = context.socket(zmq.SUB)

        # RCVBUF only applies to connects made after it is set, so it has to
        # be configured before connect() to have any effect.
        if buffer_size is not None:
            self.socket.setsockopt(zmq.RCVBUF, buffer_size)

        for p in ports:
            address = 'tcp://%s:%s' % (ip, p)
            if logger is not None:
                logger.info('Subscribing to: %s', address)
            self.socket.connect(address)

        for f in filters:
            # json.dumps([f]) yields '["<f>"]'; dropping the trailing '"]'
            # leaves '["<f>', the prefix shared by all titles starting with f.
            prefix = json.dumps([f])[:-2].encode('utf-8')
            if logger is not None:
                logger.info('Filtering by: %s', f)
                logger.debug('FILTER %s', prefix)
            self.socket.setsockopt(zmq.SUBSCRIBE, prefix)

    def recv(self, logger: Logger = None):
        """Block until a message arrives and return it as ``(title, obj)``.

        Args:
            logger: Optional logger for debug output.

        Returns:
            A ``(title, obj)`` tuple decoded from the JSON payload.
        """
        payload = self.socket.recv()
        [title, obj] = json.loads(payload.decode('utf-8'))

        if logger is not None:
            logger.debug('Receiving: %s %s', title, obj)

        return title, obj

测试文件 test_zmq.py

import time
import pytest
import random
import threading
import sys

from util.zmq import *
from util.logger import L, V, D
from util.guice import Guice
from util.mockable import get_now

# Logger used directly by the test threads below.
logger = D

# NOTE(review): presumably supplies `logger` as the default `logger` argument of
# the ZmqPublisher / ZmqSubscriber methods — confirm against util.guice.
Guice().provide(logger=logger).inject(ZmqPublisher, ZmqSubscriber)


def test_zmq():
    """Check that one subscriber receives 0..99, in order, from two publishers.

    Publishers 'A' and 'B' each bind their own port and wait for the `start`
    event before sending. The subscriber connects to both ports, sets `start`
    and verifies that the numbers of each title arrive consecutively from 0.
    """
    # A PUB socket drops messages for which it has no subscription yet, and
    # connect()/SUBSCRIBE complete asynchronously. Give them time to reach
    # both publishers before releasing the senders (mitigation, not a guarantee).
    settle_seconds = 0.2
    timeout_seconds = 1

    start = threading.Event()
    err_a = threading.Event()
    err_b = threading.Event()
    err_c = threading.Event()

    def publisher(title, port, err):
        try:
            pub = ZmqPublisher(port, buffer_size=100000)
            start.wait()

            for i in range(100):
                pub.send(title, i)
        except Exception as e:
            logger.critical('%s was raised during test from publisher %s: %s', type(e).__name__, title, e)
            err.set()
        finally:
            logger.info('Publisher %s terminating', title)

    def subscriber(ports, err):
        try:
            sub = ZmqSubscriber(ports, buffer_size=100000)
            time.sleep(settle_seconds)
            start.set()

            expected_a = 0
            expected_b = 0
            while expected_a != 100 or expected_b != 100:
                title, data = sub.recv()

                if title == 'A':
                    assert data == expected_a
                    expected_a = data + 1
                elif title == 'B':
                    assert data == expected_b
                    expected_b = data + 1
                else:
                    raise RuntimeError('Unexpected message title received: %s' % title)
        except Exception as e:
            logger.critical('%s was raised during test from subscriber: %s', type(e).__name__, e)
            err.set()
        finally:
            logger.info('Subscriber terminating')

    # Daemon threads: a subscriber stuck in recv() must not keep the test
    # process alive after the assertions below have failed.
    ta = threading.Thread(target=publisher, args=('A', 5556, err_a), daemon=True)
    tb = threading.Thread(target=publisher, args=('B', 5557, err_b), daemon=True)
    tc = threading.Thread(target=subscriber, args=([5556, 5557], err_c), daemon=True)

    ta.start()
    tb.start()
    tc.start()

    now = get_now().timestamp()
    while (ta.is_alive() or tb.is_alive() or tc.is_alive()) and get_now().timestamp() - now < timeout_seconds:
        time.sleep(0.01)

    # The three threads should finish within the timeout above
    assert not ta.is_alive()
    assert not tb.is_alive()
    assert not tc.is_alive()
    assert not err_a.is_set()
    assert not err_b.is_set()
    assert not err_c.is_set()

一个失败的例子:

$ pytest tests/test_zmq.py -s
=============================================================================================== test session starts ===============================================================================================
platform linux -- Python 3.5.2, pytest-3.1.3, py-1.4.34, pluggy-0.4.0 -- /home/xxx/xxx/bin/python3
cachedir: .cache
rootdir: /home/xxx/util, inifile:
collected 1 item s

tests/test_zmq.py::test_zmq 00:00:00,921  I  Publishing to: tcp://*:5556
00:00:00,922  I  Publishing to: tcp://*:5557
00:00:00,923  I  Subscribing to: tcp://localhost:5556
00:00:00,923  I  Subscribing to: tcp://localhost:5557
00:00:00,923  I  Filtering by:
00:00:00,923  D  FILTER b'["'
00:00:00,924  D  Sending: A 0
00:00:00,924  D  Sending bytes: b'["A", 0]'
00:00:00,924  D  Sending: B 0
00:00:00,924  D  Sending bytes: b'["B", 0]'
00:00:00,924  D  Sending: B 1
00:00:00,924  D  Sending bytes: b'["B", 1]'
00:00:00,924  D  Sending: B 2
00:00:00,925  D  Sending: A 1
00:00:00,925  D  Sending bytes: b'["B", 2]'
00:00:00,925  D  Sending bytes: b'["A", 1]'
00:00:00,925  D  Sending: B 3
00:00:00,925  D  Sending: A 2
00:00:00,925  D  Receiving: A 1
00:00:00,925  D  Sending bytes: b'["B", 3]'
00:00:00,926  D  Sending: B 4
00:00:00,926  D  Sending bytes: b'["B", 4]'
00:00:00,926  D  Sending: B 5
00:00:00,926  D  Sending bytes: b'["B", 5]'
00:00:00,927  D  Sending: B 6
00:00:00,926  D  Sending bytes: b'["A", 2]'
00:00:00,927  D  Sending bytes: b'["B", 6]'
00:00:00,927  D  Sending: B 7
00:00:00,927  D  Sending bytes: b'["B", 7]'
00:00:00,927  D  Sending: B 8
00:00:00,927  D  Sending bytes: b'["B", 8]'
00:00:00,928  D  Sending: B 9
00:00:00,928  D  Sending bytes: b'["B", 9]'
00:00:00,928  D  Sending: B 10
00:00:00,928  D  Sending bytes: b'["B", 10]'
00:00:00,928  D  Sending: B 11
00:00:00,928  D  Sending bytes: b'["B", 11]'
00:00:00,928  D  Sending: B 12
00:00:00,929  D  Sending bytes: b'["B", 12]'
00:00:00,929  D  Sending: B 13
00:00:00,929  D  Sending bytes: b'["B", 13]'
00:00:00,929  D  Sending: B 14
00:00:00,929  D  Sending bytes: b'["B", 14]'
00:00:00,929  D  Sending: B 15
00:00:00,929  D  Sending bytes: b'["B", 15]'
00:00:00,929  D  Sending: B 16
00:00:00,930  D  Sending bytes: b'["B", 16]'
00:00:00,930  D  Sending: B 17
00:00:00,930  D  Sending bytes: b'["B", 17]'
00:00:00,930  D  Sending: B 18
00:00:00,930  D  Sending bytes: b'["B", 18]'
00:00:00,930  D  Sending: B 19
00:00:00,930  D  Sending bytes: b'["B", 19]'
00:00:00,931  D  Sending: B 20
00:00:00,931  D  Sending bytes: b'["B", 20]'
00:00:00,931  D  Sending: B 21
00:00:00,931  D  Sending bytes: b'["B", 21]'
00:00:00,931  D  Sending: B 22
00:00:00,931  D  Sending bytes: b'["B", 22]'
00:00:00,932  D  Sending: B 23
00:00:00,932  D  Sending bytes: b'["B", 23]'
00:00:00,927  D  Sending: A 3
00:00:00,932  D  Sending: B 24
00:00:00,932  D  Sending bytes: b'["A", 3]'
00:00:00,932  D  Sending bytes: b'["B", 24]'
00:00:00,933  D  Sending: A 4
00:00:00,933  D  Sending: B 25
00:00:00,933  D  Sending bytes: b'["A", 4]'
00:00:00,933  D  Sending bytes: b'["B", 25]'
00:00:00,933  D  Sending: A 5
00:00:00,934  D  Sending: B 26
00:00:00,934  D  Sending bytes: b'["A", 5]'
00:00:00,934  D  Sending bytes: b'["B", 26]'
00:00:00,934  D  Sending: A 6
00:00:00,935  D  Sending: B 27
00:00:00,935  D  Sending bytes: b'["A", 6]'
00:00:00,935  D  Sending bytes: b'["B", 27]'
00:00:00,935  D  Sending: A 7
00:00:00,935  D  Sending: B 28
00:00:00,936  D  Sending bytes: b'["A", 7]'
00:00:00,936  D  Sending bytes: b'["B", 28]'
00:00:00,936  D  Sending: A 8
00:00:00,936  D  Sending: B 29
00:00:00,936  D  Sending bytes: b'["A", 8]'
00:00:00,937  D  Sending bytes: b'["B", 29]'
00:00:00,937  D  Sending: A 9
00:00:00,937  C  AssertionError was raised during test from subscriber: assert 1 == 0
...
...
00:00:00,979  D  Sending: A 96
00:00:00,979  D  Sending bytes: b'["A", 96]'
00:00:00,979  D  Sending: A 97
00:00:00,979  D  Sending bytes: b'["A", 97]'
00:00:00,979  D  Sending: A 98
00:00:00,979  D  Sending bytes: b'["A", 98]'
00:00:00,979  D  Sending: A 99
00:00:00,980  D  Sending bytes: b'["A", 99]'
00:00:00,980  I  Publisher A terminating
FAILED

============================================================================================= slowest test durations ==============================================================================================
0.06s call     tests/test_zmq.py::test_zmq
0.00s setup    tests/test_zmq.py::test_zmq
0.00s teardown tests/test_zmq.py::test_zmq
==================================================================================================== FAILURES =====================================================================================================
____________________________________________________________________________________________________ test_zmq _____________________________________________________________________________________________________

    def test_zmq():
        start = threading.Event()
        err_a = threading.Event()
        err_b = threading.Event()
        err_c = threading.Event()

        def publisher(title, port, err):
            try:
                pub = ZmqPublisher(port, buffer_size=100000)
                start.wait()

                for i in range(100):
                    pub.send(title, i)
            except Exception as e:
                logger.critical('%s was raised during test from publisher %s: %s', type(e).__name__, title, e)
                err.set()
            finally:
                logger.info('Publisher %s terminating', title)

        def subscriber(ports, err):
            try:
                sub = ZmqSubscriber(ports, buffer_size=100000)
                start.set()

                expected_a = 0
                expected_b = 0
                while expected_a != 100 or expected_b != 100:
                    title, data = sub.recv()

                    if title == 'A':
                        assert data == expected_a
                        expected_a = data + 1
                    elif title == 'B':
                        assert data == expected_b
                        expected_b = data + 1
                    else:
                        raise RuntimeError('Unexpected message title received: %s' % title)
            except Exception as e:
                logger.critical('%s was raised during test from subscriber: %s', type(e).__name__, e)
                err.set()
            finally:
                logger.info('Subscriber terminating')

        ta = threading.Thread(target=publisher, args=('A', 5556, err_a))
        tb = threading.Thread(target=publisher, args=('B', 5557, err_b))
        tc = threading.Thread(target=subscriber, args=([5556, 5557], err_c))

        ta.start()
        tb.start()
        tc.start()

        now = get_now().timestamp()
        while (ta.isAlive() or tb.isAlive() or tc.isAlive()) and get_now().timestamp() - now < 1:
            time.sleep(0.01)

        # The three threads should finish within 0.1 second
        assert not ta.isAlive()
        assert not tb.isAlive()
        assert not tc.isAlive()
        assert not err_a.is_set()
        assert not err_b.is_set()
>       assert not err_c.is_set()
E       assert not True
E        +  where True = <bound method Event.is_set of <threading.Event object at 0x7f49de7c8080>>()
E        +    where <bound method Event.is_set of <threading.Event object at 0x7f49de7c8080>> = <threading.Event object at 0x7f49de7c8080>.is_set

tests/test_zmq.py:79: AssertionError
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
============================================================================================ 1 failed in 0.58 seconds =============================================================================================