我使用BlockingQueue实现了 生产环境 者消费者问题 . 在此, 生产环境 者以不同的间隔将一些任务发布到阻塞队列,并且多个消费者将该任务用于处理它 . 我的 生产环境 者代码工作正常,但我的消费者对象抛出错误 . 我是多线程的新手 . 我会感谢你的帮助 .

制片人

public class Producer_Events implements Runnable{

private BlockingQueue<ArrayList<String>> queue;
ArrayList<String> batch_list = new ArrayList<String>();

public Producer_Events(BlockingQueue<ArrayList<String>> queue,ArrayList<String> batch_list) {
    this.queue = queue;
    this.batch_list = batch_list;

}

public void run() {
    try {
        // do all the shit
        ArrayList<Long> tymstamp = new ArrayList<Long>();
        // read timestamp from a file and as per timestamp put value in blockingqueue
        //if timestamp ended put a variable named poison to terminate the thread for both producer and consumer
        System.out.println("Timestamp added");
        for (int m = 0; m<batch_list.size();m++){

            String operator = new String();
            operator = batch_list.get(m);
            ArrayList<String> batch1 = new ArrayList<>();
            Thread.sleep(100);// timestamp to add value in blockqueu
            batch1.add(operator);
            if(m!=batch_list.size()){
                queue.put(batch1);
                System.out.println("Event Added: "+ m);
            }
            else{
                // inject a poison
                ArrayList<String> batch2 = new ArrayList<>(-1);
                queue.put(batch2);
                System.out.println("Producer STOPPED.");
            }

        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (NumberFormatException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } 
}}

消费者代码

public class ConsumerWorker implements Runnable {

private BlockingQueue<ArrayList<String>> inputQueue;

// private final static ArrayList<String> POISON = new ArrayList<String>();

public ConsumerWorker(BlockingQueue<ArrayList<String>> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    // worker loop keeps taking en element from the queue as long as the
    // producer is still running or as
    // long as the queue is not empty:
    System.out.println("Check loop");
    while (true) {
        System.out.println("Consumer " + Thread.currentThread().getName()
                + " START");
        try {
            ArrayList<String> queueElement = inputQueue.take();
            System.out.println("Event Send to Crowd: ");
            if (queueElement.isEmpty()) {
                break;
            } else {

                System.out.println("DO Processing");

            }

        } catch (InterruptedException e) {
            System.out.println("Interrupted.");
        }
        System.out.println("Throwing exception ...");
        throw new RuntimeException();
    }
}}

主要功能

public class test {

/**
 * @param args
 */
public static void main(String[] args) {

    ArrayList<String> batch_list = new ArrayList<>();
    for (int i=0;i<10;i++){
        batch_list.add("OK");
    }
    // block queue of size 20
    BlockingQueue<ArrayList<String>> queue = new ArrayBlockingQueue<ArrayList<String>>(20);
    // creating multiple consumers to access blockqueue
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

    Producer_Events producer = new Producer_Events(queue,batch_list);
    ConsumerWorker consumer = new ConsumerWorker(queue);
    System.out.println("OK");
    new Thread(producer).start();
    executor.execute(consumer);

    /*Thread myThread = new Thread(new ConsumerWorker(queue));

    //ConsumerWorker consumer = new ConsumerWorker(queue);
    myThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        public void uncaughtException(Thread myThread, Throwable e) {
            System.out.println(myThread.getName() + " throws exception: " + e);
         }
    });*/

}}

错误