本文部分摘自《Java 并发编程的艺术》
volatile 和 synchronize 关键字
每个处于运行状态的线程,如果仅仅是孤立地运行,那么它产生的作用很小,如果多个线程能够相互配合完成工作,则将带来更大的价值
Java 支持多个线程同时访问一个对象或者对象的成员变量,使用 volatile 关键字可以保证被修饰变量的可见性,意味着任一线程对该变量的任何修改,其他线程都可以立即感知到
synchronize 关键字可以修饰方法或者同步块,它主要确保多个线程在同一时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。synchronize 关键字的实现,本质是对一个对象的监视器(monitor)进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程获取到由 synchronize 所保护对象的监视器
任何一个对象都拥有自己的监视器,任意一个线程对 Object 的访问(Object 由 synchronize 保护)的访问,首先要获得 Object 的监视器。如果获取失败,线程进入同步队列,线程状态变为 BLOCKED。当访问 Object 的前驱(获得了锁的线程)释放了锁,则该释放操作将唤醒阻塞在同步队列中的线程,使其重新尝试获取监视器
等待 - 通知机制
一个线程修改了一个对象的值,另一个线程感知到变化,然后进行相应的操作,前者是生产者,后者是消费者,这种通信方式实现了解耦,更具伸缩性。在 Java 中为了实现类似的功能,我们可以让消费者线程不断地循环检查变量是否符合预期,条件满足则退出循环,从而完成消费者的工作
while(value != desire) {
Thread.sleep(1000);
}
doSomething();
睡眠一段时间的目的是防止过快的无效尝试,这种实现方式看似能满足需求,但存在两个问题:
-
难以确保及时性
如果睡眠时间太长,就难以及时发现条件已经变化
-
难以降低开销
如果降低睡眠时间,又会消耗更多的处理器资源
使用 Java 提供了内置的等待 - 通知机制能够很好地解决上述问题,等待 - 通知的相关方法是任意 Java 对象都具备的
方法名称 | 描述 |
---|---|
notify() | 通知一个在对象上等待的线程,使其从 wait() 方法返回,返回的前提是该线程获取到了对象的锁 |
notifyAll() | 通知所有等待在该对象上的线程 |
wait() | 调用该方法的线程进入 WAITING 状态,只有等待另外的线程通知或被中断才返回,调用此方法会释放对象的锁 |
wait(long) | 超时等待一段时间,参数时间是毫秒 |
wait(long, int) | 对于超时时间更细粒度的控制,可以达到纳秒 |
等待 - 通知机制,是指一个线程 A 调用了对象 O 的 wait() 方法进入等待状态,而另一个线程 B 调用了对象 O 的 notify() 或者 notifyAll() 方法,线程 A 收到通知后从对象 O 的 wait() 方法返回,进而执行后续操作。上述两个线程通过对象 O 来完成交互,而对象上的 wait() 和 notify/notifyAll() 的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作
下述例子中,创建两个线程 WaitThread 和 NotifyThread,前者检查 flag 值是否为 false,如果符合要求,进行后续操作,否则在 lock 上等待,后者在睡眠一段时间后对 lock 进行通知
public class WaitNotify {
static boolean flag = true;
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
TimeUnit.SECONDS.sleep(1);
Thread notifyThread = new Thread(new Notify(), "NotifyThread");
notifyThread.start();
}
static class Wait implements Runnable {
@Override
public void run() {
// 加锁,拥有 lock 的 Monitor
synchronized (lock) {
// 继续 wait,同时释放 lock 的锁
while (flag) {
try {
System.out.println(Thread.currentThread() + "flag is true. wait @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 完成工作
System.out.println(Thread.currentThread() + "flag is false. running @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
}
static class Notify implements Runnable {
@Override
public void run() {
// 加锁,拥有 lock 的 Monitor
synchronized (lock) {
// 获取 lock 的锁,然后进行通知,通知时不会释放 lock 的锁
// 直到当前线程释放 lock 后,WaitThread 才能从 wait 方法中返回
System.out.println(Thread.currentThread() + " hold lock. notify @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.notifyAll();
flag = false;
SleepUtils.second(5);
}
// 再次加锁
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock again. sleep @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
SleepUtils.second(5);
}
}
}
}
运行结果如下
上述结果的第三行和第四行顺序可能会互换,下面简单描述一下代码的执行过程
- WaitThread 线程先启动,NotifyThread 线程后启动,由于中间有睡眠一秒的操作,所以 WaitThread 线程首先获得锁
- WaitThread 线程循环判断条件是否满足,不满足则调用执行 lock.wait() 方法,释放 lock 对象上的锁,进入 lock 对象的等待队列中,进入等待状态
- 由于 WaitThread 线程释放了锁,所以 NotifyThread 获得 lock 对象上的锁,执行 lock.notifyAll() 方法,但并不会立即释放锁,只是通知所有等待在 lock 上的线程可以参与竞争锁了(notify 也同理),并把 flag 设为 false,本段代码执行结束,NotifyThread 线程释放锁,此时 WaitThread 线程和 NotifyThread 线程共同竞争 lock 的锁
- 无论谁先拿到锁,WaitThread 线程和 NotifyThread 线程都能顺利完成任务
等待 - 通知机制的经典范式
从上节的内容中,我们可以提炼出等待 - 通知机制的经典范式,该范式分为两部分,分别针对等待方(消费方)和通知方(生产者)
等待方遵循如下原则:
- 获取对象上的锁
- 如果条件不满足,调用对象的 wait() 方法,被通知后仍要检查条件
- 条件满足则执行对应的逻辑
伪代码如下:
synchronized(对象) {
while(条件不满足) {
对象.wait();
}
对应的处理逻辑
}
通知方遵循如下原则:
- 获取对象上的锁
- 改变条件
- 通知所有等待在对象上的线程
伪代码如下:
synchronized(对象) {
改变条件
对象.notifyAll();
}
Thread.join() 的使用
如果一个线程 A 执行了 thread.join() 语句,其含义是:当前线程 A 等待 thread 线程终止之后才从 thread.join() 返回
下面的例子中,创建 10 个线程,编号为 0 ~ 9,每个线程线程调用前一个线程的 join() 方法,也就是线程 0 结束了,线程 1 才能从 join() 方法中返回,而线程 0 需要等待 main 线程结束
public class Join {
public static void main(String[] args) throws InterruptedException {
Thread previous = Thread.currentThread();
for (int i = 0; i < 10; i++) {
// 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
Thread thread = new Thread(new Domino(previous), String.valueOf(i));
thread.start();
previous = thread;
}
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + " terminate.");
}
static class Domino implements Runnable {
private Thread thread;
public Domino(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " terminate.");
}
}
}
输出如下
从上述输出可以看到,每个线程等待前驱线程终止后,才从 join() 方法返回,这里涉及了等待 - 通知机制(等待前驱线程结束,接收前驱线程结束通知)
Thread.join() 源码简化如下
public final synchronized void join(long millis) throws InterruptedException {
// 省略前面代码...
if (millis == 0) {
while (isAlive()) {
wait(0);
}
}
// 省略后面代码...
}
假设当前线程是 main 线程,main 线程调用 thread.join() 方法,则 main 线程会获取 thread 线程对象上的锁,并循环判断 thread 线程是否还存活,如果存活,调用 wait 方法释放锁并等待,如果 thread 线程已经结束,则 thread 线程会自动调用自身 notifyAll() 方法通知所有等待在自身线程对象上的线程,则 main 线程可以继续执行后续操作