首页 文章

带管道的Reactor Netty TcpServer

提问于
浏览
0

在使用 Reactor Netty 之前,我搭建 Netty TCP 服务器的方式是创建 ServerBootstrap 并添加自定义的管道(pipeline)类。Reactor Netty 提供了 TcpServer.create(),但似乎我必须提供一个函数式接口的实现,它接受 NettyInbound 和 NettyOutbound 并返回一个 Mono。但是,如果我想添加一个用于构建管道的 ChannelInitializer,就必须先阻塞(block)才能拿到 NettyContext。传入的消息会被该函数式接口接收,我也可以发送响应,但没有任何内容经过我的管道。

有没有办法让我们使用Reactor Netty并通过自定义管道传递消息?

在处理函数中返回 out.sendString(Mono.just(“Hi”)).neverComplete() 时,连接建立并收到消息后可以成功向客户端发送“Hi”,但我需要把消息交给管道处理,然后再把处理结果回传给客户端。

/**
 * Starts a Reactor Netty TCP server on {@code tcpSocketAddress} and blocks the
 * calling thread until the channel obtained from the handler context is closed.
 *
 * <p>The event loop group created here is always shut down in the
 * {@code finally} block, whether the server stops normally or with an error.
 *
 * @throws InterruptedException if the thread is interrupted while waiting on
 *         the close future or on the event loop group shutdown
 */
public void startServer() throws InterruptedException{
    // Single-threaded NIO event loop group used by this server.
    EventLoopGroup group = new NioEventLoopGroup(1);
    try {
        final TcpServer server = TcpServer.create(opts -> opts
            .eventLoopGroup(group)
            .listen(tcpSocketAddress));
        server
            .newHandler((in, out) -> {
                // Consume only the first inbound item, log it as UTF-8 text and
                // release the latch.
                in.receive()
                    .take(1)
                    .log(ApolloApplicationTests.class.getName())
                    .subscribe(data -> {
                        log.info("Server Received: {}", data.toString(CharsetUtil.UTF_8));
                       latch.countDown();
                    });
                    // Send "Hi" and return a publisher that never completes.
                    return out.sendString(Mono.just("Hi")).neverComplete();
            })
            // NOTE(review): clientEndPoint is attached only after block() returns,
            // to whatever context newHandler() yields - presumably the bound
            // server context rather than each accepted connection, which would
            // explain why inbound data never reaches the custom pipeline.
            // Confirm against the Reactor Netty version in use.
            .block().addHandler(clientEndPoint)
            .channel()
            // Wait here until that channel is closed.
            .closeFuture().sync();

    } finally {
        group.shutdownGracefully().sync();
    }
}



import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageDecoder;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
 * Builds the Netty channel pipeline used for client connections.
 *
 * <p>The same pipeline layout is installed through either entry point:
 * {@link #initChannel(Channel)} (when this object is used as a Netty
 * {@link ChannelInitializer}) or {@link #accept(Channel)} (when it is invoked
 * directly with a channel).
 *
 * <p>Handler order, first to last: {@code lowOrderVliDecoder},
 * {@code messageDecoder}, {@code vliEncoder}, every handler in
 * {@code inBoundHandlers}, {@code messageInterchange}, every handler in
 * {@code outBoundHandlers}.
 *
 * <p>No per-channel state is kept on this object, so it can initialise several
 * channels concurrently without one channel's handlers being installed into
 * another channel's pipeline.
 *
 * <p>NOTE(review): the same injected handler instances are added to every
 * channel. Netty refuses to add a handler to a second pipeline unless it is
 * marked {@code @Sharable}, and frame decoders usually are not - confirm how
 * these beans are scoped before serving more than one connection.
 */
@Configurable
@Component
public class ClientEndPoint extends ChannelInitializer<Channel> {

    final Logger log = Loggers.getLogger(ApolloApplication.class);

    @Autowired
    private ChannelHandlerAdapter messageInterchange;

    @Autowired
    private LengthFieldBasedFrameDecoder lowOrderVliDecoder;

    @Autowired
    private MessageToMessageDecoder<ByteBuf> messageDecoder;

    @Autowired
    private LengthFieldPrepender vliEncoder;

    @Autowired
    @Qualifier("inBound")
    List<ChannelHandler> inBoundHandlers;

    @Autowired
    @Qualifier("outBound")
    List<ChannelHandler> outBoundHandlers;

    /**
     * Installs the handlers on the pipeline of the given channel.
     *
     * @param sc the channel being initialised
     * @throws Exception declared by {@link ChannelInitializer#initChannel}
     */
    @Override
    protected void initChannel(Channel sc) throws Exception {
        configurePipeline(sc.pipeline());
    }

    /**
     * Installs the handlers on the pipeline of the given channel; same effect
     * as {@link #initChannel(Channel)} but callable directly.
     *
     * @param sc the channel whose pipeline should be populated
     */
    public void accept(Channel sc) {
        configurePipeline(sc.pipeline());
    }

    /** Appends the codec, inbound, interchange and outbound handlers in order. */
    private void configurePipeline(ChannelPipeline channelPipeline) {
        channelPipeline.addLast("lowOrderVliDecoder", this.lowOrderVliDecoder);
        channelPipeline.addLast("messageDecoder", this.messageDecoder);
        channelPipeline.addLast("vliEncoder", this.vliEncoder);

        for (ChannelHandler handler : this.inBoundHandlers) {
            channelPipeline.addLast(handler);
        }

        channelPipeline.addLast("messageInterchange", this.messageInterchange);

        for (ChannelHandler handler : this.outBoundHandlers) {
            channelPipeline.addLast(handler);
        }
    }
}

1 回答

  • 0

    所以我想通了

    /**
     * Creates a TCP server on {@code tcpSocketAddress} whose channels are set up
     * by {@code pipeline} via {@code afterChannelInit}, and returns the handle
     * used to start it.
     *
     * <p>Nothing is bound until the returned {@code Mono} is subscribed to or
     * blocked on. The per-connection handler only logs inbound data as strings
     * and returns a publisher that never completes.
     *
     * @return a {@code Mono} that yields the server's {@code NettyContext}
     * @throws InterruptedException never thrown by this body; kept so existing
     *         callers that catch it still compile
     */
    public Mono<? extends NettyContext> initializeServer() throws InterruptedException {
        this.log.debug("Server Initializing");
        BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> serverHandler = (in,
                out) -> {
            in.receive().asString().subscribe(
                    // Placeholders avoid building the message when debug is off.
                    data -> this.log.debug("Received {} on {}", data, in),
                    // Without an error consumer, inbound failures would only be
                    // reported through Reactor's dropped-error hook.
                    error -> this.log.error("Inbound stream failed on " + in, error));
            return Flux.never();
        };

        TcpServer server = TcpServer.create(opts -> opts.afterChannelInit(pipeline).listen(tcpSocketAddress));
        return server.newHandler(serverHandler);
    }
    

    其中,pipeline 是一个实现了 Consumer 接口的类的实例,它在 accept 方法中按照常规 Netty 的方式构建管道 .

    然后我启动服务器

    /**
     * Binds the server and blocks the calling thread until its channel closes.
     *
     * <p>Waits up to five seconds for {@code connected} to yield a context,
     * then waits on that context's channel close future. When the future
     * completes, the outcome is logged and the method returns once the close
     * listener has run.
     *
     * @param connected the not-yet-subscribed server handle, e.g. the result of
     *        {@code initializeServer()}
     * @throws CancellationException if the calling thread was interrupted while
     *         waiting; the interrupt flag is restored and the
     *         {@code InterruptedException} is attached as the cause
     */
    private void startServer(Mono<? extends NettyContext> connected) {
        ChannelFuture f = connected.block(Duration.ofSeconds(5)).channel()
                .closeFuture();
        final CountDownLatch channelLatch = new CountDownLatch(1);
        f.addListener((ChannelFutureListener) cf -> {
            log.debug("Channel Disconnected");
            // Release the waiter for every outcome. Previously the latch was
            // only counted down on success, so a cancelled or failed future
            // left this method blocked forever.
            channelLatch.countDown();
        });
        f.awaitUninterruptibly();
        // Now we are sure the future is completed.
        assert f.isDone();

        if (f.isCancelled()) {
            this.log.warn("Connection Cancelled");
        } else if (!f.isSuccess()) {
            if (f.cause() != null) {
                // Route the failure through the logger instead of stderr.
                this.log.warn("Connection not successful", f.cause());
            } else {
                this.log.warn("Connection not successful");
            }
        } else {
            // NOTE(review): this is logged after the close future completed,
            // i.e. once the channel has closed - confirm the wording is intended.
            this.log.info("Server Start Successful");
        }
        try {
            // Ensures the close listener above has run before returning.
            channelLatch.await();
        } catch (InterruptedException ex) {
            // Restore the flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            CancellationException cancelled = new CancellationException(
                    "Interrupted while waiting for streaming " + "connection to arrive.");
            cancelled.initCause(ex);
            throw cancelled;
        }
    }
    

相关问题