学习java的并发包(JUC, java.util.concurrent),自然就回想起几个著名的类: ReentrantLock、Semaphore、CountDownLatch等。而在这些类都是基于AQS(AbstractQueuedSynchronizer)实现的,一个抽象模板类。 了解和学习AQS, 可以更好的理解和掌握JUC。
AQS支持共享和非共享两种模式,非共享对应的有ReentrantLock, 而共享对应的有CountDownLatch,semaphore等。
AQS的数据结构其实比较简单, 大致这个样子。
- head 和 tail 类型都是AQS的内部类-> node类型,
- state是一个int类型.也就是互斥/共享资源,state>0 资源被占用, state==0 资源被释放。
两种模式
独占模式:
- ReentrantLock:
- state为0代表资源未被占有,可以获取锁并设置为1,此时其他线程访问为1,代表资源已经被锁上(占有),所以线程自身进入lock(AQS)的双向队列中,等待资源释放。
共享模式:
- CountDownLatch:
- 一个线程(一般是主线程) latch = new CountDownLatch(num); 即state设为num
- latch.await(); 将线程自身加入latch(AQS)的双向队列,等待资源释放
- latch.countDown(); 将state-1,一旦state为0说明资源被释放。(一般是其他线程执行完成),之前await的线程可以继续执行。
源码分析
talk is cheap show me the code
1 | Semaphore semaphore = new Semaphore(5); |
最近用到了semaphore用来限流。 也就是dosometing的代码非常消耗内存,如果并发高很容易出现oom。这个时候就可以根据分配的Xmx和dosometing的计算最大并发个数,进行限制。
那么如何实现的并发控制,点进源码see see。
1 | //semaphore 类的定义。 |
acquire方法
那么先来看看acquire方法。实际执行的是sync的acquireSharedInterruptibly(1)
该方法没有被重写,在AbstractQueuedSynchronizer类中
1 | public final void acquireSharedInterruptibly(int arg) |
也就是尝试获取资源,发现此时资源已经被用完,所以自身线程加入aqs队列,将自己阻塞起来。
tryAcquireShared方法:
该方法重写了,在Semaphore类中。
1 | public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;} |
可以看到,如果有资源,则资源减一,返回的数大于等于0, 不阻塞。 反之, 没有资源,那么执行doAcquireSharedInterruptibly() 操作,再次尝试,如果失败则阻塞。
doAcquireSharedInterruptibly 方法:
该方法没有被重写,在AbstractQueuedSynchronizer类中
1 | private void doAcquireSharedInterruptibly(int arg) |
到这里阻塞就算是完成了。线程没有获取到资源,全都暂停在parkAndCheckInterrupt。等待其他拥有资源的线程release,那么就会唤醒阻塞的线程,重新回到for(;;)。 重新获取资源。
release方法
线程释放资源调用 release方法
1 | public void release() {sync.releaseShared(1);} |
和acquire方法相反, release方法释放资源,然后doReleaseShared 唤醒之前被阻塞在parkAndCheckInterrupt 方法的线程。
doReleaseShared 方法:
1 | private void doReleaseShared() { |
这个代码也是一个无限循环(闭锁),然后唤醒所有没有放弃的后继节点。线程就会回到上面doAcquire中继续循环。抢占资源或者继续阻塞。等待唤醒。
unpartSuccessor方法:
1 | private void unparkSuccessor(Node node) { |
到了这里才是真正唤醒线程。而互斥与之差不多,不过是资源数变成1,互斥访问。