手写AQS

网上看了视频,讲解的比较好,特此记录,加深记忆。

1
2
3
4
5
6
7
8
9
10
11

ReentrantLock syn = new ReentrantLock();

public void func(){
syn.lock();

...

syn.unLock();

}
  1. 首先定义一个自己的lock。
    public class CustomLock{

}

  1. 当有10个线程分别是T1,T2,T3… … T10,假为T1获得锁,那个其他线程在执行到syn.lock()的地方,肯定是停在这个地方等T1释放锁了。那这个停的行为怎么实现呢?

    • this.wait(),这种方式先不考虑,因为他是基于object.monitor()实现的。也就是synchronized原理。
    • sleep,无法知道睡多久?
    • park,相当于wait–一同样无法知道什么时候唤醒
      其实我们的目标是把线程的停在lock方法里,不让他出来,这样就达到阻塞的效果了。这时就可以使用自旋,即for(;;){}
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      public class CustomLock{

      public void lock(){
      //this.wait()
      //sleep
      //park
      //所有线程进行循环,只有拿到锁的线程才可以跳出for循环
      for(;;){

      }
      }
      }

    – 自旋不是一直占用CPU吗?怎么让渡CPU呢?

  2. 怎么设计互斥锁,任意时刻只有一个人能执行成功,要求原子性。这时就需要用到CAS操作,由操作系统提供的方法。

    • 首先需要定义一个状态status,获取锁的过程,也就是把status修改为1的过程。
    • 通过CAS操作进行status的竞争
    • 另外需要把当前线程,也就是当前获取锁的线程保存一下
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      public boolean acquire(){
      Thread t = Thread.currentThread();
      if(compareAndSwapState(0,1)){
      setLockHolder(t);
      return true;
      }
      return false;
      }


      public void lock(){
      if(acquire()){
      return ;
      }

      for(;;){
      if(acquire()){//在循环里竞争锁
      break;
      }
      }
      }
  3. 由于for(;;)中,一直占用CPU,这很浪费资源。有什么办法呢? 这个时候其实可以修改线程状态,通过LockSupport.park(),永久park。通过unpark解锁,也就需要定义个unlock方法。
    如果使用wait,唤醒是随即的,不通达到公平锁的要求

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
      public void lock(){
    if(acquire()){
    return ;
    }

    for(;;){
    if(acquire()){//在循环里竞争锁
    break;
    }

    LockSupport.park()

    }

    public void unlock(){
    LockSupport.unpark()
    }
    }
  4. 由于unpark的线程时,需要确定的是哪个线程,所以当多个线程在竞争锁时,如果没拿到锁,那个就都需要保存到一个队列里,等待上一个线程释放锁时唤醒。但是这个队列不能使用阻塞队列,因为阻塞队列也是基于AQS实现的。这里使用ConcurrentLinkedQueue,线程安全,但不是阻塞的。
    示意图

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    ConcurrentLinkedQueue<Thread> aiters = new ConcurrentLinkedQueue();


    public void lock(){
    if(acquire()){
    return ;
    }

    Thread current = Thread.currentThread();
    waiters.add(current);

    for(;;){
    if(acquire()){//在循环里竞争锁
    break;
    }

    LockSupport.park()

    }

    public void unlock(){
    LockSupport.unpark()
    }
    }

    6. 再看下解锁
    * 首先判断下释放锁的线程是不是保存的获取锁的线程
    * 通过CAS把锁的状态修改为0,即解锁,同时要把线程holder置空,并且从等待队列中取一个线程进行解锁
    ```JAVA

    public void unlock(){
    if(Thread.currentThread != lockHolder){
    throw new RuntimeExcption("不是当前线程持有的锁");
    }

    int state = getState();
    if(compareAndSwapState(state,0)){
    setLockHolder(null);
    Thread t = waiters.peek();
    if (t != null){
    LockSupport.unpark(t);
    }
    }

    }
  5. 这里再看lock的过程,在unlock方法中已经把队列中等待的第一个线程unpark了。所以在lock方法中,在acquire成功时,要移除这个线程,并return。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24


    public void lock(){
    if(acquire()){
    return ;
    }

    Thread current = Thread.currentThread();
    waiters.add(current);

    for(;;){
    /* if(acquire()){//在循环里竞争锁
    break;
    } */

    if(current == waiters.peek() && acquire()){
    waiters.poll();
    return;
    }


    LockSupport.park()

    }
  6. 还有个问题,如果队列不是空的,那么进来的线程应该去排队,而不是直接获取锁。由于acquire方法初始了在lock一开始执行,还在for(;;)中执行,这个时候假如T2在获取锁时,由于他是在队列里的,这个时候判断waiters==0永远不会成功

    1
    2
    3
    4
    5
    6
    7
    8
    public boolean acquire(){
    Thread t = Thread.currentThread();
    if((waiters.size == 0 || current == waiters.peek()) && compareAndSwapState(0,1)){
    setLockHolder(t);
    return true;
    }
    return false;
    }
  7. 另外我们补充一下park方法获取的代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public final boolean compareAndSwapState(int except, int update){
    return unsafe.compareAndSwapInt(this,stateOffset, except, update);
    }

    private static final Unsafe unsafe = UnsafeInstance.reflectGetUnsafe();

    private static final long stateOffset;

    static{
    try{
    stateOffset = unsafe.objectFieldOffset(CustomLock.class.getDeclareField("state"))
    }catch(Exception e){
    throw new Error();
    }
    }
  8. 如果LockSupport.park执行时,线程被中断了怎么办?t.interrupt()—-这块比较复杂,可再研究

  9. 总结
    AQS = 自旋 + CAS + LockSupport
    AQL具备:阻塞等待队列、共享/独占、公平/非公司、可重入、允许中断