
/**
* 阻塞队列的实现
* BlockingQueue是 juc中解决生产者和消费者问题的最有用的一个类 -》 皆是线程安全的类
*/
public interface BlockingQueue<T> {
/**
* 插入数据的接口
*/
void put(T element) throws InterruptedException;
/**
* 获取数据的接口
*/
T take() throws InterruptedException;
}
/**
* 本文需保证没有第三线程参与情况下
*/
public class MiniArrayBlockingQueue implements BlockingQueue{
// 实现并发控制
private Lock lock = new ReentrantLock();
/**
* 队列未满,生产者可继续往队列中添加数据
* 队列满了,生产者将通过 notFull.await() 将其挂起在 notFull条件队列中,等待消费者消费队列中元素时被唤醒
*/
private Condition notFull = lock.newCondition();
/**
* 队列不空,消费者可从队列中消费数据
* 队列空,消费者线程将通过 notEmpty.await() 将其挂起在 notEmpty条件队列中,等待生产者生产数据后被唤醒
*/
private Condition notEmpty = lock.newCondition();
/**
* 存放元素的队列 -> 数组实现
*/
private Object[] queues;
/**
* 数组长度
*/
private int size;
/**
* count:数组中元素个数
* putptr:记录生产者存放数据的下一次位置,每个生产者生产完数据后,会将 putptr ++
* takeptr:记录消费者消费数据的下一个位置,每个消费者消费完一个数据后,会将 takeptr ++
*/
private int count, putptr, takeptr;
public MiniArrayBlockingQueue(int size) {
this.size = size;
this.queues = new Object[size];
}
@Override
public void put(Object element) throws InterruptedException {
lock.lock();
try {
// 判满
if (count == size) {
notFull.await();
}
// 执行到这,说明队列未满
this.queues[putptr] = element;
putptr ++;
if (putptr == size) putptr = 0;
count ++;
// 给 notEmpty队列一个 唤醒信号
notEmpty.signal();
} finally {
lock.unlock();
}
}
@Override
public Object take() throws InterruptedException {
lock.lock();
try {
if (count == 0) {
notEmpty.await();
}
// 执行到这,说明队列中有元素 --> 指针来回交替便会覆盖.
Object o = queues[takeptr];
takeptr ++;
if (takeptr == size) takeptr = 0;
count --;
// 队列已不满, 给 notFull队列一个中断信号
notFull.signal();
return o;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
BlockingQueue<Integer> queue = new MiniArrayBlockingQueue(10);
new Thread(() -> {
int i = 0;
while (true) {
i ++;
if (i == 10) i = 0;
try {
System.out.println("生产数据:" + i);
queue.put(Integer.valueOf(i));
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
while (true) {
try {
Integer o = queue.take();
System.out.println("消费数据:" + o);
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}