首页 文章

如何订阅MQTT主题并在Eclipse(Java)上打印收到的消息

提问于
浏览
2

我有一个带恒温器的微控制器,它使用MQTT协议通过Raspberry Pi将数据发送到我的计算机 . Kura已经安装并在Raspberry上工作 .

我在Putty上接收数据没有问题,但现在我需要在Eclipse上接收它,所以我可以开发一个程序 .

我设法使用Paho使用以下代码通过eclipse发布主题(这是对其他主题的改编Subscribe and Read MQTT Message Using PAHO):

package publish;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PublishSemInterface {
MqttClient client;

public PublishSemInterface() {}

public static void main(String[] args) {
    new PublishSemInterface().doDemo();
}

public void doDemo() {
    try {
        client = new MqttClient("tcp://192.168.0.39:1883", "user");
        client.connect();
        MqttMessage message = new MqttMessage();
        message.setPayload("Published message".getBytes());
        client.publish("sensor/temp/out", message);
        client.disconnect();
    } catch (MqttException e) {
        e.printStackTrace();
    }
}
}

但是订阅是一种痛苦 . 我尝试使用上面提到的主题的答案,实现MqttCallback接口:

public class PublishSemInterface implements MqttCallback

连接到客户端和所需的接口方法后添加setCallback(我只需要messageArrived):

client.setCallback(this);

@Override
public void connectionLost(Throwable cause) {}

@Override
public void messageArrived(String topic, MqttMessage message)
    throws Exception {
System.out.println(message);   
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {}

但它没有用 . 我也尝试使用以下主题的答案:How to read data from MQTT in Eclipse Paho?

public static void main(String[] args) {

    MqttClient client;
    MqttConnectOptions conn;

    try {
        client = new MqttClient("tcp://192.168.0.39:1883", "user");
        client.connect();
        client.setCallback(new MqttCallback() {
            public void connectionLost(Throwable cause) {}

            public void messageArrived(String topic,
                    MqttMessage message)
                            throws Exception {
                System.out.println(message.toString());
            }

            public void deliveryComplete(IMqttDeliveryToken token) {}
        });

        client.subscribe("sensor/temp/in");

    } catch (MqttException e) {
        e.printStackTrace();
    }
}

除了它也没有用 . 在这两种情况下,当我运行代码时,控制台处于活动状态,但是当微控制器发送数据(出现在Putty上)而不是打印时,程序终止 . 看起来好像没有调用messageArrived方法 .

任何人都可以帮我在Eclipse的控制台上订阅和打印吗?

2 回答

  • 2

    如您所见: client.publish("sensor/temp/out", message); ,您的主题是 sensor/temp/out . 因此,您的订阅者应该订阅相同的主题,而不是这一行: client.subscribe("sensor/temp/in"); ,尝试订阅主题: sensor/temp/out .

    另外,我建议您使用其他mqtt选项创建连接 . 像这样的东西:

    MqttClient client = new MqttClient(serverUrl, UUID.randomUUID().toString().replace("-", "")); //clientID needs to be unique and has meaning only for mqtt broker
    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName("username"); //part of the password_file inside mqtt broker
    options.setPassword("password".toCharArray()); //also part of password_file. Username and password might not be needed.
    options.setConnectionTimeout(60);
    options.setKeepAliveInterval(60); //how often to send PINGREQ messages
    options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //newest version
    client.connect(options);
    
  • 0

    我设法让发送的数据出现在Eclipse控制台上 . 似乎ClientId是错误的,但我还根据我在问题上链接的主题的答案添加了一些修改 . 这是代码:

    private Map<String, Object> properties;
    
    public void updated(Map<String, Object> properties) {
      this.properties = properties;
      String broker   = "";
      String clientId = "";
      String topic   = "";
    
      if(properties != null && !properties.isEmpty()) {
    
        broker = (String) properties.get("broker.name");
        clientId = (String) properties.get("clientId.name");
        topic = (String) properties.get("topic.name");
    
        doDemo(broker, clientId, topic);
      }
    }
    
    public void doDemo(String broker, String clientId, String topic) {
      MemoryPersistence persistence = new MemoryPersistence();
    
      try {
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
    
        sampleClient.setCallback(new MqttCallback() {
          public void connectionLost(Throwable cause) {}
    
          public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("Message: " + message.toString());
          }
    
          public void deliveryComplete(IMqttDeliveryToken token) {}
        });
    
        sampleClient.connect(connOpts);
        sampleClient.subscribe(topic);
    
      } catch(MqttException e) {
        e.printStackTrace();
      }
    }
    

相关问题