我正在使用Spring Cloud Stream,它使用Spring Boot进行配置,利用MessageChannel我试图将一些消息发送到正在发送和接收的通道 . 我还为该 Channels 指定了一个目的地,这是一个Kafka主题(我已经在application.properties文件中设置了args / binder属性),不知怎的,我仍然无法在我的消费者控制台上收到任何消息,显然在日志中似乎没有任何可疑的迹象 . (已经创建了kafka主题)

这在我的日志中显示:

2017-06-26 17:41:08.305 DEBUG 8520 --- [           main] o.s.integration.channel.DirectChannel    : preSend on channel 'output', message: GenericMessage [payload={ "name": "ABC", "emailAddress":"abc@gmail.com","password":"xxx"}, headers={kafka_topic=new-topic, id=8dbc3fba-c30c-ea43-e269-7b5a24d6d8e7, timestamp=1498479068305}]
2017-06-26 17:41:08.305 DEBUG 8520 --- [           main] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'output', message: GenericMessage [payload={ "name": "ABC", "emailAddress":"abc@gmail.com","password":"xxx"}, headers={kafka_topic=new-topic, id=8dbc3fba-c30c-ea43-e269-7b5a24d6d8e7, timestamp=1498479068305}]
2017-06-26 17:41:08.305  INFO 8520 --- [           main] c.f.e.c.stream.MyApplication: ****************In MyApplication, sent = :true

application.properties

spring.cloud.stream.bindings.output.destination=new-topic
spring.cloud.stream.bindings.input.destination=new-topic
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

MyApplication.java

public class MyApplication {

final static Logger logger = Logger.getLogger(MyApplication.class);

@Autowired
private Source source;

public void sendMessage(String json) {
logger.info("In MyApplication, sent = :"+ source.output().send(MessageBuilder.withPayload(json).setHeader(KafkaHeaders.TOPIC, "new-topic").build()));
  }
}

P.S . :如果我使用Kafka 生产环境 者控制台发送消费者使用的消息,我也可以使用Kafka Producer以编程方式尝试 .

我真的很感激,如果有人能帮助我知道什么不能使它工作或者上述设置中缺少任何东西 . 提前谢谢了!