我想创建一些 Producer/Consumer
线程应用程序 . 但我不确定在两者之间实现队列的最佳方法是什么 .
所以我有两个想法(两者都可能是完全错误的) . 我想知道哪个更好,如果它们都吮吸那么什么是实现队列的最佳方式 . 这主要是我在这些例子中实现的队列,我很关心 . 我正在扩展一个内部类的Queue类,并且是线程安全的 . 以下是两个示例,每个示例包含4个类 .
主类 -
public class SomeApp
{
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
consumer = new Consumer();
producer = new Producer();
}
}
消费者类 -
public class Consumer implements Runnable
{
public Consumer()
{
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = QueueHandler.dequeue();
//do some stuff with the object
}
}
}
制片人类 -
public class Producer implements Runnable
{
public Producer()
{
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
QueueHandler.enqueue(new Object());
}
}
}
队列类 -
public class QueueHandler
{
//This Queue class is a thread safe (written in house) class
public static Queue<Object> readQ = new Queue<Object>(100);
public static void enqueue(Object object)
{
//do some stuff
readQ.add(object);
}
public static Object dequeue()
{
//do some stuff
return readQ.get();
}
}
OR
主类 -
public class SomeApp
{
Queue<Object> readQ;
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
readQ = new Queue<Object>(100);
consumer = new Consumer(readQ);
producer = new Producer(readQ);
}
}
消费者类 -
public class Consumer implements Runnable
{
Queue<Object> queue;
public Consumer(Queue<Object> readQ)
{
queue = readQ;
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = queue.dequeue();
//do some stuff with the object
}
}
}
制片人类 -
public class Producer implements Runnable
{
Queue<Object> queue;
public Producer(Queue<Object> readQ)
{
queue = readQ;
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
queue.enqueue(new Object());
}
}
}
队列类 -
//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{
public QueueHandler(int size)
{
super(size); //All I'm thinking about now is McDonalds.
}
public void enqueue(Object object)
{
//do some stuff
readQ.add();
}
public Object dequeue()
{
//do some stuff
return readQ.get();
}
}
去!
6 回答
Java 5拥有您需要的所有工具 . 你会想:
将所有制作人放在一个ExecutorService;
将所有消费者放在另一个消费者身上
ExecutorService
;如有必要,使用BlockingQueue在两者之间进行通信 .
我说“如果有必要”(3),因为根据我的经验,这是一个不必要的步骤 . 您所做的就是向消费者执行者服务提交新任务 . 所以:
所以
producers
直接提交给consumers
.好的,正如其他人所说,最好的办法是使用
java.util.concurrent
包 . 我强烈推荐"Java Concurrency in Practice" . 这本书很棒,几乎涵盖了你需要知道的一切 .至于你的特定实现,正如我在评论中提到的,不要从构造函数启动线程 - 它可能是不安全的 .
抛开这一点,第二个实现似乎更好 . 您不希望将队列放在静态字段中 . 你可能只是失去了灵活性 .
如果你想继续你自己的实现(我猜是为了学习目的?),至少提供一个
start()
方法 . 您应该构造对象(您可以实例化Thread
对象),然后调用start()
来启动该线程 .编辑:
ExecutorService
有自己的队列,所以这可能会令人困惑..这是让你开始的东西 .进一步编辑:对于 生产环境 者而不是
while(true)
,您可以执行以下操作:这样,您可以通过调用
.shutdownNow()
来关闭执行程序 . 如果您使用while(true)
,它将不会关闭 .另请注意
Producer
仍然容易受到RuntimeExceptions
(即一个RuntimeException
将停止处理)You are reinventing the wheel.
如果您需要持久性和其他企业功能使用JMS(我建议ActiveMq) .
如果你需要快速的内存中队列,请使用java的Queue的一种强制性 .
如果您需要支持java 1.4或更早版本,请使用Doug Lea优秀的concurrent包 .
我已经扩展了cletus提出的工作代码示例的答案 .
一个
ExecutorService
(pes)接受Producer
任务 .一个
ExecutorService
(ces)接受Consumer
任务 .Producer
和Consumer
共享BlockingQueue
.多个
Producer
任务生成不同的数字 .Consumer
任务中的任何一个都可以消耗由Producer
生成的数字码:
输出:
注意 . 如果您不需要多个 生产环境 者和消费者,请保留单个 生产环境 者和消费者 . 我添加了多个 生产环境 者和消费者,以在多个 生产环境 者和消费者之间展示BlockingQueue的功能 .
这是一个非常简单的代码 .
Java代码"BlockingQueue"已经同步了put和get方法 .
Java代码"Producer",生成者线程生成数据 .
Java代码"Consumer",消费者线程消耗所产生的数据 .
Java代码"ProducerConsumer_Main",启动 生产环境 者和消费者线程的主要功能 .
BlockingQueue.java
Consumer.java
ProducerConsumer_Main.java