首页 文章

Spring Cloud Stream IntegrationFlow with Rabbitmq messaging,消费者将ASCII数字作为消息有效负载

提问于
浏览
0

我正在使用spring cloud stream进行消息传递 . 在消费者部分,我使用IntegrationFlow来侦听队列 . 它正在监听并打印来自制作人的信息 . 但格式不同,这就是我现在面临的问题 . 生产环境 者的内容类型是application / json和显示ASCII数字的IntegrationFLow消息有效负载 . 代码是为消费者编写的,如下所示

@EnableBinding(UserOperationConsume.class)
 public class ConsumerController {

   @Bean
   IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
     return IntegrationFlows
     .from(u.userRegistraionProduces())
     //.transform(Transformers.toJson()) // not working as expected
     //.transform(Transformers.fromJson(UserDTO.class))
     .handle(String.class, (payload, headers) -> {
     System.out.println(payload.toString()); // here the output is 123,34,105,100,34,58,49,44,34,110,97,109,101,34,58,34,86,105,115,104,110,117,34,44,34,101,109,97,105,108,34,58,34,118... 
     return null;
     }).get();
  }

 }

输入界面是,

public interface UserOperationConsume {
  @Input
  public SubscribableChannel userRegistraionProduces();
 }

消费者yml配置是,

server:
   port: 8181

 spring:
   application:
   name: nets-alert-service
 ---
 spring:
   cloud:
     config:
       name: notification-service
       uri: http://localhost:8888
 ---    
 spring:
   rabbitmq:
     host: localhost
     port: 5672
     username: guest
     password: guest

   ---
   spring:
     cloud:
       stream:
         bindings:
           userRegistraionProduces:
             destination: userOperations
         input:
           content-type: application/json

我尝试过Sink.class绑定,那次我从队列中得到了一条确切的消息 . 如果此IntegrationFlow配置中有任何错误,请告诉我 . 因为我是Spring Cloud 流和IntegrationFlow的新手 . 有没有办法将此ascii转换为精确的字符串?提前致谢

1 回答

  • 2

    使用 IntegrationFlows.from(channel) 不提供转换提示,因此您只需获取原始 byte[] 有效内容(包含JSON) . 目前尚不清楚为什么然后使用 toJson() 变压器 .

    您的 .handle(String.class, (payload, headers) -> {... 导致使用简单的 ArrayToStringConverter ,这就是您看到每个字节值的原因 .

    无论如何,您没有正确使用框架 . 使用...

    @StreamListener("userRegistraionProduces")
    public void listen(UserDTO dto) {
        System.out.println(dto);
    }
    

    ......并且框架将为您完成转换 . 要么...

    @StreamListener("userRegistraionProduces")
    public void listen(Message<UserDTO> dtoMessage) {
        System.out.println(dtoMessage);
    }
    

    如果您的 生产环境 者在 Headers 中传达其他信息 .

    EDIT

    如果你想自己做转换,这很好......

    @Bean
    IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
        return IntegrationFlows.from(u.userRegistraionProduces())
                .transform(Transformers.fromJson(UserDTO.class))
                .handle((payload, headers) -> {
                    System.out.println(payload.toString());
                    return null;
                }).get();
    }
    

    ...因为Json变压器可以读取 byte[] .

相关问题