Thread - Consumer and Producer

生产者消费者模型具有以下特点:

其设计关键点在于:缓冲区(仓库)的设计和等待、通知机制的实现

例子1:缓冲区大小为1,生产者消费者共用一把锁

生产者

public class Producer {
	private Object lock;
	private List<Integer> container;

	public Producer(Object lock, List<Integer> container) {
		this.lock = lock;
		this.container = container;
	}

	public void produce() {
		try {
			synchronized (lock) {
				if (!container.isEmpty()) // 如果容器满了,则当前线程等待,释放lock monitor的锁
					lock.wait();
				System.out.println("Produce start ...");
				int value = new Random().nextInt(10);
				container.add(value);
				Thread.sleep(500);
				System.out.println("Produce end. Value: " + value);
				lock.notify();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

消费者

public class Consumer {
	private Object lock;
	private List<Integer> container;

	public Consumer(Object lock, List<Integer> container) {
		this.lock = lock;
		this.container = container;
	}

	public void consume() {
		try {
			synchronized (lock) {
				if (container.isEmpty()) // 如果容器为空,则当前线程等待,释放lock monitor的锁
					lock.wait();
				System.out.println("Consume start ...");
				int value = container.get(0);
				container.remove(0);
				Thread.sleep(1000);
				System.out.println("Consume end. Value " + value);
				lock.notify();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

主函数

  	public static void main(String[] args) {
	    Object lock = new Object();
	    List<Integer> container = new ArrayList<Integer>();
	    final Producer producer = new Producer(lock, container);
	    final Consumer consumer = new Consumer(lock, container);
	    Runnable producerRunnable = new Runnable() {
	        public void run() {
	            while (true) {
	                producer.produce();
	            }
	        }
	    };
	    Runnable consumerRunnable = new Runnable() {
	        public void run() {
	            while (true) {
	                consumer.consume();
	            }
	        }
	    };
	    Thread producerThread = new Thread(producerRunnable);
	    Thread CustomerThread = new Thread(consumerRunnable);
	    producerThread.start();
	    CustomerThread.start();
	}

http://blog.csdn.net/zhangyuan19880606/article/details/51153035

例子2:缓冲区大小为N,生产者消费者共用一把锁

这里的关键在于:生产过程和消费过程,不能放到synchronized block里。只有当等待,通知的时候,两者才会去抢同一把锁。

生产者

public class Producer {
	private int MAX_SIZE = 10;
	private Object lock;
	private List<Integer> container;

	public Producer(Object lock, List<Integer> container) {
		this.lock = lock;
		this.container = container;
	}

	public void produce() {
		try {
			// 等待过程
			if (container.size() == MAX_SIZE) {
				synchronized (lock) {
					lock.wait();
				}
			}
			// 生产过程
			System.out.println("Produce start ...");
			int value = new Random().nextInt(10);
			container.add(value);
			Thread.sleep(500);
			System.out.println("Produce end. Value: " + value);
			// 通知过程
			synchronized (lock) {
				lock.notify();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

消费者

public class Consumer {
	private Object lock;
	private List<Integer> container;

	public Consumer(Object lock, List<Integer> container) {
		this.lock = lock;
		this.container = container;
	}

	public void consume() {
		try {
			// 等待过程
			if (container.isEmpty()) {
				synchronized (lock) {
					lock.wait();
				}
			}
			// 消费过程
			System.out.println("Consume start ...");
			int value = container.get(0);
			container.remove(0);
			Thread.sleep(1000);
			System.out.println("Consume end. Value " + value);
			// 通知过程
			synchronized (lock) {
				lock.notify();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

主函数

	public static void main(String[] args) {
		Object lock = new Object();
		List<Integer> container = Collections.synchronizedList(new ArrayList<Integer>());
		final Producer producer = new Producer(lock, container);
		final Consumer consumer = new Consumer(lock, container);
		Runnable producerRunnable = new Runnable() {
			public void run() {
				while (true) {
					producer.produce();
				}
			}
		};
		Runnable consumerRunnable = new Runnable() {
			public void run() {
				while (true) {
					consumer.consume();
				}
			}
		};
		Thread producerThread = new Thread(producerRunnable);
		Thread CustomerThread = new Thread(consumerRunnable);
		producerThread.start();
		CustomerThread.start();
	}

例子3:用BlockingQueue实现缓冲区

通过BlockingQueue实现缓冲区,不需要去额外实现等待、通知机制

生产者

public class Producer {
	private BlockingQueue<Integer> container;

	public Producer(BlockingQueue<Integer> container) {
		this.container = container;
	}

	public void produce() {
		try {
			System.out.println("Produce start ...");
			int value = new Random().nextInt(10);
			container.put(value);
			Thread.sleep(500);
			System.out.println("Produce end. Value: " + value);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

消费者

public class Consumer {
	private BlockingQueue<Integer> container;

	public Consumer(BlockingQueue<Integer> container) {
		this.container = container;
	}

	public void consume() {
		try {
			System.out.println("Consume start ...");
			int value = container.take();
			Thread.sleep(1000);
			System.out.println("Consume end. Value " + value);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

	}
}

主函数

	public static void main(String[] args) {
	    final BlockingQueue<Integer> container = new ArrayBlockingQueue<Integer>(10);
	    final Producer producer = new Producer(container);
	    final Consumer consumer = new Consumer(container);
	    Runnable producerRunnable = new Runnable() {
	        public void run() {
	            while (true) {
	                producer.produce();
	            }
	        }
	    };
	    Runnable consumerRunnable = new Runnable() {
	        public void run() {
	            while (true) {
	                consumer.consume();
	            }
	        }
	    };
	    Thread producerThread = new Thread(producerRunnable);
	    Thread customerThread = new Thread(consumerRunnable);
	    producerThread.start();
	    customerThread.start();
	}

例子4:多生产者,多消费者的模型

设计的关键在于使用notifyAll()去通知所有线程

生产者

public class Producer {
	private int MAX_SIZE = 10;
	private String name;
	private Object lock;
	private List<Integer> container;

	public Producer(String name, Object lock, List<Integer> container) {
		this.name = name;
		this.lock = lock;
		this.container = container;
	}

	public void produce() {
		try {
			// 等待过程
			if (container.size() == MAX_SIZE) {
				synchronized (lock) {
					lock.wait();
				}
			}
			// 生产过程
			System.out.println("+++P" + name + ": Produce start ...");
			int value = new Random().nextInt(10);
			container.add(value);
			Thread.sleep(500);
			System.out.println("+++P" + name + ": Produce end. Value: " + value);
			// 通知过程
			synchronized (lock) {
				lock.notifyAll();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

消费者

public class Consumer {
	private String name;
	private Object lock;
	private List<Integer> container;

	public Consumer(String name, Object lock, List<Integer> container) {
		this.name = name;
		this.lock = lock;
		this.container = container;
	}

	public void consume() {
		try {
			// 等待过程
			if (container.isEmpty()) {
				synchronized (lock) {
					lock.wait();
				}
			}
			// 消费过程
			System.out.println("---C"+ name + ": Consume start ...");
			int value = container.get(0);
			container.remove(0);
			Thread.sleep(1000);
			System.out.println("---C"+ name + ": Consume end. Value " + value);
			// 通知过程
			synchronized (lock) {
				lock.notifyAll();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

主函数

	public static void main(String[] args) {
		Object lock = new Object();
		List<Integer> container = Collections.synchronizedList(new ArrayList<Integer>());
		final Producer p1 = new Producer("1", lock, container);
		final Producer p2 = new Producer("2", lock, container);
		final Consumer c1 = new Consumer("1", lock, container);
		final Consumer c2 = new Consumer("2", lock, container);
		Runnable p1Runnable = new Runnable() {
			public void run() {
				while (true) {
					p1.produce();
				}
			}
		};
		Runnable p2Runnable = new Runnable() {
			public void run() {
				while (true) {
					p2.produce();
				}
			}
		};
		Runnable c1Runnable = new Runnable() {
			public void run() {
				while (true) {
					c1.consume();
				}
			}
		};
		Runnable c2Runnable = new Runnable() {
			public void run() {
				while (true) {
					c2.consume();
				}
			}
		};
		Thread p1Thread = new Thread(p1Runnable);
		Thread p2Thread = new Thread(p2Runnable);
		Thread c1Thread = new Thread(c1Runnable);
		Thread c2Thread = new Thread(c2Runnable);
		p1Thread.start();
		p2Thread.start();
		c1Thread.start();
		c2Thread.start();
	}

http://www.infoq.com/cn/articles/producers-and-consumers-mode/#

Fork me on GitHub