JAVA基础篇面试题
文章目录
1. 线程等待唤醒的实现方法
-
Object对象中的wait()方法可以让线程等待,使用Object中的notify()方法唤醒线程;
- 必须都在同步代码块内使用;
- 调用wait,notify的对象是加锁对象;
- notify必须在wait后执行才能唤醒;
- wait后能释放锁对象,线程处于wait状态;
-
使用JUC包中Condition的await()方法让线程等待,使用signal()方法唤醒线程;
- 必须在lock同步代码块内使用;
- signal必须在await后执行才能唤醒;
-
LockSupport类可以阻塞当前线程以及唤醒指定被阻塞的线程;
- 不需要锁块;
- unpark()可以在park()前唤醒;
2. 介绍一下LockSupport
概念:用于创建锁和其他同步类的基本线程阻塞原语;是线程等待唤醒的升级版本;他是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,阻塞后也有对应的唤醒方法。
特点:无需在加锁环境中;park()和unpark()的作用分别是阻塞线程和解除线程;可先唤醒后等待;
原理:LockSupport使用了一种许可证机制(Permit)来实现阻塞和唤醒,每个线程都有一个许可证(Permit);其中,许可证只有两个值,1和0;默认是0;我们可以将许可证视为信号量,只是许可证不能超过1;
常用方法:
park():permit默认为0,一旦调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设置为1时,park()方法会被唤醒;然后会将permit重新设置为0后返回。
//LockSupport
public static void park() {
UNSAFE.park(false, 0L);
}
// Unsafe
public native void park(boolean var1, long var2);
unpark(thread): 会将线程的许可证设置为1(多次调用unpark,许可证不会累加)并自动唤醒park()的等待线程,被阻塞的线程将立即返回;
//LockSupport
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
// Unsafe
public native void unpark(Object var1);
常见问题:
LockSupport为什么可以先唤醒线程后阻塞线程但不会阻塞?
答:因为unpark()方法获得了一个许可证,许可证值为1,再调用park()方法,就可消费这个许可证,所以不会阻塞;
为什么唤醒两次后阻塞两次,最终还是会阻塞?
答:如果线程A调用两遍park(),线程B调用两边unpark(),那么只会解锁一个park(),因为许可证最多只能为1,不能累加;
3. AQS是什么
AQS抽象的队列同步器,是用来构建锁或者其他同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类型的变量表示持有锁的状态;当有线程获取不到锁时,就将线程加入该队列中,通过CAS自旋和LockSupport.part()的方式,维护state变量,达到并发同步的效果。
其中,
ReentranLock
,CountDownLatch
,ReentrantReadWriteLock
,Semaphore
底层都是AQS;
在JUC包下面,有三个相似的类:
AbstractQueuedSynchronizer;
AbstractQueuedLongSynchronizer;
AbstractQueuedSynchronizer; // 这个就是我们常说的AQS
AQS的原理图如下:
其中:CLH是Craig,Landin andHagersten队列,是一个单向链表,AQS中的队列是CLH扩展的虚拟双向FIFO队列;
4. AQS的核心原理
原理:AQS使用一个violatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要抢占资源的线程封装成一个Node节点来完成锁的分配,通过CAS完成对state值的修改;
核心:state+CLH带头节点的双端队列
其内部属性与继承关系如下:
主要字段解析:
AbstractQueuedSynchronizer: AQS本身是由state和CLH双端队列组成,CLH原理是一个Node节点的双向循环链表
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 同步state成员变量,0表示无人占用锁,大于1表示锁被占用需要等待;
private volatile int state;
/*CLH队列
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*/
// 通过state自旋判断是否阻塞,阻塞的线程放入队列,尾部入队,头部出队
static final class Node{}
private transient volatile Node head;
private transient volatile Node tail;
}
AbstractQueuedSynchronizer.Node
static final class Node {
// 表示线程以共享的模式等待锁
static final Node SHARED = new Node();
// 表示线程以独占的方式等待锁
static final Node EXCLUSIVE = null;
//表示线程获取锁的请求已经取消了
static final int CANCELLED = 1;
//表示线程准备解锁
static final int SIGNAL = -1;
// 表示节点在等待队列红,节点线等待唤醒
static final int CONDITION = -2;
// 表当前节点线程处于共享模式,锁可以传递下去
static final int PROPAGATE = -3;
// 表示节点在队列中的状态
volatile int waitStatus;
volatile Node prev;// 前序指针
volatile Node next;// 后序指针
volatile Thread thread; // 当前节点的线程
Node nextWaiter;// 指向下一个处于Condition状态的节点
final Node predecessor();//一个方法,返回前序节点prev,没有的话抛出NPE(空指针异常)
}
5. 公平锁与非公平锁的区别
公平锁:多个线程按照线程调用lock()的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。
- 优点:所有的线程都能得到资源,不会饿死在队列中。
- 缺点:吞吐量会下降很多,队列里面除了第一个线程,其他的线程都会阻塞,cpu唤醒阻塞线程的开销会很大。
非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。
- 优点:可以减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量。
- 缺点:这样可能导致队列中间的线程一直获取不到锁或者长时间获取不到锁,导致饿死。
底层代码:
// 非公平锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 公平锁
final void lock() {
acquire(1);
}
总结:我们发现,底层lock()方法,非公平锁多了一个compareAndSetState()
,其不公平的原因,就是线程调用lock()的时候,如果发现当前没人占用的时候,直接就抢占过来,否则就进入CLH队列,等待调用。
因此非公平锁就是每次调用lock的线程和CLH队列头节点后的第一个线程进行竞争,竞争成功,调用lock的线程抢到锁,否则就是头节点后的第一个线程抢占成功。公平锁的话,就是所有线程都进入CLH队列,然后FIFO抢占依次获得锁。
讲这个的目的就是为了后面讲加锁解锁原理的时候,不再解释为什么是依次获取锁而不是抢占;
6. 请讲述非公平锁加锁解锁的具体流程
场景:假设目前有3个线程:A,B,C,A先占有锁,然后B,C线程尝试获取锁,最后A解锁;
-
A去调用ReentrantLock.lock()方法
// sync是ReentrantLock的一个属性对象 public void lock() { sync.lock(); } private final Sync sync;// 内部静态类的实例对象 abstract static class Sync extends AbstractQueuedSynchronizer{}//使用了AQS
由于Sync是一个抽象类,我们具体调用的是NonfairSync类的lock()方法,他继承了Sync;
使用NonfairSync的原因是我们使用的就是非公平锁
public ReentrantLock() { sync = new NonfairSync(); }
这里面呢,我们先调用
compareAndSetState()
方法,他是AQS的方法,目的是把AQS的state设置为1,state如果为0,表示没有人占用,为1表示有人占用,初始时为0,因此A线程此刻是可以成功获取到锁的。后面就执行setExclusiveOwnerThread,他是
AbstractOwnableSynchronizer.class
的方法,目的是标记当前获取锁的线程是线程A。此时A线程获取锁结束。static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } }
原理图:
-
线程B调用ReentrantLock.lock()方法
此时我们直接进入非公平锁的lock()方法中,由于抢占锁失败,所以进入
acquire(1)
方法中;static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } }
acquire()方法是AQS的方法,我们先进入tryAcquire()方法;
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } }
tryAcquire()交由实现类NonfairSync非公平锁去实现,他先获取当前AQSstate的值,然后判断是否为0,如果为0表示没有人抢占,此刻他抢占,返回true,外层acquire()方法获取的是!tryAcquire(arg),因此抢占锁后就完事了;
不过此刻锁被A占用,因此线程B不能获取到锁,那就判断当前占有线程是不是自己,如果是的话,就自己增加state的值,这里意思就是可重入锁的概念,当获取了锁的线程再次lock(),其state值+1;
不过此刻线程B也不满足,因此返回false;
// 这里调用进入非公平锁的tryAcquire protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // 具体代码在这里 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 若当前无其他线程抢占锁,则抢占; if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果已获取锁的线程再调用lock()则state值+1,这里就是可重入的原理 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 都不是则返回false return false; }
因此线程B就进入AQS的addWaiter()方法中,这个就是创建CLH队列的节点,首先就先new出来一个节点,由于当前CLH队列没有节点,因此进入enq()方法;
private Node addWaiter(Node mode) { // 创建一个节点 mode Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 刚开始tail是null,如果tail有值了就将node插入队尾; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若队列为空,则插入节点 enq(node); return node; }
enq插入节点是死循环,第一次循环,由于tail为空,他先创建一个空的node节点,作为头节点,此时waitStatus=0,然后将head指向该头节点,并将tail指针也指向head;
第二次循环,他将待插入node节点(线程B)的前置指针指向tail指向的节点(头节点),然后CAS将tail指向当前待插入节点(线程B),再让原来的tail指向的节点(头节点)的next域指向当前节点,这样就完成了节点(线程B)插入队尾,完成链式结构,跳出循环;
private Node enq(final Node node) { for (;;) { // 死循环 Node t = tail; if (t == null) { // 初始下tail为null,因此创建一个头节点 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t;// 第二次循环,队列不为空,就将该节点插入队尾 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
现在就要进入前面acquire的AQS.acquireQueued(node, 1)方法了,目前我们线程B已经再CLH队列的队尾了,acquireQueued()这里依旧是死循环。
第一次循环:首先predecessor()取出的就是前置节点,p就是链表中的头节点,然后进入判断,当前确实是头节点,然后再次尝试tryAcquire(),由于线程A并没有释放锁,因此,只能进入shouldParkAfterFailedAcquire()方法;
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//获取节点的前置节点,线程B获取到的是头节点 if (p == head && tryAcquire(arg)) {//由于线程A占用,尝试获取失败 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 线程B会进入这里 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
AQS.shouldParkAfterFailedAcquire(头节点,当前节点),由于头节点的waitStatus等于0,因此这里最终将头节点的waitStatus设置成-1,并返回false;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;// 头节点的waitStatus=0 if (ws == Node.SIGNAL)// -1 return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL);// 将头节点的waitStatus设置成-1 } return false; }
然后进行第二次循环,再次进入shouldParkAfterFailedAcquire(),这一次由于ws=-1,因此返回true,并进入parkAndCheckInterrupt()方法;这里会调用LockSupport.park()将线程挂起,此刻线程B就阻塞再这里了。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
-
线程C调用ReentrantLock.lock()方法
这里和B的步骤一样,我们直接进入关键步骤,acquire()方法是AQS的方法,由于A没有释放锁因此tryAcquire返回false,取反后为true,继续执行addWaiter()方法;
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } }
此刻addWaiter()方法的执行和线程B就有区别了,因为CLH队列有节点了,他直接将创建好的Node插入队尾并返回;
private Node addWaiter(Node mode) { // 创建一个节点 线程C Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 此时队列右节点,进入if if (pred != null) { // 插入线程C节点到队尾并返回 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 线程C不走这里 enq(node); return node; }
现在就要进入acquireQueued()方法了,开始死循环。
第一次循环:这里由于线程C的前置节点不是头节点,因此直接进入shouldParkAfterFailedAcquire()将线程B的状态改为-1;这里线程C把线程B节点的status改为了-1;
第二次循环:和B一样,LockSupport.park()将线程挂起,此刻线程B就阻塞在parkAndCheckInterrupt()中。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//获取节点的前置节点,线程C获取到的是线程B if (p == head && tryAcquire(arg)) {// 所以条件不满足 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 线程C会进入这里 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
-
A线程调用unlock()解锁
这里呢,线程A就先去获取AQS的state,并对应减去1个,并设置当前占有线程为null,然后找到头节点去调用unparkSuccessor(head);
// 执行ReentrantLock.unlock() public void unlock() { sync.release(1); } // AQS.release() public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 执行ReentrantLock.tryRelease() protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
此刻进入unparkSuccessor(头节点)中,他将头节点的状态从-1设置为0,然后唤醒线程B;
private void unparkSuccessor(Node node) { int ws = node.waitStatus;// 头节点是-1 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);// 头节点设置为0 Node s = node.next;// 线程B if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//唤醒线程B }
此时A线程解锁完成,唤醒线程B。线程B还锁在acquireQueued方法的parkAndCheckInterrupt()方法中。
解锁后开始第三次循环;
第三次循环发现前置节点是头,且可以占用锁,因此线程B获取到锁并进入第一个if;然后重新设置头节点,此处调用的setHead(线程B节点),将头指向线程B,将原头节点剔除队列,然后将线程B设置成头节点。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//获取节点的前置节点,线程B获取到的是头节点 if (p == head && tryAcquire(arg)) {//目前锁无占用,进入此处 setHead(node); // 重新设置头节点 p.next = null; // help GC failed = false; return interrupted; // 被改为true } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 线程B从这里唤醒 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // 修改头节点 private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
此时原头节点被GC回收,线程B获取到了锁,然后线程B节点变成了头节点。之后的流程就是如此循环往复。AQS主体流程结束。