生产者-消费者问题

常见实现方式:三个同步,一个管道

  • wait()/notify()实现
  • await()/signal()实现
  • BlockQueueing阻塞队列实现
  • PipedInputStream/PipedOutputStream管道实现

wait,notify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public class Main {
private static final int FULL = 20;
private static int count;
private static final Object ASYNC = new Object();

public static void main(String[] args) {
new Thread(new Product(4), "生产者1").start();
new Thread(new Consumer(3), "消费者1").start();
new Thread(new Product(2), "生产者2").start();
new Thread(new Consumer(4), "消费者2").start();
}

static class Product implements Runnable {
private int num;

public Product(int num) {
this.num = num;
}

@Override
public void run() {
synchronized (ASYNC) {
//当产品已满,生产者一直wait()操作,停止自己的操作,放弃对象锁
while (count >= FULL) {
System.out.println(Thread.currentThread().getName() + " wait");
try {
ASYNC.wait(); //放弃对象锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("生产者:" + Thread.currentThread().getName() + " 开始生产 " + num);
produce(num);
System.out.println("生产者:" + Thread.currentThread().getName() + " 生成完成 剩余:" + count);
ASYNC.notify();
}
}

private void produce(int num) {
for (int i = 0; i < num; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
}
}
}

static class Consumer implements Runnable {

private int num;

public Consumer(int num) {
this.num = num;
}

@Override
public void run() {
synchronized (ASYNC) {
//消费者需求>已有产品数量,消费者wait(),放弃对象锁,一直等待
while (num > count) {
System.out.println(Thread.currentThread().getName() + " wait");
try {
ASYNC.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消费者:" + Thread.currentThread().getName() + " 开始消费 " + num);
consume(num);
System.out.println("消费者:" + Thread.currentThread().getName() + " 消费完成 剩余:" + count);
ASYNC.notify();
}
}

private void consume(int num) {
for (int i = 0; i < num; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;
}
}
}
}

输出:

1
2
3
4
5
6
7
8
生产者:生产者1 开始生产 4
生产者:生产者1 生成完成 剩余:4
消费者:消费者2 开始消费 4
消费者:消费者2 消费完成 剩余:0
生产者:生产者2 开始生产 5
生产者:生产者2 生成完成 剩余:5
消费者:消费者1 开始消费 3
消费者:消费者1 消费完成 剩余:2

await,signal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class Main {
private static final int FULL = 10;
private static int count;
private static final Lock lock = new ReentrantLock();
private static final Condition full = lock.newCondition();
private static final Condition empty = lock.newCondition();

public static void main(String[] args) {
new Thread(new Product(4), "生产者1").start();
new Thread(new Consumer(3), "消费者1").start();
new Thread(new Product(5), "生产者2").start();
new Thread(new Consumer(4), "消费者2").start();
}

static class Product implements Runnable {
private int num;

public Product(int num) {
this.num = num;
}

@Override
public void run() {
lock.lock();
//当产品已满,生产者一直wait()操作,停止自己的操作,放弃对象锁
while (count + num >= FULL) {
System.out.println(Thread.currentThread().getName() + " wait");
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("生产者:" + Thread.currentThread().getName() + " 开始生产 " + num);
produce(num);
System.out.println("生产者:" + Thread.currentThread().getName() + " 生成完成 剩余:" + count);
full.signalAll();
empty.signalAll();
lock.unlock();
}

private void produce(int num) {
for (int i = 0; i < num; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
}
}
}

static class Consumer implements Runnable {

private int num;

public Consumer(int num) {
this.num = num;
}

@Override
public void run() {
lock.lock();
//消费者需求>已有产品数量,消费者wait(),放弃对象锁,一直等待
while (count < num) {
System.out.println(Thread.currentThread().getName() + " wait");
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消费者:" + Thread.currentThread().getName() + " 开始消费 " + num);
consume(num);
System.out.println("消费者:" + Thread.currentThread().getName() + " 消费完成 剩余:" + count);
empty.signalAll();
full.signalAll();
lock.unlock();
}

private void consume(int num) {
for (int i = 0; i < num; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;
}
}
}
}

BlockQueue

支持并发的阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
public class Main {

public static void main(String[] args) {
Storage storage = new Storage(10);

Product product = new Product(storage);
Product product2 = new Product(storage);
Product product3 = new Product(storage);
Consumer consumer = new Consumer(storage);
Consumer consumer2 = new Consumer(storage);
Consumer consumer3 = new Consumer(storage);
product.setNum(4);
product2.setNum(5);
product3.setNum(6);
consumer.setNum(4);
consumer2.setNum(5);
consumer3.setNum(6);
product.start();
product2.start();
product3.start();
consumer.start();
consumer2.start();
consumer3.start();
}
}

class Storage {

private final LinkedBlockingDeque<Object> blockingDeque;
private final int mMax;

public Storage(int mMax) {
this.mMax = mMax;
blockingDeque = new LinkedBlockingDeque<>(mMax);
}

public void product(int num) {
if (blockingDeque.size() == mMax) {
System.out.println("剩余容量为0,等待");
}
System.out.println("生产:" + num);
for (int i = 0; i < num; i++) {
try {
blockingDeque.put(new Object());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("生产完毕 库存:" + blockingDeque.size());
}

public void consumer(int num) {
if (blockingDeque.size() == 0) {
System.out.println("库存为0,等待生产");
}
System.out.println("消费:" + num);
for (int i = 0; i < num; i++) {
try {
blockingDeque.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消费完毕 库存:" + blockingDeque.size());
}
}

class Product extends Thread {

private int num;
private Storage storage;

public Product(Storage storage) {
this.storage = storage;
}

public void setNum(int num) {
this.num = num;
}

@Override
public void run() {
product(num);
}

private void product(int num) {
storage.product(num);
}
}

class Consumer extends Thread {
private int num;
private Storage storage;

public Consumer(Storage storage) {
this.storage = storage;
}

public void setNum(int num) {
this.num = num;
}

@Override
public void run() {
consumer(num);
}

private void consumer(int num) {
storage.consumer(num);
}
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
生产:4
生产完毕 库存:4
生产:5
生产完毕 库存:9
生产:6
消费:4
消费:5
消费完毕 库存:4
消费完毕 库存:1
生产完毕 库存:6
消费:6
消费完毕 库存:0


Java      并发

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!