我正在尝试在缺乏文档的最新reactor-netty版本上做一些项目前的经验;我正在使用0.8.0.M3版本 .

我用这个tcp服务器开发了一个简单的spring boot应用程序,它可以正常启动并且似乎可以工作:

@PostConstruct
public void startServer() throws InterruptedException {
    TcpServer.create().
              host("localhost").
              port(1235).
              handle((in, out) -> {

                    Flux<String> fluxString = in.receive().asString().log().
                        map(text -> {
                            return "Hi server have received "+text;});  
                        return out.sendString(fluxString).then();

              } 
              ).
              wiretap().bindNow();
}

如果我尝试使用客户端进行测试,则交互似乎正确,但我无法收到任何响应:

int counter = 10;
    CountDownLatch latch = new CountDownLatch(counter);
    Flux<String> input = Flux.range(0, counter).map(i->""+i);

    TcpClient.create().
      host("localhost").
      port(1235).
      handle((in, out) -> {

          in.receive().subscribe(receiv -> {System.out.println(receiv);latch.countDown();});
                return out.sendString(input).neverComplete();

      } 
      ).
      wiretap().connectNow();
    System.out.println("waiting closure");
    boolean result = latch.await(5, TimeUnit.SECONDS);

查看窃听日志记录似乎客户端将每个int分别作为字符串发送,服务器只接收一个聚合字符串“0123456789”并仅发送一个响应 . 客户端没有收到任何内容,并且锁存器不会递减1并保持为10(我预计至少会收到一个聚合响应) .

任何人都可以解释客户端有什么问题,以及如何通过服务器单独接收每个整数?

Thx G.