首页 文章

Spring Cloud 流MessageChannel send()始终返回true

提问于
浏览
0

我正在使用Spring Cloud 流,我想保存消息并重试在Kafka服务器消失时在主题上发布它们但MessageChannel send()方法总是返回true,即使Kafka / Zookeeper服务器已停止 .

有人可以帮忙吗?

UPDATE with application.yml content :

spring:
    cloud:
        stream:
            kafka:
                binder:
                    brokers: localhost
                    zk-nodes: localhost
                    mode: raw
                bindings:
                    output:
                        producer:
                            sync: true
            bindings:
                output:
                    destination: topic-notification
                    content-type: application/json

CODE :

@Service
public class SendToKafka {
    private Logger log = LoggerFactory.getLogger(SendToKafka.class);

    @Autowired
    Source source;

    @Autowired
    NotificationFileService notificationFileService;

    public void send(NotificationToResendDTO notification){
        try {
            CompletableFuture.supplyAsync(() -> notification)
                .thenAcceptAsync(notif -> {
                    boolean resp = source.output().send(MessageBuilder.withPayload(notif).build());
                    log.info(" ======== kafka server response === " + resp);

                    if (!resp){
                        log.info(" ======== failed to send the notification" + notification);
                        // save failed notification
                        notificationFileService.writeTofile(notification);
                    }
                }).get();
        } catch (InterruptedException | ExecutionException e) {
            log.info(" ======== failed to send the notification with exception" + notification);
            // save failed notification
            notificationFileService.writeTofile(notification);
            e.printStackTrace();
        }
    }
}

1 回答

  • 0

    Kafka默认是异步的;你需要将 sync 设置为 true ;见binder producer properties .

    sync 生产环境 者是否同步 . 默认值:false .

相关问题