今天看啥  ›  专栏  ›  克里斯朵夫李维

并发相关知识查漏补缺(京东二面)

克里斯朵夫李维  · 掘金  ·  · 2020-06-22 08:30
阅读 74

并发相关知识查漏补缺(京东二面)

京东二面的面试官看我写了熟悉并发包后问我熟悉并发包的哪些东西,我说了AQS和Sychronized,CAS等,然后他问了我很多并发的细节问题,基本上全是源码相关的。比如AQS在获取同步状态失败后在同步队列是如何处理。使用的是什么方法挂起线程,很细节。CAS的实现原理,到CPU层次的。ConcurrentHashMap和HashMap的区别,他是怎么扩容的?(Java面试8股文?哈哈)

如何使用Unsafe类

Unsafe类只能被Bootstrap类加载器加载,要使用它有两种方式:

  • 通过Java命令行命令-Xbootclasspath/a把调用Unsafe相关方法的类A所在jar包路径追加到默认的bootstrap路径中
  • 使用反射
     static {
          try {
              Field field = Unsafe.class.getDeclaredField("theUnsafe");
              field.setAccessible(true);
              U = (Unsafe) field.get(null);
              System.out.println(U);
              VALUE = U.objectFieldOffset(UnsafeOperation.class.getDeclaredField("value"));
              System.out.println(VALUE);
          } catch (Exception e) {
          }
      }
    复制代码

Unsafe的comapreAndSwap方法

comapreAndSwap(包含要修改field的对象,field的内存地址偏移量,期望值,更新值)

CAS实现原理

旧的预期值A,更新值B,内存值V。compareAndSet(expect,update)

基于Unsafe类的ComaperAndSwapXX()

基于cpu的cmpxchg指令(比较并交换指令)

AQS怎么实现的?

答:通过一个volatile int state的同步状态和一个双向链表的阻塞队列实现的。同步队列是一个双向链表,节点是Node。Node里的属性有Thread,代表当前来获取锁的线程。还有一个volatile int waitStatus;用来控制线程的阻塞和唤醒。

AQS的加锁流程

  • Step1
 static final class NonfairSync extends Sync {
       private static final long serialVersionUID = 7316153563782823691L;

       /**
        * Performs lock.  Try immediate barge, backing up to normal
        * acquire on failure.
        */
       final void lock() {
           if (compareAndSetState(0, 1))
               setExclusiveOwnerThread(Thread.currentThread());
           else
               acquire(1);
       }

       protected final boolean tryAcquire(int acquires) {
           return nonfairTryAcquire(acquires);
       }
   }
  
复制代码
  • Step2
public final void acquire(int arg) {
       if (!tryAcquire(arg) &&
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
   }
复制代码
  • Step3
private Node addWaiter(Node mode) {
       Node node = new Node(Thread.currentThread(), mode);
       // Try the fast path of enq; backup to full enq on failure
       Node pred = tail;
       if (pred != null) {
           node.prev = pred;
           if (compareAndSetTail(pred, node)) {
               pred.next = node;
               return node;
           }
       }
       enq(node);
       return node;
   }
复制代码
  • Step4
 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
}
复制代码
  • Step5
 final boolean acquireQueued(final Node node, int arg) {
	// 标记是否成功拿到资源
	boolean failed = true;
	try {
		// 标记等待过程中是否中断过
		boolean interrupted = false;
		// 开始自旋,要么获取锁,要么中断
		for (;;) {
			// 获取当前节点的前驱节点
			final Node p = node.predecessor();
			// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
			if (p == head && tryAcquire(arg)) {
				// 获取锁成功,头指针移动到当前node
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
			if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}
复制代码
  • Setp6
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return fale;
    }
复制代码
  1. 首先使用compareAndSetState(0,1)获取同步状态,获取成功后通过 setExclusiveOwnerThread(Thread.currentThread())将当前线程设置为锁的独占线程。否则进入第2步。

  2. 调用acquire(1),通过tryAcquire(1)获取通过状态。如果获取失败,则通过addWaiter(Node.EXCLUSIVE))将当前线程构造为Node,添加到同步队列尾部。

  3. addWaiter方法会通过当前线程创建一个Node。然后把当前线程的Node通过compareAndSetTail(Node)设置到队列尾部,此时有两种情况:

    1. 如果此时队列还没有初始化需要先进行队列的初始化过程。进行第4步,创建Head和Tail节点。
    2. 如果此时Tail节点已经被别的线程修改,也需要进行第4步。
  4. 队列的初始化,会创建一个虚拟节点,他的Thread属性为空。然后通过compareAndSetHead设置Head节点和初始化Tail节点。最终通过自旋操作,把当前线程的Node,通过comapereAndSetTail(Node)设置到队列尾部。然后返回当前节点。

  5. 然后把当前传给acquireQueued方法,acquireQueued中有个自旋方法,会不断的获取当前的前驱节点,如果当前节点的前驱节点是Head节点,则当前线程使用tryAcquire()尝试获取同步状态。获取成功的话则调用setHead(Node),将当前的节点设置为Head节点,并把Thread属性设置为null,(因为头节点是虚拟节点)。否则进行第6步。

  6. shouldParkAfterFailedAcquire方法,会检查当前节点的前驱节点的waitStatus,(初始化时waitStatus是0),如果waitStatusSIGNAL(-1)则直接返回true。如果waitStatus>0,循环向前处理撤销节点。否则通过compareAndSetWaitStatus把前驱节点的waitStatus设置为-1。然后再次进行循环的时候,就会通过LockSupport.park(this)把当前线程阻塞,这也是为什么使用AQS的时候线程的状态是waiting

AQS释放锁流程

  • Step1
 public final boolean release(int arg) {
        // 如果当前锁没有线程持有
        if (tryRelease(arg)) {
            Node h = head;
            // 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}
复制代码
  • Step2
// 返回当前锁是否有线程持有
protected final boolean tryRelease(int releases) {
            // 减少可重入次数
            int c = getState() - releases;
            // 当前线程不是持有锁的线程,抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            // 设置state
            setState(c);
            return free;
}
复制代码
  • Step3
     private void unparkSuccessor(Node node) {
        // 获取头结点waitStatus
        int ws = node.waitStatus;
        // 设置头节点的状态为0,表示后继节点运行
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // 获取后继节点
        Node s = node.next;
        // 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }
复制代码
  1. 释放锁的时候首先会调用tryRelease(1),通过getState()获取同步状态的值,然后减少可重入次数,还要判断下释放锁的线程是不是独占锁线程,不是的话就抛出异常。如果同步状态减为0,则把独占锁线程设置为null,并返回true。表示其他线程可以获取锁了。执行第2步。
  2. 当tryRelease(1)返回true的时候,先获取头节点,判断头节点的waitstatus==0?,如果等于0表示它的后继节点正在运行,不需要唤醒。如果为不等于0表示后继节点是需要唤醒的。执行第3步。
  3. unparkSuccessor,首先通过compareAndSetWaitStatus把头节点的状态设置为0,表示后继节点可以运行。然后获取后继节点,接着判断后继节点是不是被撤销了(waitStatus==1),如果被撤销则从队尾找到第一个waitStatus==1 的节点,然后通过LockSupport.unpark(s.thread)唤醒线程。

AQS如何实现锁的可重入

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
            // 获取当前线程
            final Thread current = Thread.currentThread();
            // 获取同步状态
            int c = getState();
            // 表示同步锁被释放了
            if (c == 0) {
                // 设置同步状态
                if (compareAndSetState(0, acquires)) {
                    // 设置当前线程为独占线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 判断当前线程是不是独占线程
            else if (current == getExclusiveOwnerThread()) {
                // 增加可重入次数
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 设置同步状态
                setState(nextc);
                return true;
            }
            // 返回获取同步状态失败
            return false;
}
复制代码

AQS实现可重入锁的方式就是实现它的模版方法tryAcquire(1),首先通过getState获取同步状态

  • 如果同步状态的值是0表示这个同步锁已经被释放了,那么则需要通过compareAndSetState设置同步状态,并把当前线程设置为锁的独占线程。
  • 如果不为0,则判断当前线程是不是锁的独占线程,如果是增加state的值(也就是锁的可重入次数),更新同步状态返回true,否则返回false。

公平锁实现

public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
}
复制代码

hasQueuedPredecessors方法:

  • 返回false代表同步队列中没有有效的节点,代表当前线程可以获取同步状态。
  • 返回true代表同步队列中存在有效节点且当前线程和Head节点的后继节点不是一个线程,则无法获取锁,必须加入同步队列等待。

ConcurrentHashMap和HashMap的区别

  • 区别点1:初始化table的方式不同

    不论HashMap还是ConcurrentHashMap(下面简写为ccMap)都是在put操作的时候才会初始化table,但是HashMap不是线程安全的,所以它的table在并发情况下可能初始化多次。但是ConcurrentHashMap只会初始化table一次。

    private final Node<K,V>[] initTable() {
          Node<K,V>[] tab; int sc;
          while ((tab = table) == null || tab.length == 0) {
              // sizeCtl小于0,当前线程让出CPU使用权
              // 全局变量局部化
              if ((sc = sizeCtl) < 0)
                  Thread.yield(); // lost initialization race; just spin
              // CAS设置 SIZECTL值为1保证只有一个table只初始化一次   
              else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                  try {
                      if ((tab = table) == null || tab.length == 0) {
                          int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                          @SuppressWarnings("unchecked")
                          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                          table = tab = nt;
                          sc = n - (n >>> 2);
                      }
                  } finally {
                      // 更新全局变量 
                      sizeCtl = sc;
                  }
                  break;
              }
          }
          return tab;
      }
    复制代码

ccMap的table属性是被volatile关键字修饰的,但是HashMap不是。ccMap利用一个被voliate修饰sizeCtl和CAS操作保证table在并发下只被初始化一次。

这里有个小细节就是关于被volatile关键字修饰的变量怎么使用的问题,一般情况下我们会把被volatile关键字修饰变量本地化,这样做有两个好处:

  1. 因为volatile关键字修饰的变量会保证内存的可见性,如果我们在一个方法中多次读取一个被volatile关键字修饰的变量那么和可能会导致读取的数据不一致,因为它可能被另一个线程修改了。本地化后不会出现这个情况。
  2. 在一个方法内多次修改volatile修饰的变量,会多次刷新到主存。影响性能。本地化变量后只在最后,改变volatile修饰的变量可以保证性能。

如果我们能额外答出这一点,会让面试官感觉到有点东西。。

  • 区别点二:获取table[i]的方式不同

    虽然ccMap使用的table被voliate修饰,但是由于JVM的内存模型,在并发下不能保证每个元素都是最新的所以通过unsafe.getObjectVolatile获取最新的元素

  • 区别点三:插入方式

    1. 如果table[i]==null,ccMap使用CAS插入f=node,插入成功则调用addCount检查是否需要扩容。
    2. 插入失败说明有别的线程在这个位置插入成功,然后自旋在这个位置插入节点。
    3. 如果f的hash为-1,说明有其他线程在扩容,则一起进行扩容。
    4. 其他情况下采用sychronized对f进行加锁,进行插入。这个流程和HashMap没有什么太大区别。
  • 区别点四:扩容

    HashMap扩容时机为Size * LoadfaFctor > Capticaty

    ccMap扩容时机为

    1. 某个槽内的链表长度达到 8,但是数组长度小于 64 先扩充数组大小。
    2. Size * LoadfaFctor > Capticaty

从ReentrantLock的实现看AQS的原理及应用

Java魔法类:Unsafe应用解析




原文地址:访问原文地址
快照地址: 访问文章快照