首页 文章

将数据从一个套接字线程广播到java中的所有现有套接字线程

提问于
浏览
0

我有一个ServerSocket,它监听特定地址和端口上的连接,并为每个连接(客户端)创建一个线程以实现有状态协议,现在每个线程(客户端)消息都在它自己的线程中处理,这就是问题所在:

1.如何从一个线程(客户端套接字)向所有人发送数据(简单地广播消息) . 我看到两个线程之间有某种BlockingQueues,这里我有一对多线程用于广播,也有一个到一个私人消息模型 .

2.我看到一个用于在线程之间共享数据或资源的同步块解决方案但是在我的代码中我不知道我应该在哪里实现这个以及为什么?


输入服务器的入口点,ServerSocket初始化的位置和Listen:

public class ServerStarter {
    //All Users that creates from threads adding here.
    public static Hashtable<String,User> users = new Hashtable<String,User>(); 
    //All Channels add here.
    public static Hashtable<String,Channel> channels = new Hashtable<String,Channel>(); 
    final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

    public static void main(String args[]){
        Config config = new Config();
        ServerSocket Server = null;
        try {
            //server configs,from left to right is: PORT,BackLog,Address
            Server = new ServerSocket(config.port, config.backlog,config.ServerIP);
        } catch (IOException e) {
            System.err.println(e);
        }

        while (true) {
            Socket sock = null;
            BufferedReader inFromClient = null;
            try {
                sock = Server.accept();
            } catch (IOException e) {
                if (Server != null && !Server.isClosed()) {
                    try {
                        Server.close();
                    } catch (IOException e1)
                    {
                        e1.printStackTrace(System.err);
                    }
                }
                System.err.println(e);
            }
            try {
                inFromClient = new BufferedReader(new InputStreamReader(sock.getInputStream()));
            } catch (IOException e) {
                System.err.println(e);
            }
            //each clients run on it's own thread!
            new SocketThread(sock,inFromClient).start();
        }
    }
}

客户端套接字线程创建的位置在这里:

public class SocketThread extends Thread {
    Socket csocket;
    BufferedReader inFromClient;

    public SocketThread(Socket csocket, BufferedReader inFromClient) {
        this.csocket = csocket;
        this.inFromClient = inFromClient;
    }

    public void run() {
        try {
            String fromclient = inFromClient.readLine();
            //some primary informations sent to server for further processing.
            RequestHandler Reqhandler = new RequestHandler(this.getId(), fromclient);
            System.out.println("=======================================");
            while (true) {
                fromclient = inFromClient.readLine();
                IRCParser parser = new IRCParser(fromclient);
                //if the primary info's are OK and nothing causes to kill the thread the clients go to the infinite loop for processing messages.
                Reqhandler.Commandhandler(parser.getCommand(), parser.getParameters());
            }
        } catch (IOException e) {
            //kill current thread!
            currentThread().interrupt();
            return;
        }
    }
}

如果类和其他数据不够,请告诉我添加更多代码和注释,tnx

1 回答

  • 1

    如果我正确理解您的要求,您只需要将消息从一个客户端线程传递到其他客户端线程 . 我认为你应该能够在这里使用观察者模式 .

    这样的事情 - (请注意,我已从您的代码中删除了所有其他不需要显示消息广播概念的内容 . 您可能需要根据您的要求更改它) .

    public class ServerStarter {
    
        private static final ServerStarter singleton = new ServerStarter();
    
        private volatile boolean shutdown;
        // thread pool executor
        private final ExecutorService executorService = Executors.newCachedThreadPool();
        // observable to notify client threads
        private final Observable observable = new Observable();
        // fair lock (can use unfair lock if message broadcasting order is not important)
        private final Lock fairLock = new ReentrantLock(true);
    
        private ServerStarter() {
        }
    
        public static ServerStarter getInstance() {
            return singleton;
        }
    
    
        public static void main(String args[]) {
            ServerSocket server = null;
            try {
                //server configs,from left to right is: PORT,BackLog,Address
                server = new ServerSocket();
                while (!ServerStarter.getInstance().isShutdown()) {
                    Socket sock = server.accept();
                    BufferedReader inFromClient = new BufferedReader(new InputStreamReader(sock.getInputStream()));
                    //each clients run on it's own thread!
                    SocketThread clientThread = new SocketThread(sock, inFromClient);
                    ServerStarter.getInstance().registerClientThread(clientThread);
                    ServerStarter.getInstance().startClientThread(clientThread);
                }
            } catch (IOException e) {
                if (server != null) {
                    try {
                        server.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
                e.printStackTrace();
            }
        }
    
        public void shutdown() {
            shutdown = true;
        }
    
        public boolean isShutdown() {
            return shutdown;
        }
    
        public void startClientThread(SocketThread clientThread) {
            executorService.submit(clientThread);
        }
    
        private void registerClientThread(SocketThread clientThread) {
            observable.addObserver(clientThread);
        }
    
        public void notifyAllClients(final Object message) {
            fairLock.lock();
            try {
                executorService.submit(new MessageBroadcaster(message));
            } finally {
                fairLock.unlock();
            }
        }
    
        public void unregisterClientThread(SocketThread clientThread) {
            fairLock.lock();
            try {
                observable.deleteObserver(clientThread);
            } finally {
                fairLock.unlock();
            }
        }
    
        private class MessageBroadcaster implements Runnable {
            private final Object message;
    
            public MessageBroadcaster(Object message) {
                this.message = message;
            }
    
            @Override
            public void run() {
                fairLock.lock();
                try {
                    observable.notifyObservers(message);
                } finally {
                    fairLock.unlock();
                }
            }
        }
    }
    
    class SocketThread implements Runnable, Observer {
        Socket clientSocket;
        BufferedReader inFromClient;
    
        public SocketThread(Socket clientSocket, BufferedReader inFromClient) {
            this.clientSocket = clientSocket;
            this.inFromClient = inFromClient;
        }
    
        public void run() {
            try {
                String fromClient;
                while (!ServerStarter.getInstance().isShutdown() && (fromClient = inFromClient.readLine()) != null) {
                    // TODO...prepare message to broadcast
                    Object message = new Object();
                    ServerStarter.getInstance().notifyAllClients(message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                ServerStarter.getInstance().unregisterClientThread(this);
            }
        }
    
        @Override
        public void update(Observable o, Object message) {
            // TODO...handle the message
        }
    }
    

    当客户端线程想要通知其他客户端线程时,它会异步使用observable来通知其他线程 . observable将调用每个客户端线程的 update() 方法 .

相关问题