生产者-消费者问题 常见实现方式:三个同步,一个管道
wait()
/notify()
实现
await()
/signal()
实现
BlockingQueue
阻塞队列实现
PipedInputStream
/PipedOutputStream
管道实现
// wait / notify implementation
/**
 * Producer-consumer demo built on the intrinsic monitor of a shared lock object.
 *
 * <p>All access to {@code count} happens while holding {@code ASYNC}. Producers block while
 * their batch would not fit into the store; consumers block while the store holds fewer items
 * than they need.
 */
public class Main {
    /** Maximum number of items the store may hold. */
    private static final int FULL = 20;
    /** Number of items currently in the store; guarded by {@code ASYNC}. */
    private static int count;
    /** Monitor that guards {@code count} and carries the wait/notify signalling. */
    private static final Object ASYNC = new Object();

    /**
     * Starts two producers and two consumers. Total production (4 + 5) covers total
     * demand (3 + 4), so every consumer can eventually finish.
     */
    public static void main(String[] args) {
        new Thread(new Product(4), "生产者1").start();
        new Thread(new Consumer(3), "消费者1").start();
        new Thread(new Product(5), "生产者2").start();
        new Thread(new Consumer(4), "消费者2").start();
    }

    /** Producer task: adds {@code num} items to the store in one batch. */
    static class Product implements Runnable {
        private final int num;

        /**
         * @param num number of items to produce; a batch larger than {@code FULL} can never
         *            fit and would wait forever
         */
        public Product(int num) {
            this.num = num;
        }

        /**
         * Waits until the whole batch fits, produces it, then wakes every waiting thread.
         * Returns early, with the interrupt flag restored, if interrupted while waiting.
         */
        @Override
        public void run() {
            synchronized (ASYNC) {
                // Wait (releasing the monitor) while the batch would overflow the store.
                // Checking count + num keeps count from ever exceeding FULL.
                while (count + num > FULL) {
                    System.out.println(Thread.currentThread().getName() + " wait");
                    try {
                        ASYNC.wait();
                    } catch (InterruptedException e) {
                        // Honour the cancellation request instead of waiting again.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                System.out.println("生产者:" + Thread.currentThread().getName() + " 开始生产 " + num);
                produce(num);
                System.out.println("生产者:" + Thread.currentThread().getName() + " 生成完成 剩余:" + count);
                // Producers and consumers share one monitor, so a single notify() could wake
                // a thread whose condition is still false and leave the others stalled.
                ASYNC.notifyAll();
            }
        }

        /** Adds up to {@code num} items, one per second; stops early if interrupted. */
        private void produce(int num) {
            for (int i = 0; i < num; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                count++;
            }
        }
    }

    /** Consumer task: removes {@code num} items from the store in one batch. */
    static class Consumer implements Runnable {
        private final int num;

        /**
         * @param num number of items to consume
         */
        public Consumer(int num) {
            this.num = num;
        }

        /**
         * Waits until the store holds at least {@code num} items, consumes them, then wakes
         * every waiting thread. Returns early, with the interrupt flag restored, if
         * interrupted while waiting.
         */
        @Override
        public void run() {
            synchronized (ASYNC) {
                // Wait (releasing the monitor) while demand exceeds the available stock.
                while (num > count) {
                    System.out.println(Thread.currentThread().getName() + " wait");
                    try {
                        ASYNC.wait();
                    } catch (InterruptedException e) {
                        // Honour the cancellation request instead of waiting again.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                System.out.println("消费者:" + Thread.currentThread().getName() + " 开始消费 " + num);
                consume(num);
                System.out.println("消费者:" + Thread.currentThread().getName() + " 消费完成 剩余:" + count);
                ASYNC.notifyAll();
            }
        }

        /** Removes up to {@code num} items, one per second; stops early if interrupted. */
        private void consume(int num) {
            for (int i = 0; i < num; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                count--;
            }
        }
    }
}
输出:
生产者:生产者1 开始生产 4
生产者:生产者1 生成完成 剩余:4
消费者:消费者2 开始消费 4
消费者:消费者2 消费完成 剩余:0
生产者:生产者2 开始生产 5
生产者:生产者2 生成完成 剩余:5
消费者:消费者1 开始消费 3
消费者:消费者1 消费完成 剩余:2
// await / signal implementation
/**
 * Producer-consumer demo built on an explicit {@code Lock} with two {@code Condition}s.
 *
 * <p>All access to {@code count} happens while holding {@code lock}. Producers wait on
 * {@code full} while their batch would not fit; consumers wait on {@code empty} while the
 * store holds fewer items than they need.
 */
public class Main {
    /** Maximum number of items the store may hold. */
    private static final int FULL = 10;
    /** Number of items currently in the store; guarded by {@code lock}. */
    private static int count;
    private static final Lock lock = new ReentrantLock();
    /** Producers wait here until space is freed. */
    private static final Condition full = lock.newCondition();
    /** Consumers wait here until stock is added. */
    private static final Condition empty = lock.newCondition();

    /** Starts two producers and two consumers with balanced supply and demand. */
    public static void main(String[] args) {
        new Thread(new Product(4), "生产者1").start();
        new Thread(new Consumer(3), "消费者1").start();
        new Thread(new Product(5), "生产者2").start();
        new Thread(new Consumer(4), "消费者2").start();
    }

    /** Producer task: adds {@code num} items to the store in one batch. */
    static class Product implements Runnable {
        private final int num;

        /**
         * @param num number of items to produce; a batch larger than {@code FULL} can never
         *            fit and would wait forever
         */
        public Product(int num) {
            this.num = num;
        }

        /**
         * Waits until the whole batch fits, produces it, then wakes the waiting consumers.
         * Returns early, with the interrupt flag restored, if interrupted while waiting.
         */
        @Override
        public void run() {
            lock.lock();
            try {
                // await() releases the lock while the batch would overflow the store.
                // Strict '>' lets the store be filled to exactly FULL.
                while (count + num > FULL) {
                    System.out.println(Thread.currentThread().getName() + " wait");
                    try {
                        full.await();
                    } catch (InterruptedException e) {
                        // Honour the cancellation request instead of awaiting again.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                System.out.println("生产者:" + Thread.currentThread().getName() + " 开始生产 " + num);
                produce(num);
                System.out.println("生产者:" + Thread.currentThread().getName() + " 生成完成 剩余:" + count);
                // Stock only grew, so only consumers can have become runnable.
                empty.signalAll();
            } finally {
                // Always release the lock, including on early return or an unchecked exception.
                lock.unlock();
            }
        }

        /** Adds up to {@code num} items, one per second; stops early if interrupted. */
        private void produce(int num) {
            for (int i = 0; i < num; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                count++;
            }
        }
    }

    /** Consumer task: removes {@code num} items from the store in one batch. */
    static class Consumer implements Runnable {
        private final int num;

        /**
         * @param num number of items to consume
         */
        public Consumer(int num) {
            this.num = num;
        }

        /**
         * Waits until the store holds at least {@code num} items, consumes them, then wakes
         * the waiting producers. Returns early, with the interrupt flag restored, if
         * interrupted while waiting.
         */
        @Override
        public void run() {
            lock.lock();
            try {
                // await() releases the lock while demand exceeds the available stock.
                while (count < num) {
                    System.out.println(Thread.currentThread().getName() + " wait");
                    try {
                        empty.await();
                    } catch (InterruptedException e) {
                        // Honour the cancellation request instead of awaiting again.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                System.out.println("消费者:" + Thread.currentThread().getName() + " 开始消费 " + num);
                consume(num);
                System.out.println("消费者:" + Thread.currentThread().getName() + " 消费完成 剩余:" + count);
                // Stock only shrank, so only producers can have become runnable.
                full.signalAll();
            } finally {
                lock.unlock();
            }
        }

        /** Removes up to {@code num} items, one per second; stops early if interrupted. */
        private void consume(int num) {
            for (int i = 0; i < num; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                count--;
            }
        }
    }
}
BlockingQueue:支持并发的阻塞队列
/**
 * Producer-consumer demo that delegates all blocking and synchronisation to a bounded
 * {@code LinkedBlockingDeque}.
 */
public class Main {
    /** Starts three producers and three consumers with matching batch sizes on one store. */
    public static void main(String[] args) {
        Storage storage = new Storage(10);
        Product product = new Product(storage);
        Product product2 = new Product(storage);
        Product product3 = new Product(storage);
        Consumer consumer = new Consumer(storage);
        Consumer consumer2 = new Consumer(storage);
        Consumer consumer3 = new Consumer(storage);
        product.setNum(4);
        product2.setNum(5);
        product3.setNum(6);
        consumer.setNum(4);
        consumer2.setNum(5);
        consumer3.setNum(6);
        product.start();
        product2.start();
        product3.start();
        consumer.start();
        consumer2.start();
        consumer3.start();
    }
}

/** Bounded store backed by a blocking deque; {@code put}/{@code take} block as needed. */
class Storage {
    private final LinkedBlockingDeque<Object> blockingDeque;
    private final int mMax;

    /**
     * @param mMax capacity of the store
     */
    public Storage(int mMax) {
        this.mMax = mMax;
        blockingDeque = new LinkedBlockingDeque<>(mMax);
    }

    /**
     * Puts {@code num} items into the store, blocking whenever it is full.
     * If the calling thread is interrupted, its interrupt flag is restored and the method
     * returns without adding the remaining items.
     *
     * @param num number of items to add
     */
    public void product(int num) {
        // Informational only: the size may change before the first put() runs.
        if (blockingDeque.size() == mMax) {
            System.out.println("剩余容量为0,等待");
        }
        System.out.println("生产:" + num);
        for (int i = 0; i < num; i++) {
            try {
                blockingDeque.put(new Object());
            } catch (InterruptedException e) {
                // Stop instead of blocking again with the interrupt flag cleared.
                Thread.currentThread().interrupt();
                return;
            }
        }
        System.out.println("生产完毕 库存:" + blockingDeque.size());
    }

    /**
     * Takes {@code num} items from the store, blocking whenever it is empty.
     * If the calling thread is interrupted, its interrupt flag is restored and the method
     * returns without removing the remaining items.
     *
     * @param num number of items to remove
     */
    public void consumer(int num) {
        // Informational only: the size may change before the first take() runs.
        if (blockingDeque.size() == 0) {
            System.out.println("库存为0,等待生产");
        }
        System.out.println("消费:" + num);
        for (int i = 0; i < num; i++) {
            try {
                blockingDeque.take();
            } catch (InterruptedException e) {
                // Stop instead of blocking again with the interrupt flag cleared.
                Thread.currentThread().interrupt();
                return;
            }
        }
        System.out.println("消费完毕 库存:" + blockingDeque.size());
    }
}

/** Producer thread: adds a configurable number of items to a {@code Storage}. */
class Product extends Thread {
    private int num;
    private final Storage storage;

    /**
     * @param storage store to add items to
     */
    public Product(Storage storage) {
        this.storage = storage;
    }

    /**
     * @param num number of items to produce when the thread runs
     */
    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public void run() {
        product(num);
    }

    /** Forwards the batch to the store. */
    private void product(int num) {
        storage.product(num);
    }
}

/** Consumer thread: removes a configurable number of items from a {@code Storage}. */
class Consumer extends Thread {
    private int num;
    private final Storage storage;

    /**
     * @param storage store to remove items from
     */
    public Consumer(Storage storage) {
        this.storage = storage;
    }

    /**
     * @param num number of items to consume when the thread runs
     */
    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public void run() {
        consumer(num);
    }

    /** Forwards the batch request to the store. */
    private void consumer(int num) {
        storage.consumer(num);
    }
}
输出:
生产:4
生产完毕 库存:4
生产:5
生产完毕 库存:9
生产:6
消费:4
消费:5
消费完毕 库存:4
消费完毕 库存:1
生产完毕 库存:6
消费:6
消费完毕 库存:0