我正在通过mqtt从我的传感器节点收集遥测数据到我的覆盆子pi . 有两个MQTT客户端连接到本地主机上的代理和我的VPS上的代理 . pi从传感器节点收集数据并将其发送到我的VPS服务器 .

如果pi离线,它将收集本地数据库中的所有值,然后,在重新连接之后,它应该将消息逐个发送到VPS .

通常情况下,消息会逐个发布 . 但是当它断开然后重新连接时,它会创建一个线程“redundancywork()”,并且通过代码它应该逐个发布消息 .

但这就是最终发生的事情:

2018-02-28 00:29:00.304932,4.0,3.0,2.0,1.0,0.0
2018-02-28 00:29:01.074249,4.0,3.0,2.0,1.0,0.0
2018-02-28 00:29:01.496664,4.0,3.0,2.0,1.0,0.0
2018-02-28 00:29:01.912588,4.0,3.0,2.0,1.0,0.0
2018-02-28 00:29:02.264558,4.0,3.0,2.0,1.0,0.0
Message Published
Message Published
Message Published
Message Published
Message Published

它打印出行但仅在循环结束后才发布,即使发布函数在循环内 . 之后,它会逐一发布 . 断开连接时消息不会排队,因为我这样设置 . 这是我的代码:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from model import SN1, Base
from datetime import datetime
import paho.mqtt.client as mqtt
import threading
import time
import os


#Topics to Subscribe to as (Topic Name, QOS Level)
topics = [("SN1",2)]
broker = "localhost"
portno = 1883


#Disconnect CHecker
DISCONNECT = "NO"


##Redundancy Stuff
redundancydb_url = 'sqlite:///redundancy.db'
redunandcy_engine = create_engine(redundancydb_url)
Base.metadata.bind = redunandcy_engine
if os.path.isfile("redundancy.db") is False:
    Base.metadata.create_all(redunandcy_engine)
redunandcysession = sessionmaker(bind = redunandcy_engine)
redundancydb = redunandcysession()


'''
Helper Methods
'''

#ToSubscribe
def on_connect(client, userdata, flags, rc):
    global DISCONNECT
    print("Connected with result code "+str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(topics)

    #CHECK IF VPS CLIENT
    if client._client_id == "VPS Client" and DISCONNECT == "YES":
        DISCONNECT = "NO"
        time.sleep(5)
        print("Redundancy Work Begins")
        thread = threading.Thread(target = redundancywork())
        thread.start()

#Message Processing
def on_message(client, userdata, msg):
    decoded = msg.payload.decode("utf-8")
    nowtime = datetime.now()
    message = decoded.split(",")
    print(message[0])
    global DISCONNECT
    global vpsclient


    #MESSAGE SENDING
    if message[0] == "SN1":
        if DISCONNECT == "NO":
            message_payload = "{},{},{},{},{},{}".format(nowtime, message[1], message[2], message[3], message[4], message[5])
            print(message_payload)
            vpsclient.publish("SN1", payload = message_payload, qos = 1)
        if DISCONNECT == "YES":
            newvalues = SN1(nowtime, message[1], message[2], message[3], message[4], message[5])
            redundancydb.add(newvalues)
            redundancydb.commit()
            print("Inserted to Redundancy DB")

def on_publish(client, userdata,mid):
    print("Message Published")

#Disconnection Handler
def on_disconnect(client, userdata, rc):
    print("Client {} has disconnected From Broker".format(client._client_id))
    global DISCONNECT
    DISCONNECT = "YES"

def redundancywork():
    #Get First Value of DB
    global vpsclient

    rdbsession = sessionmaker(bind = redunandcy_engine)
    rdb = rdbsession()
    first = rdb.query(SN1).first()
    print(first)

    #Check if length of firstrow is not 0
    while first is not None:
        message_payload = "{},{},{},{},{},{}".format(first.date, first.airtemp, first.humidity, first.watertemp, first.light, first.ph)
        print(message_payload)
        vpsclient.publish("SN1", payload = message_payload, qos = 1)
        #DELETE ROW
        rdb.delete(first)
        rdb.commit()

        #FETCH ROW
        first = rdb.query(SN1).first()
        time.sleep(2)

'''
Main Function
'''


if __name__ == "__main__":

    #Local Broker Client
    client = mqtt.Client()
    client._client_id = "Local Client"
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.connect(broker, portno, keepalive = 20)

    #Vps Broker Client
    vpsbroker = "*"
    vpsportno = 1883
    global vpsclient
    vpsclient = mqtt.Client()
    vpsclient._client_id = "VPS Client"
    vpsclient.on_connect = on_connect
    vpsclient.on_publish = on_publish
    vpsclient.on_disconnect = on_disconnect
    vpsclient.max_inflight_messages_set(20)
    vpsclient.connect(vpsbroker, vpsportno, keepalive = 20)


while True:
    try:
        client.loop_start()
        vpsclient.loop_start()
    except Exception:
        continue