首页 文章

Java非阻塞IO选择器导致通道寄存器阻塞

提问于
浏览
4

我有两个线程,我正在处理Java NIO的非阻塞套接字 . 这是线程正在做的事情:

线程1:调用选择器的select()方法的循环 . 如果有任何密钥可用,则会相应地处理它们 .

线程2:偶尔通过调用register()将SocketChannel注册到选择器 .

问题是,除非select()的超时非常小(比如大约100ms),对register()的调用将无限期地阻塞 . 即使通道配置为非阻塞,并且javadocs声明Selector对象是线程安全的(但它的选择键不是,我知道) .

所以任何人都对这个问题有什么看法?如果我把所有东西都放在一个线程中,该应用程那时候没有问题,但我真的想要有单独的线程 . 任何帮助表示赞赏 . 我在下面发布了我的示例代码:

将选择(1000)更改为选择(100),它将起作用 . 保留为select()或select(1000),但不会 .

import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; 

 public class UDPSocket 
{
 private DatagramChannel clientChannel;
 private String dstHost;
 private int dstPort;
 private static Selector recvSelector;
 private static volatile boolean initialized;
 private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); 

 public static void init()
 {
  initialized = true; 

 try 
  {
   recvSelector = Selector.open();
  } 
  catch (IOException e) 
  {
   System.err.println(e);
  } 

 Thread t = new Thread(new Runnable()
  {
   @Override
   public void run() 
   {
    while(initialized)
    {
     readData();
     Thread.yield();
    }
   } 
  });
  t.start();
 } 

 public static void shutdown()
 {
  initialized = false;
 } 

 private static void readData()
 {
  try
  {
   int numKeys = recvSelector.select(1000); 

 if (numKeys > 0)
   {
    Iterator i = recvSelector.selectedKeys().iterator(); 

 while(i.hasNext())
{
 SelectionKey key = i.next();
 i.remove();

 if (key.isValid() && key.isReadable())
 {
  DatagramChannel channel = (DatagramChannel) key.channel();

  // allocate every time we receive so that it's a copy that won't get erased
  final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE);
  channel.receive(buffer);
  buffer.flip();
  final SocketSubscriber subscriber = (SocketSubscriber) key.attachment();

  // let user handle event on a dedicated thread
  eventQueue.execute(new Runnable()
  {
   @Override
   public void run() 
   {
    subscriber.onData(buffer);
   }       
  });
 }
}
 

 }
  }
  catch (IOException e)
  {
   System.err.println(e);
  } 
 } 

 public UDPSocket(String dstHost, int dstPort)
 {
  try
  {
   this.dstHost = dstHost;
   this.dstPort = dstPort;
   clientChannel = DatagramChannel.open();
   clientChannel.configureBlocking(false);
  }
  catch (IOException e)
  {
   System.err.println(e);
  }
 } 

 public void addListener(SocketSubscriber subscriber)
 {
  try 
  {
   DatagramChannel serverChannel = DatagramChannel.open();
   serverChannel.configureBlocking(false);
   DatagramSocket socket = serverChannel.socket();
   socket.bind(new InetSocketAddress(dstPort));
   SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ);
   key.attach(subscriber);
  } 
  catch (IOException e) 
  {
   System.err.println(e);
  }
 } 

 public void send(ByteBuffer buffer)
 {
  try 
  {
   clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort));
  } 
  catch (IOException e) 
  {
   System.err.println(e);
  }
 } 

 public void close()
 {
  try 
  {
   clientChannel.close();
  } 
  catch (IOException e) 
  {
   System.err.println(e);
  }
 }
}
import java.nio.ByteBuffer; 

 public interface SocketSubscriber 
{
 public void onData(ByteBuffer data);
}

用法示例:

public class Test implements SocketSubscriber
{
 public static void main(String[] args) throws Exception
 {
  UDPSocket.init();
  UDPSocket test = new UDPSocket("localhost", 1234);
  test.addListener(new Test());
  UDPSocket test2 = new UDPSocket("localhost", 4321);
  test2.addListener(new Test());
  System.out.println("Listening...");
  ByteBuffer buffer = ByteBuffer.allocate(500);
  test.send(buffer);
  buffer.rewind();
  test2.send(buffer);
  System.out.println("Data sent...");
  Thread.sleep(5000);
  UDPSocket.shutdown();
 } 

 @Override
 public void onData(ByteBuffer data) 
 {
  System.out.println("Received " + data.limit() + " bytes of data.");
 }
}

2 回答

  • 3

    Selector有几个记录的内部同步级别,您将全部遇到这些级别 . 在调用_1692251之前调用选择器上的 wakeup() 如果选择了零键,请确保 select() 循环正常工作,这将在 wakeup(). 上发生

  • 3

    我今天遇到了同样的问题(那就是“wakeupAndRegister”无法使用) . 我希望我的解决方案可能有所帮助:

    创建同步对象:

    Object registeringSync = new Object();
    

    通过以下方式注册 Channels :

    synchronized (registeringSync) {
      selector.wakeup();  // Wakes up a CURRENT or (important) NEXT select
      // !!! Might run into a deadlock "between" these lines if not using the lock !!!
      // To force it, insert Thread.sleep(1000); here
      channel.register(selector, ...);
    }
    

    该线程应该执行以下操作:

    public void run() {    
      while (initialized) {
        if (selector.select() != 0) {  // Blocks until "wakeup"
          // Iterate through selected keys
        }
        synchronized (registeringSync) { }  // Cannot continue until "register" is complete
      }
    }
    

相关问题