手写JDK组件之阻塞队列BlockedQueue
研究了一段时间框架,有点审美疲劳,今天讲点轻松的,手写一个阻塞队列,实践一把lock+condition。
“等待通知”机制
首先复习一下经典的 “等待通知”机制。
线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁 –《极客时间-Java并发编程实战》
在Java中实现 “等待通知” 机制一般有两种方式,synchronized/Lock+Condition。
通过synchronized实现 “等待-通知” 机制
synchronized同步原语(或称:管程)配合wait()、notify()、notifyAll()就可以实现“等待通知”机制。
机理是怎样的呢?
当使用synchronized管程对某一块临界区进行加锁,同一时刻,只能允许一个线程进入synchronized保护的临界区中。
当该远程进入临界区之后,其他的线程如果来访问临界区就需要进入等待队列中进行等待。
这里要注意,等待队列与锁是一一对应关系,每个互斥锁都有自己的独立的等待队列。
Java对象的wait()方法就能够让线程进入等待状态,此时线程被阻塞。
当线程进入等待队列时,会释放当前持有的互斥锁。当它释放锁之后,其他的线程就有机会获得该互斥锁并进入临界区。
那如何通知满足条件的线程呢?
通过Java对象的notify()和notifyAll()方法就能够实现。当条件满足时调用notify(),会通知等待队列中的线程,通知它 条件曾经满足过。
notify()只能保证在通知的那一时间点,条件是满足的。也就是,有可能被通知线程执行的时间点与通知的时间点是不相等的;即:线程执行的时候,条件已经不满足了(可能有其他的线程满足了该条件而插队)
另外,就算线程被通知而唤醒,在进入临界区前依旧需要获取互斥锁,因为这把需要获取的锁在调用wait()的时候已经被释放了。
需要注意的是:
wait()、notify()、notifyAll()被调用的前提是获取到了响应的互斥锁,也就是调用这三个方法的位置都是在 synchronized{} 内部。如果调用的位置在synchronized外部或者不是使用同一把互斥锁,JVM会抛出 java.lang.IllegalMonitorStateException 异常。
关于synchronized实现 “等待-通知” 机制我们就讲到这里。
通过Lock+Condition实现 “等待-通知” 机制与synchronized类似,我们本文实现阻塞队列BlockedQueue的方式就是通过Lock+Condition实现。
Lock+Condition原理讲解
Condition 定义了等待/通知两种类型的方法:await()/signal()/signalAll()。线程调用这些方法之前需要获取Condition关联的锁。
Condition对象是由Lock对象通过newCondition()方法创建的,也就是说,Condition是依赖Lock对象的。
类比上文中讲到的synchronized实现 “等待-通知” 机制,Lock/Condition涉及到的方法与synchronized方式涉及到的方法的语义是一一对应的,具体如下表:
synchronized | Lock/Condition | 描述 |
---|---|---|
wait() | await() | 等待 |
notify() | signal() | 通知单个等待队列中的线程 |
notifyAll() | signalAll() | 通知所有等待队列中的线程 |
实现阻塞队列BlockedQueue
了解并复习了 管程中的“等待/通知机制”,我们开始实现阻塞队列BlockedQueue。
在编写过程中参考了JUC中的ArrayBlockingQueue源码实现。
public class BlockedQueue<T> {
final Lock lock = new ReentrantLock();
// 条件变量:队列不满
final Condition notFull = lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty = lock.newCondition();
// 阻塞单列最大长度
int capacity = 0;
// 当前已经存在下标:入队
int putIndex = 0;
// 当前已经存在下标:出队
int takeIndex = 0;
// 元素总数
int elementsSize = 0;
// 元素数组
Object[] items;
// 构造方法
public BlockedQueue(int capacity) {
this.capacity = capacity;
items = new Object[capacity];
System.out.println("capacity=" + capacity + ",items.size=" + items.length);
}
这段代码中我们声明了阻塞队列,支持泛型。声明了需要的成员变量以及有参构造方法。构造方法中根据外界输入的队列最大长度初始化了内部的元素数组。
提前声明并初始化了Lock(实现方式为ReentrantLock可重入锁),并在Lock基础上初始化了两个Condition条件变量,分别标记队列不满、队列不空。
// 入队
void enq(T x) {
lock.lock();
try {
// 队列已满
while (items.length == elementsSize) {
// 等待队列不满
notFull.await();
}
// 入队操作...
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
++elementsSize;
// 入队后, 通知可出队
notEmpty.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println(x.toString() + "--入队完成");
}
}
这段代码为入队逻辑。
首先获取可重入锁,如果加锁成功则进入临界区逻辑,否则尝试解锁。
当队列已经满时,则进入阻塞状态,等待队列不满。
如果队列不满则进行入队,当前下标的元素即为要入队的元素,元素总长度增1。
// 出队
T deq() {
lock.lock();
T x = null;
try {
// 队列已空
while (items.length == 0) {
// 等待队列不空
notEmpty.await();
}
// 出队操作...
x = (T) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
elementsSize--;
// 出队后,通知可入队
notFull.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return x;
}
这段代码为出队逻辑。
首先获取可重入锁,如果加锁成功则进入临界区逻辑,否则尝试解锁。
当队列已经空,则进入阻塞状态,等待队列不空。
如果队列不空则进行出队操作,先暂存当前下标的元素,并将当前下标的元素标记为空(NULL);元素总长度减1,解锁后返回当前已经出队的元素。
public T get(int index) {
return (T) items[index];
}
这段代码为获取对应下标的元素,如果元素不存在则返回空。
测试阻塞队列:单线程操作
开发完基本逻辑之后,我们写一个demo来测试一下BlockedQueue。
public static void main(String[] args) {
BlockedQueue<String> blockedQueue = new BlockedQueue<>(20);
for (int i = 0; i < 20; i++) {
blockedQueue.enq("snowalker:" + i);
}
System.out.println("入队结束:-------------------------");
for (int i = 0; i < 20; i++) {
System.out.println(blockedQueue.get(i));
}
for (int i = 0; i < 20; i++) {
blockedQueue.deq();
}
System.out.println("出队结束:-------------------------");
for (int i = 0; i < 20; i++) {
System.out.println(blockedQueue.get(i));
}
}
逻辑很好理解,我们构造了一个BlockedQueue,添加了20个元素进行入队。入队之后遍历元素,查看入队结果。
接着进行20次出队,并遍历出队后的结果。
运行结果如下:
capacity=20,items.size=20
入队结束:-------------------------
snowalker:0
snowalker:1
snowalker:2
snowalker:3
snowalker:4
snowalker:5
snowalker:6
snowalker:7
snowalker:8
snowalker:9
snowalker:10
snowalker:11
snowalker:12
snowalker:13
snowalker:14
snowalker:15
snowalker:16
snowalker:17
snowalker:18
snowalker:19
出队结束:-------------------------
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
可以看到,进行了20次入队之后元素共有20个;
进行了20次出队操作之后,元素全部为空,表示出队成功。
测试阻塞队列:多线程操作
我们接着测试一下多线程并发操作下,BlockedQueue的表现。
BlockedQueue<String> blockedQueue = new BlockedQueue<>(20);
CountDownLatch begin = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(2);
Thread thread0 = new Thread(new Runnable() {
@Override
public void run() {
try {
begin.await();
System.out.println("线程0准备完毕");
for (int i = 0; i < 10; i++) {
blockedQueue.enq("线程0-snowalker-" + i);
}
System.out.println("线程0入队结束:-------------------------");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
});
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
begin.await();
System.out.println("线程1准备完毕");
for (int i = 10; i < 20; i++) {
blockedQueue.enq("线程1-snowalker-" + i);
}
System.out.println("线程1入队结束:-------------------------");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
});
thread0.start();
thread1.start();
begin.countDown();
end.await();
System.out.println("主线程准备完毕!");
System.out.println("主线程遍历开始!");
for (int i = 0; i < 20; i++) {
System.out.println(blockedQueue.get(i));
}
System.out.println("Bingo!");
}
我们定义了两个线程,每个线程添加10个元素,通过闭锁CountDownLatch进行并发添加,添加完成之后遍历添加结果。打印如下:
capacity=20,items.size=20
线程0准备完毕
线程1准备完毕
线程0-snowalker-0--入队完成
线程1-snowalker-10--入队完成
线程0-snowalker-1--入队完成
线程1-snowalker-11--入队完成
线程0-snowalker-2--入队完成
线程1-snowalker-12--入队完成
线程0-snowalker-3--入队完成
线程1-snowalker-13--入队完成
线程0-snowalker-4--入队完成
线程1-snowalker-14--入队完成
线程0-snowalker-5--入队完成
线程1-snowalker-15--入队完成
线程1-snowalker-16--入队完成
线程1-snowalker-17--入队完成
线程1-snowalker-18--入队完成
线程0-snowalker-6--入队完成
线程1-snowalker-19--入队完成
线程1入队结束:-------------------------
线程0-snowalker-7--入队完成
线程0-snowalker-8--入队完成
线程0-snowalker-9--入队完成
线程0入队结束:-------------------------
主线程准备完毕!
主线程遍历开始!
线程0-snowalker-0
线程1-snowalker-10
线程0-snowalker-1
线程1-snowalker-11
线程0-snowalker-2
线程1-snowalker-12
线程0-snowalker-3
线程1-snowalker-13
线程0-snowalker-4
线程1-snowalker-14
线程0-snowalker-5
线程1-snowalker-15
线程0-snowalker-6
线程1-snowalker-16
线程1-snowalker-17
线程1-snowalker-18
线程1-snowalker-19
线程0-snowalker-7
线程0-snowalker-8
线程0-snowalker-9
Bingo!
可以看到结果符合预期,我们接着测试一下并发出队,接着上面的添加结果进行并发出队操作。
CountDownLatch begin = new CountDownLatch(1);
CountDownLatch dequeue = new CountDownLatch(2);
for (int i = 0; i < 20; i++) {
blockedQueue.enq("snowalker:" + i);
}
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
begin.await();
System.out.println("线程2准备完毕");
for (int i = 0; i <= 10; i++) {
blockedQueue.deq();
}
System.out.println("线程2出队结束:-------------------------");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
dequeue.countDown();
}
}
});
Thread thread3 = new Thread(new Runnable() {
@Override
public void run() {
try {
begin.await();
System.out.println("线程3准备完毕");
for (int i = 0; i <= 10; i++) {
blockedQueue.deq();
}
System.out.println("线程3出队结束:-------------------------");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
dequeue.countDown();
}
}
});
thread2.start();
thread3.start();
begin.countDown();
dequeue.await();
System.out.println("主线程准备完毕!");
System.out.println("主线程遍历开始!");
for (int i = 0; i < 20; i++) {
System.out.println(blockedQueue.get(i));
}
System.out.println("Bingo!");
}
我们准备了20个元素入队,然后并发进行出队,等待两个线程出队完成之后,在主线程进行队列元素的遍历操作,结果如下:
capacity=20,items.size=20
snowalker:0--入队完成
snowalker:1--入队完成
snowalker:2--入队完成
snowalker:3--入队完成
snowalker:4--入队完成
snowalker:5--入队完成
snowalker:6--入队完成
snowalker:7--入队完成
snowalker:8--入队完成
snowalker:9--入队完成
snowalker:10--入队完成
snowalker:11--入队完成
snowalker:12--入队完成
snowalker:13--入队完成
snowalker:14--入队完成
snowalker:15--入队完成
snowalker:16--入队完成
snowalker:17--入队完成
snowalker:18--入队完成
snowalker:19--入队完成
线程2准备完毕
线程2出队结束:-------------------------
线程3准备完毕
线程3出队结束:-------------------------
主线程准备完毕!
主线程遍历开始!
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
Bingo!
结果如上图所示,可以看到并发出队结果满足预期。
小结
本文我们利用JUC中的Lock+Condition管程实现了自定义BlockedQueue阻塞队列的开发,并通过测试用例测试了并发条件下的出队入队,结果符合预期。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。