首页 文章

如何将springxd源mqtt注册到mqtt broker

提问于
浏览
0

我用spring-Xd创建了一些流,例如 -

stream create mqtttestfile --definition "mqtt --url='tcp://localhost:1883' --topics='helloTopic' | file" --deploy

创建并部署了新的流'mqtttestfile' . 我还检查了localhost:9393 / admin-ui,成功创建并部署了流 .

我的MQTT代理正在localhost:1883上运行 . 但是当我检查/ tmp / xd /输出文件目录时,mqtttestfile.out文件丢失了 .

我需要澄清我的假设的以下几点: -

  • 我认为MQTT客户端已经在spring-xd源mqtt模块中配置 . 因此,当我们创建流时,它将自动订阅代理上的特定主题 .

  • 我也试过运行两个单独的python脚本一个用于订阅,另一个用于发布者在单独的终端上,它运行正常 . 所以mqtt经纪人没问题 .

这是我得到的spring-xd控制台的日志:

istener - 在15000毫秒内将部署安排到新容器

2017-03-04T12:07:51 0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 路径缓存事件:path = / deployments / modules / allocated / d634d310-12b4-4a83-baea-c1c98dfb7bba / mqtttestfile.sink . file.1,type = CHILD_ADDED

2017-03-04T12:07:51 0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 为流'mqtttestfile'部署模块'file'2017-03-04T12:07:52 0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache -0 container.DeploymentListener - 部署模块[ModuleDescriptor @ 7e658391 moduleName ='file',moduleLabel ='file',group ='mqtttestfile',sourceChannelName = [null],sinkChannelName = [null],index = 1,type = sink, parameters = map [[empty]],children = list [[empty]]]

2017-03-04T12:07:54 0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 路径缓存事件:path = / deployments / modules / allocated / d634d310-12b4-4a83-baea-c1c98dfb7bba / mqtttestfile.source . mqtt.1,type = CHILD_ADDED

2017-03-04T12:07:54 0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 为流'mqtttestfile'部署模块'mqtt'2017-03-04T12:07:54 0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache -0 container.DeploymentListener - 部署模块[ModuleDescriptor @ 5a7d8e37 moduleName ='mqtt',moduleLabel ='mqtt',group ='mqtttestfile',sourceChannelName = [null],sinkChannelName = [null],index = 0,type = source, parameters = map ['topics' - >'helloTopic','url' - >'tcp:// localhost:1883'],children = list [[empty]]]

2017-03-04T12:07:56 0530 1.3.0.RELEASE INFO DeploymentSupervisor-0 zk.ZKStreamDeploymentHandler - 流'mqtttestfile'的部署状态:DeploymentStatus

使用spring-xd 1.3.1仍然没有解决问题,这是我在日志中看到的错误信息 -

2017-03-05T01:15:06 0530 1.3.1.RELEASE INFO LeaderSelector-1 zk.DeploymentSupervisor - 领导因线程中断而被取消

2017-03-05T01:15:06 0530 1.3.1.RELEASE ERROR MQTT Rec:xd.mqtt.client.id.src inbound.MqttPahoMessageDrivenChannelAdapter - 丢失连接:连接丢失;重试...

谢谢 .

1 回答

  • 0

    我刚刚测试了你的流,它对我来说很好......

    $ cat /tmp/xd/output/mqtttestfile.out 
    foo
    bar
    

    (在我将消息foo和bar添加到队列之后) .

    org.springframework.integration (在容器的logback.grooy文件中)启用了DEBUG日志记录后,我看到......

    2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 endpoint.EventDrivenConsumer - started outbound.mqtttestfile.0
    2017-03-04T08:02:59-0500 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 inbound.MqttPahoMessageDrivenChannelAdapter - Connected and subscribed to [helloTopic]
    2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 inbound.MqttPahoMessageDrivenChannelAdapter - started mqttInbound
    2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentSupervisor-0 zk.ZKStreamDeploymentHandler - Deployment status for stream 'mqtttestfile': DeploymentStatus{state=deployed}
    

    奇怪的是你没有看到 ...started... 消息(INFO) .

    你能试试1.3.1版吗?

相关问题