首页 文章

使用入站通道和响应通道动态生成TCP客户端

提问于
浏览
1

我是Spring集成的新手 .

使用Spring 4,只有java anotations .

我现在正在工作的项目我们在属性文件中设置了tcp连接 .

在momet它只硬编码到2个不同的连接,它必须改为更动态的方法,我们可以在属性文件中设置它们的可变数量,并能够在运行时添加新的连接 .

我知道dynamic tcp client example的存在,并试图将我的工作 Build 在它上面 .

首先,我们为连接设置以下bean:

@Bean(name = "node1TCPConnection")
public AbstractClientConnectionFactory node1TCPConnection() {
  final TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory(
  env.getProperty("socket.tcp.nodes[0].ip"), 
  env.getProperty("socket.tcp.nodes[0].port", Integer.class)
  );

  tcpNetClientConnectionFactory.setSingleUse(false);
  tcpNetClientConnectionFactory.setSoKeepAlive(true);

  final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);

  tcpNetClientConnectionFactory.setSerializer(by);
  tcpNetClientConnectionFactory.setDeserializer(by);
  return tcpNetClientConnectionFactory;
}

然后我们有适配器等待发送的东西:

@Bean
public TcpReceivingChannelAdapter node1TcpReaderClient(
        @Qualifier("node1TCPConnection") final AbstractClientConnectionFactory connectionFactory) {
    final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
    adapter.setConnectionFactory(connectionFactory);
    adapter.setClientMode(true);
    adapter.setErrorChannelName("errorChannel");
    adapter.setRetryInterval(retryInterval);
    adapter.setOutputChannel(fromTcp());
    return adapter;
}

当调用fromTcp()时,它会转换消息,并且以下代码将其发送到另一个应用程序以进行进一步处理 .

@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
    sendToApi(inMessage, headerMap);
}

当消息发出时,我们必须发送一个响应 .

@Bean
@ServiceActivator(inputChannel = "toTcpCh01")
public TcpSendingMessageHandler tcpOutGateCh01(
        final @Qualifier("node1TCPConnection") AbstractClientConnectionFactory connectionFactory) {
    final TcpSendingMessageHandler tcpSendingMsgHandler = new TcpSendingMessageHandler();
    tcpSendingMsgHandler.setConnectionFactory(connectionFactory);
    return tcpSendingMsgHandler;
}

并使用网关:

@MessagingGateway()
public interface MessageTcpGateway {

  @Gateway(requestChannel = "toTcpCh01")
  ListenableFuture<Void> sendTcpChannel01(@Header("host") String host, byte[] inMessage);
}

我们寄回去 .

通过该示例,我可以了解如何为响应动态创建流 .

但我无法理解如何创建一个公共连接池,然后根据这些connectionsfactory动态创建监听适配器和响应适配器,然后在运行时关闭/删除它们 .

由于this question,我有点了解如何使用入站适配器创建流程

我是否需要为每个适配器创建多个单独的IntegrationFlow?所以所有的调用和响应都可以异步处理(我可能错误的是异步)

然后在想要关闭连接时分别处理它们?比如调用接近TcpReceivingChannelAdapter然后调用TcpSendingMessageHandler,最后取消注册connectonfactory?

1 回答

  • 2

    我不这样说Collaborating Channel Adapters你需要 TcpReceivingChannelAdapterTcpSendingMessageHandler 的单独 IntegrationFlow 定义 . 它真的可以作为单个 IntegrationFlowTcpReceivingChannelAdapter 开始并以 TcpSendingMessageHandler 结束 . 关键是 IntegrationFlow 本身只是一个逻辑容器,用于对组件引用进行分组 . 所有那些你在那里声明的组件都真的完成了这项艰苦的工作,而 TcpReceivingChannelAdapterTcpSendingMessageHandler 和你之间的网关你真的会异步 .

    请记住, ByteArrayLengthHeaderSerializer 也必须被声明为bean . 不确定每个动态流都需要一个单独的实例,但是这里有一个API可以从那里完成:

    /**
         * Add an object which will be registered as an {@link IntegrationFlow} dependant bean in the
         * application context. Usually it is some support component, which needs an application context.
         * For example dynamically created connection factories or header mappers for AMQP, JMS, TCP etc.
         * @param bean an additional arbitrary bean to register into the application context.
         * @return the current builder instance
         */
        IntegrationFlowRegistrationBuilder addBean(Object bean);
    

相关问题