Java 线程间通信 —— 等待 / 通知机制

2年前 (2022) 程序员胖胖胖虎阿
277 0 0

本文部分摘自《Java 并发编程的艺术》

volatile 和 synchronize 关键字

每个处于运行状态的线程,如果仅仅是孤立地运行,那么它产生的作用很小,如果多个线程能够相互配合完成工作,则将带来更大的价值

Java 支持多个线程同时访问一个对象或者对象的成员变量,使用 volatile 关键字可以保证被修饰变量的可见性,意味着任一线程对该变量的任何修改,其他线程都可以立即感知到

synchronize 关键字可以修饰方法或者同步块,它主要确保多个线程在同一时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。synchronize 关键字的实现,本质是对一个对象的监视器(monitor)进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程获取到由 synchronize 所保护对象的监视器

任何一个对象都拥有自己的监视器,任意一个线程对 Object 的访问(Object 由 synchronize 保护)的访问,首先要获得 Object 的监视器。如果获取失败,线程进入同步队列,线程状态变为 BLOCKED。当访问 Object 的前驱(获得了锁的线程)释放了锁,则该释放操作将唤醒阻塞在同步队列中的线程,使其重新尝试获取监视器

Java 线程间通信 —— 等待 / 通知机制

等待 - 通知机制

一个线程修改了一个对象的值,另一个线程感知到变化,然后进行相应的操作,前者是生产者,后者是消费者,这种通信方式实现了解耦,更具伸缩性。在 Java 中为了实现类似的功能,我们可以让消费者线程不断地循环检查变量是否符合预期,条件满足则退出循环,从而完成消费者的工作

while(value != desire) {
    Thread.sleep(1000);
}
doSomething();

睡眠一段时间的目的是防止过快的无效尝试,这种实现方式看似能满足需求,但存在两个问题:

  • 难以确保及时性

    如果睡眠时间太长,就难以及时发现条件已经变化

  • 难以降低开销

    如果降低睡眠时间,又会消耗更多的处理器资源

使用 Java 提供了内置的等待 - 通知机制能够很好地解决上述问题,等待 - 通知的相关方法是任意 Java 对象都具备的

方法名称 描述
notify() 通知一个在对象上等待的线程,使其从 wait() 方法返回,返回的前提是该线程获取到了对象的锁
notifyAll() 通知所有等待在该对象上的线程
wait() 调用该方法的线程进入 WAITING 状态,只有等待另外的线程通知或被中断才返回,调用此方法会释放对象的锁
wait(long) 超时等待一段时间,参数时间是毫秒
wait(long, int) 对于超时时间更细粒度的控制,可以达到纳秒

等待 - 通知机制,是指一个线程 A 调用了对象 O 的 wait() 方法进入等待状态,而另一个线程 B 调用了对象 O 的 notify() 或者 notifyAll() 方法,线程 A 收到通知后从对象 O 的 wait() 方法返回,进而执行后续操作。上述两个线程通过对象 O 来完成交互,而对象上的 wait() 和 notify/notifyAll() 的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作

下述例子中,创建两个线程 WaitThread 和 NotifyThread,前者检查 flag 值是否为 false,如果符合要求,进行后续操作,否则在 lock 上等待,后者在睡眠一段时间后对 lock 进行通知

public class WaitNotify {

    static boolean flag = true;
    static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {

        @Override
        public void run() {
            // 加锁,拥有 lock 的 Monitor
            synchronized (lock) {
                // 继续 wait,同时释放 lock 的锁
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread() + "flag is true. wait @ "
                                + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 完成工作
                System.out.println(Thread.currentThread() + "flag is false. running @ "
                    + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }

    static class Notify implements Runnable {

        @Override
        public void run() {
            // 加锁,拥有 lock 的 Monitor
            synchronized (lock) {
                // 获取 lock 的锁,然后进行通知,通知时不会释放 lock 的锁
                // 直到当前线程释放 lock 后,WaitThread 才能从 wait 方法中返回
                System.out.println(Thread.currentThread() + " hold lock. notify @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                SleepUtils.second(5);
            }
            // 再次加锁
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " hold lock again. sleep @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                SleepUtils.second(5);
            }
        }
    }
}

运行结果如下

Java 线程间通信 —— 等待 / 通知机制

上述结果的第三行和第四行顺序可能会互换,下面简单描述一下代码的执行过程

  1. WaitThread 线程先启动,NotifyThread 线程后启动,由于中间有睡眠一秒的操作,所以 WaitThread 线程首先获得锁
  2. WaitThread 线程循环判断条件是否满足,不满足则调用执行 lock.wait() 方法,释放 lock 对象上的锁,进入 lock 对象的等待队列中,进入等待状态
  3. 由于 WaitThread 线程释放了锁,所以 NotifyThread 获得 lock 对象上的锁,执行 lock.notifyAll() 方法,但并不会立即释放锁,只是通知所有等待在 lock 上的线程可以参与竞争锁了(notify 也同理),并把 flag 设为 false,本段代码执行结束,NotifyThread 线程释放锁,此时 WaitThread 线程和 NotifyThread 线程共同竞争 lock 的锁
  4. 无论谁先拿到锁,WaitThread 线程和 NotifyThread 线程都能顺利完成任务

等待 - 通知机制的经典范式

从上节的内容中,我们可以提炼出等待 - 通知机制的经典范式,该范式分为两部分,分别针对等待方(消费方)和通知方(生产者)

等待方遵循如下原则:

  • 获取对象上的锁
  • 如果条件不满足,调用对象的 wait() 方法,被通知后仍要检查条件
  • 条件满足则执行对应的逻辑

伪代码如下:

synchronized(对象) {
	while(条件不满足) {
    	对象.wait();
    }
    对应的处理逻辑
}

通知方遵循如下原则:

  • 获取对象上的锁
  • 改变条件
  • 通知所有等待在对象上的线程

伪代码如下:

synchronized(对象) {
	改变条件
    对象.notifyAll();
}

Thread.join() 的使用

如果一个线程 A 执行了 thread.join() 语句,其含义是:当前线程 A 等待 thread 线程终止之后才从 thread.join() 返回

下面的例子中,创建 10 个线程,编号为 0 ~ 9,每个线程线程调用前一个线程的 join() 方法,也就是线程 0 结束了,线程 1 才能从 join() 方法中返回,而线程 0 需要等待 main 线程结束

public class Join {

    public static void main(String[] args) throws InterruptedException {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
            // 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
            Thread thread = new Thread(new Domino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate.");
    }

    static class Domino implements Runnable {

        private Thread thread;

        public Domino(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " terminate.");
        }
    }
}

输出如下

Java 线程间通信 —— 等待 / 通知机制

从上述输出可以看到,每个线程等待前驱线程终止后,才从 join() 方法返回,这里涉及了等待 - 通知机制(等待前驱线程结束,接收前驱线程结束通知)

Thread.join() 源码简化如下

public final synchronized void join(long millis) throws InterruptedException {
    	// 省略前面代码...
        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        }
    	// 省略后面代码...
    }

假设当前线程是 main 线程,main 线程调用 thread.join() 方法,则 main 线程会获取 thread 线程对象上的锁,并循环判断 thread 线程是否还存活,如果存活,调用 wait 方法释放锁并等待,如果 thread 线程已经结束,则 thread 线程会自动调用自身 notifyAll() 方法通知所有等待在自身线程对象上的线程,则 main 线程可以继续执行后续操作

版权声明:程序员胖胖胖虎阿 发表于 2022年9月24日 上午12:32。
转载请注明:Java 线程间通信 —— 等待 / 通知机制 | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...