多线程与高并发基础

2年前 (2022) 程序员胖胖胖虎阿
319 0 0

多线程入门

基本概念

什么叫多线程技术?

从软件或硬件上实现多个线程并发执行的技术

并行与并发

  • 并行:同一时刻,多个指令在多个CPU上同时执行
  • 并发:同一时刻,多个指令在单个CPU上交替执行

进程和线程

  • 进程:运行中的程序就叫进程,进程有以下三个特性
    1. 独立性:是能独立运行的基本单位,也是系统分配资源和调度的独立单位
    2. 动态性:进程是程序的一次执行过程,进程是动态产出,动态消亡的
    3. 并发性:任何进程都可以和其它进程并发执行
  • 线程:是进程中的一条执行路径

单线程和多线程是啥意思?

  • 单线程:一个程序启动的一个进程如果只有一条执行路径,则是单线程程序
  • 多线程:一个程序启动的一个进程如果有多条执行路径,则是多线程程序

Java中多线程的三种实现方式

方式一:继承 Thread 类

步骤:

  • 自定义一个类 MyThread 继承 Thread 类

  • 在 MyThread 类中重写 run() 方法

  • 创建 MyThread 类的对象

  • 启动线程

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("线程……run");
    }
}

public class Test {
    public static void main(String[] args) {
        MyThread thread = new MyThread();
        thread.start();
    }
}
  • 为什么要重写run()方法?

    因为 run() 是用来封装被线程执行的代码。main 方法中的代码是由主线程来执行的

  • run() 方法和 start() 方法的区别?

    run():封装线程执行的代码,如果直接调用,相当于普通方法的调用

    start():启动线程;然后由 JVM 调用此线程的 run() 方法

方式二:实现 Runnable 接口

步骤:

  • 定义一个类MyRunnable实现Runnable接口

  • 在MyRunnable类中重写run()方法

  • 创建MyRunnable类的对象

  • 创建Thread类的对象,把MyRunnable对象作为构造方法的参数。有两个构造方法:①Thread(Runnable target),②Thread(Runnable target, String name)  其中 name 参数是要给线程起的名字,如果不传,则程序会分配一个

  • 启动线程

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " run……");
        // Thread.currentThread().getName() 获取执行当前代码的线程名
    }
}

public class Test {
    public static void main(String[] args) {
        MyRunnable runnable = new MyRunnable();
        Thread thread = new Thread(runnable, "哈哈哈线程");
        thread.start();
    }
}

方式三:实现 Callable 接口

步骤:

  • 定义一个类MyCallable实现Callable接口,需要指定泛型

  • 在MyCallable类中重写call()方法,返回值跟上面泛型一致

  • 创建MyCallable类的对象

  • 把MyCallable对象作为构造方法的参数,创建Future的实现类FutureTask对象,传入泛型需要与 Callable 中的一致

  • 创建Thread类的对象,把FutureTask对象作为构造方法的参数,也可以传入线程名

  • 启动线程

  • 然后可以通过FutureTask对象调用get方法,就可以获取线程结束之后的结果。(get方法一定要放在start之后执行,因为它是获取线程结束之后的结果,如果线程还没有开启或结束,那么get会在那卡着一直等)

class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        for (int i = 0; i < 100; i++) {
            System.out.println("哈哈哈……" + i);
        }
        return "笑了100次,乐疯了";
    }
}

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyCallable callable = new MyCallable();
        //可以获取线程执行完毕之后的结果.也可以作为参数传递给Thread对象
        //这里的泛型要和 MyCallable 中的泛型一致
        FutureTask<String> task = new FutureTask<>(callable);
        Thread thread = new Thread(task);
        thread.start();
        System.out.println(task.get());
    }
}

三种方式的对比

  • 实现Runnable、Callable接口

    • 好处: 扩展性强,实现该接口的同时还可以继承其他的类

    • 缺点: 编程相对复杂

  • 继承Thread类

    • 好处: 编程比较简单,可以直接使用Thread类中的方法

    • 缺点: 可扩展性较差,不能再继承其他的类

设置和获取线程名称

上面的三种方式中,方式二和方式三在创建线程时都可以传入线程名,但是如果是方式一的话则不能了,因为方式一是我们自己自定义的线程类,里面没有含参构造方法,我们必须添加含参构造方法,并在含参构造方法中显示的调用父类的含参构造方法才能给线程命名,代码实现如下:

public class MyThreadDemo {
    public static void main(String[] args) {
        MyThread my1 = new MyThread("哈哈哈线程);

        //static Thread currentThread() 返回对当前正在执行的线程对象的引用
        System.out.println(Thread.currentThread().getName());  // main
    }
}
class MyThread extends Thread {
    public MyThread() {}
    public MyThread(String name) {
        super(name);
    }

    @Override
    public void run() {
        System.out.println(getName()+" run……");
    }
}

还可以创建线程后 通过线程的 setName 方法设置线程名称

通过 getName() 获取当前线程的名字。

注意:在自定义线程类的 run() 方法中可以直接通过 getName() 来获取执行当前方法的线程,但是在 Runnable 实现类的 run() 方法 和 Callable 实现类的 call() 方法中的只能 先通过  Thread.currentThread() 获取到线程再调用 getName() 方法

线程休眠:sleep 方法

static void sleep(long millis)   填入的为毫秒数

作用:使当前正在执行的线程停留

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + "---" + i);
        }
    }
}

注意点:当一个类或接口的方法中未抛出异常,那么他的子类或者实现类重写该方法时就不能抛出异常。如上面:的Runnable接口中的 run 方法就没有抛出异常,那么它的实现类中就不能抛出异常。所以上面 sleep 处的异常只能通过 try-catch 来处理

线程的优先级

线程调度有两种方式:

  • 分时调度模型:所有线程轮流使用 CPU 的使用权,平均分配每个线程占用 CPU 的时间片
  • 抢占式调度模型: 优先级越高抢到CPU的几率就越高

Java使用的线程调度方式是抢占式调度模型。相关的方法如下:

  • getPriority()    获取线程的优先级
  • setPriority(int newPriority)   设置线程的优先级。默认优先级是5;线程优先级的范围是:1-10

守护线程:daemon thread

线程分为:用户线程和守护线程(主线程也属于用户线程)

守护线程,是个服务线程,准确地来说就是服务用户线程。当用户线程都执行完毕退出了的时候,守护线程也就没必要存在了,那么守护线程也会退出

就比如垃圾回收线程就是典型的守护线程

通过  setDaemon(boolean on) 里面传入 true 可以将线程设为守护线程。

synchronized 同步

先看一个典型的卖票案例:

class SellTicket implements Runnable {
    private int ticket = 100;
    //在SellTicket类中重写run()方法实现卖票,代码步骤如下
    @Override
    public void run() {
        while (true) {
            if(ticket <= 0){
                    //卖完了
                    break;
                }else{
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    ticket--;
                    System.out.println(Thread.currentThread().getName() + "在卖票,还剩下" + ticket + "张票");
                }
        }
    }
}
public class SellTicketDemo {
    public static void main(String[] args) {
        //创建SellTicket类的对象
        SellTicket st = new SellTicket();

        //创建三个Thread类的对象,把SellTicket对象作为构造方法的参数,并给出对应的窗口名称
        Thread t1 = new Thread(st,"窗口1");
        Thread t2 = new Thread(st,"窗口2");
        Thread t3 = new Thread(st,"窗口3");

        //启动线程
        t1.start();
        t2.start();
        t3.start();
    }
}

用上面这种方式创建的所有线程都共享同一份数据,但如果用第一种创建线程的方式,则要将数据用static修饰才能共享

卖票案例出现的问题:相同的票出现了多次、出现了负数的票

问题产生原因:线程执行的随机性导致的,可能在卖票过程中丢失cpu的执行权,导致出现线程安全问题

这里我们就可以通过 synchronized 来解决上面的问题

同步方法

修饰符 synchronized 返回值类型 方法名(方法参数) { 
    方法体;
}

锁对象是什么呢? this

class SellTicket implements Runnable {
    private int ticket = 100;
    //在SellTicket类中重写run()方法实现卖票,代码步骤如下
    @Override
    public void run() {
        synchronizedMethod();
    }

    public synchronized void synchronizedMethod() {
        while (true) {
            if(ticket <= 0){
                //卖完了
                break;
            }else{
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                ticket--;
                System.out.println(Thread.currentThread().getName() + "在卖票,还剩下" + ticket + "张票");
            }
        }
    }
}

public class Test {
    public static void main(String[] args) {
        SellTicket st = new SellTicket();
        Thread t1 = new Thread(st,"窗口1");
        Thread t2 = new Thread(st,"窗口2");
        Thread t3 = new Thread(st,"窗口3");
        t1.start();
        t2.start();
        t3.start();
    }
}

静态同步方法:

就是把synchronized关键字加到静态方法上

修饰符 static synchronized 返回值类型 方法名(方法参数) { 
    方法体;
}

注意:同步的静态方法的锁对象为:类名.class, 即 class对象

同步代码块

以下演示一种错误方法

public class Demo {
    public static void main(String[] args) {
        MyThread t1 = new MyThread("窗口1");
        MyThread t2 = new MyThread("窗口2");
        MyThread t3 = new MyThread("窗口3");
        t1.start();
        t2.start();
        t3.start();
    }
}

package thread.synchronize;

public class MyThread extends Thread {
    public MyThread() {}
    public MyThread(String name) {
        super(name);
    }

    private static int ticket = 100;  //-------------------------------这里要加 static

    @Override
    public void run() {
        while (true) {
            synchronized(this) {   //-------------------每个线程到来,this都是当前线程对象
                if(ticket <= 0){
                    break;
                }else{
                    try {
                      Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                  ticket--;
                    System.out.println(Thread.currentThread().getName() + "在卖票,还剩下" + ticket + "张票");
                }
            }
      }
    }
}

为啥上面的有问题?

因为你是用 继承 的方式创建自己的线程类,然后 new 了三个线程,每个线程过来的this都是线程自己本身,那这样的话锁对象不唯一,当然没法锁住。

如果用的是实现 Runnable 的方式的话那是没有问题的(前提是你创建线程是用的同一个 Runnable 对象),请看下面代码

public class Demo {
    public static void main(String[] args) {
        MyRunnable mr = new MyRunnable();
        Thread t1 = new Thread(mr,"窗口1");
        Thread t2 = new Thread(mr,"窗口2");
        Thread t3 = new Thread(mr,"窗口3");
    }
}


class MyRunnable implements Runnable {
    private int ticket = 100;
    @Override
    public void run() {
        while (true) {
            synchronized(this) {   //--------- 每个线程到来,this都是同一个 MyRunnable 对象
                if(ticket <= 0){
                    break;
                }else{
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    ticket--;
                    System.out.println(Thread.currentThread().getName() + "在卖票,还剩下" + ticket + "张票");
                }
            }
        }
    }
}

原因:每个线程到来,this都是同一个 MyRunnable 对象,所有线程用的同一把锁才能锁住

同步的利与弊:

利:成功解决多线程的数据安全问题

弊:当线程很多时,因为每个线程都会去判断同步上的锁,这是很耗费资源的,无形中会降低程序的运行效率

Lock锁

虽然我们可以理解同步代码块和同步方法的锁对象问题,但是我们并没有直接看到在哪里加上了锁,在哪里释放了锁,为了更清晰的表达如何加锁和释放锁,JDK5以后提供了一个新的锁对象Lock

Lock是接口不能直接实例化,这里采用它的实现类 ReentrantLock 来实例化。相关方法:

  • ReentrantLock 构造方法,创建 ReentrantLock 实例对象

    方法名 说明
    ReentrantLock() 创建一个ReentrantLock的实例
  • ReentrantLock实例的 所相关的方法

    方法名 说明
    void lock() 获得锁
    void unlock() 释放锁
  • 加锁解锁方法
方法名 说明
newCondition() 创建一个 Condition 的实例,通过它可以调用 await、signal方法

例子:

class Ticket implements Runnable {
    //票的数量
    private int ticket = 100;
    private ReentrantLock lock = new ReentrantLock();

    @Override
    public void run() {
        while (true) {
            //synchronized (obj){//多个线程必须使用同一把锁.
            try {
                lock.lock();
                if (ticket <= 0) {
                    //卖完了
                    break;
                } else {
                    Thread.sleep(100);
                    ticket--;
                    System.out.println(Thread.currentThread().getName() + "在卖票,还剩下" + ticket + "张票");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            // }
        }
    }
}

public class Demo {
    public static void main(String[] args) {
        Ticket ticket = new Ticket();

        Thread t1 = new Thread(ticket);
        Thread t2 = new Thread(ticket);
        Thread t3 = new Thread(ticket);

        t1.setName("窗口一");
        t2.setName("窗口二");
        t3.setName("窗口三");

        t1.start();
        t2.start();
        t3.start();
    }
}

synchronized 和 Lock 的对比

以下分别从语法、功能、性能这三个层面将它们进行对比

  • 语法层面

    • synchronized 是关键字,源码在 jvm 中,用 c++ 语言实现

    • Lock 是接口,源码由 jdk 提供,用 java 语言实现

    • 使用 synchronized 时,退出同步代码块锁会自动释放,而使用 Lock 时,需要手动调用 unlock 方法释放锁(为了防止大家忘记释放锁,所以一般推荐使用 try-finally 然后在 finally 块中释放锁)

  • 功能层面

    • 二者均属于悲观锁、都具备基本的互斥、同步、锁重入功能

      • 互斥:多个线程抢同一把锁,只有一个能成功,其它线程都会失败并进入阻塞状态

      • 同步:多个线程同时运行,当某个线程运行到某处时发现需要其它线程的结果,那它就会等待其它线程把结果搞出来后再继续执行 (synchronized可通过wait notify来实现同步) (Lock 可以通过 await signal 这两个都是 condition类 中的方法)

      • 锁重入:持锁的线程可以给持锁的对象重复加锁

    • Lock 提供了许多 synchronized 不具备的功能,例如:

      • 获取等待状态

      • 公平锁

      • 可打断、可超时:Lock提供了等不到锁就打断它不然它继续等 及 一定时间内等不到则不再等待

      • 多条件变量

    • Lock 有适合不同场景的实现,如 ReentrantLock 是可重入锁, ReentrantReadWriteLock 适合读多写少的场景

  • 性能层面

    • 在没有竞争时,synchronized 做了很多优化,如偏向锁、轻量级锁,性能不错

    • 在竞争激烈时,Lock 的实现通常会提供更好的性能

关于 Lock 的公平锁:

  • 已经处在阻塞队列中的线程(不考虑超时)始终都是公平的,先进先出

  • 未处于阻塞队列中的线程来争抢锁,如果队列不为空,则老实到队尾等待

  • 非公平锁是指未处于阻塞队列中的线程来争抢锁,与队列头唤醒的线程去竞争,谁抢到算谁的

  • 公平锁会降低吞吐量,一般都不用

关于 Lock 条件变量 Condition:

  • ReentrantLock 中的条件变量功能类似于普通 synchronized 的 wait,notify,用在当线程获得锁后,发现条件不满足时,临时等待的链表结构

  • 与 synchronized 的等待集合不同之处在于,ReentrantLock 中的条件变量可以有多个,可以实现更精细的等待、唤醒控制

死锁问题

什么叫死锁:线程死锁是指由于两个或者多个线程互相持有对方所需要的资源,导致这些线程处于等待状态,无法前往执行

产生原因:同步嵌套

例子:(下面用到了 Lambda 表达式创建线程)

public class Demo {
    public static void main(String[] args) {
        Object objA = new Object();
        Object objB = new Object();

        new Thread(()->{
            while(true){
                synchronized (objA){
                    //线程一
                    synchronized (objB){
                        System.out.println("小康同学正在走路");
                    }
                }
            }
        }).start();

        new Thread(()->{
            while(true){
                synchronized (objB){
                    //线程二
                    synchronized (objA){
                        System.out.println("小薇同学正在走路");
                    }
                }
            }
        }).start();
    }
}

生产者与消费者

生产者消费者模式是一个十分经典的多线程协作的模式 ,在这种模式下,线程可以看作两类:

  • 生产者线程:用于生产数据
  • 消费者线程:用于消费数据

为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库

生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为

消费者只需要从共享数据区中去获取数据,并不需要关心生产者的行为

Object类的等待和唤醒方法(使用什么对象当做锁,那么就必须用这个对象去调用等待和唤醒的方法)

方法名 说明
void wait() 导致当前线程等待,直到另一个线程调用该对象的 notify()方法或 notifyAll()方法
void notify() 唤醒正在等待对象监视器的单个线程,如果有多个线程在等待,则随机唤醒一个
void notifyAll() 唤醒正在等待对象监视器的所有线程

案例:

生产者类(Cooker):实现Runnable接口,重写run()方法,设置线程任务

  1. 如果没有包子,生产包子,生产完后唤醒消费者消费包子

  2. 如果有包子,就进入等待状态

消费者类(Foodie):实现Runnable接口,重写 run() 方法,设置线程任务

  1. 如果有包子,就消费包子,消费完后唤醒生产者生产包子

  2. 如果没有包子就等待

public class Test {
    public static void main(String[] args) {
        Foodie f = new Foodie();
        Cooker c = new Cooker();
        f.start();
        c.start();
    }
}

class Desk {

    //定义一个标记
    //true 就表示桌子上有汉堡包的,此时允许吃货执行
    //false 就表示桌子上没有汉堡包的,此时允许厨师执行
    public static boolean flag = false;

    //锁对象
    public static final Object lock = new Object();
}

class Cooker extends Thread {
    @Override
    public void run() {
        while (true) {
            synchronized (Desk.lock) {
                if (!Desk.flag) {  //如果没有包子了则生产
                    System.out.println("厨师正在生产汉堡包");
                    Desk.flag = true;
                    Desk.lock.notifyAll();  // 做好后唤醒消费者吃
                } else {  // 如果还有包子则等待
                    try {
                        Desk.lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

class Foodie extends Thread {
    @Override
    public void run() {
        while (true) {
            synchronized (Desk.lock) {
                if (Desk.flag) { //如果有包子,则开吃
                    System.out.println("吃货在吃汉堡包");
                    Desk.flag = false;
                    Desk.lock.notifyAll(); //吃完后唤醒生产者生产
                } else { //如果没有就等待
                    try {
                        Desk.lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

    }
}

线程的状态

对于线程的状态,我们可能听过两个版本,一种是说有五种,另一种是说有六种,那到底谁说的才是对的呢?

其实这两种说法都是对的!!

只是他们的划分依据不一样而已,六种状态是Java中线程就是划分为六种,五种状态是根据操作系统层面划分的。

六种状态

多线程与高并发基础

六种状态分别是:新建、可运行、终结、阻塞、等待、有限等待

  • 新建

    • 当一个线程对象被创建,但还未调用 start 方法时处于新建状态

    • 此时未与操作系统底层线程关联

  • 可运行

    • 调用了 start 方法,就会由新建进入可运行

    • 此时与底层线程关联,由操作系统调度执行

  • 终结

    • 线程内代码已经执行完毕,由可运行进入终结

    • 此时会取消与底层线程关联

注意:新建→可运行→终结:是单向不可逆的过程

  • 阻塞

    • 当获取锁失败后,由可运行进入 Monitor 的阻塞队列阻塞,此时不占用 cpu 时间

    • 当持锁线程释放锁时,会按照一定规则唤醒阻塞队列中的阻塞线程,唤醒后的线程进入可运行状态

  • 等待

    • 当获取锁成功后,但由于条件不满足,调用了 wait() 方法,此时从可运行状态释放锁进入 Monitor 等待集合等待,同样不占用 cpu 时间

    • 当其它持锁线程调用 notify() 或 notifyAll() 方法,会按照一定规则唤醒等待集合中的等待线程,恢复为可运行状态

  • 有时限等待

    • 当获取锁成功后,但由于条件不满足,调用了 wait(long) 方法,此时从可运行状态释放锁进入 Monitor 等待集合进行有时限等待,同样不占用 cpu 时间

    • 当其它持锁线程调用 notify() 或 notifyAll() 方法,会按照一定规则唤醒等待集合中的有时限等待线程,恢复为可运行状态,并重新去竞争锁

    • 如果等待超时,也会从有时限等待状态恢复为可运行状态,并重新去竞争锁

    • 还有一种情况是调用 sleep(long) 方法也会从可运行状态进入有时限等待状态,但与 Monitor 无关,不会释放锁,不需要主动唤醒,超时时间到自然恢复为可运行状态,在等待时同样不占用 cpu

五种状态

多线程与高并发基础

 五种状态时根据操作系统层面划分的,分为:新建、就绪、运行、终结、阻塞

  • 新建与终结态:与 java 中同名状态类似

  • 运行态:分到 cpu 时间,能真正执行线程内代码的

  • 就绪态:有资格分到 cpu 时间,但还未轮到它的

  • 阻塞态:没资格分到 cpu 时间的

    • 涵盖了 java 状态中提到的阻塞等待有时限等待

    • 多出了阻塞 I/O,指线程在调用阻塞 I/O 时,实际活由 I/O 设备完成,此时线程无事可做,只能干等(不需要分到CPU就可以完成)

注意:Java中的 RUNNABLE状态 涵盖了就绪、运行、阻塞I/O

wait 和 sleep 比较

共同点

  • wait() ,wait(long) 和 sleep(long) 的效果都是让当前线程暂时放弃 CPU 的使用权,进入阻塞状态

不同点

  • 方法归属不同

    • sleep(long) 是 Thread 的静态方法

    • 而 wait(),wait(long) 都是 Object 的成员方法,每个对象都有(每个对象都可以作为锁,获得锁后就可以调用这两个方法)

  • 醒来时机不同

    • 执行 sleep(long) 和 wait(long) 的线程都会在等待相应毫秒后醒来

    • wait(long) 和 wait() 还可以被 notify 唤醒,wait() 如果不唤醒就一直等下去

    • 它们都可以被打断唤醒

  • 锁特性不同(重点)

    • wait 方法的调用必须先获取 wait 对象的锁,而 sleep 则无此限制

    • wait 方法执行后会释放对象锁,允许其它线程获得该对象锁(我放弃 cpu,但你们还可以用)

    • 而 sleep 如果在 synchronized 代码块中执行,并不会释放对象锁(我放弃 cpu,但我不释放锁)

线程池

线程池的作用及设计思路

线程池也是可以看做成一个池子,在该池子中存储很多个线程。

那么线程池有啥用呢?

        系统创建一个线程的成本是比较高的,因为它涉及到与操作系统交互。当程序中需要创建大量生存期很短暂的线程时,频繁的创建和销毁线程对系统的资源消耗有可能大于业务处理对系统资源的消耗,这样就有点"舍本逐末"了。

        针对这一种情况,为了提高性能,我们就可以采用线程池。线程池在启动的时,会创建大量空闲线程,当我们向线程池提交任务的时,线程池就会启动一个线程来执行该任务。等待任务执行完毕以后,线程并不会死亡,而是再次返回到线程池中称为空闲状态。等待下一次任务的执行。

线程池的设计思路 :

  1. 准备一个任务容器

  2. 一次性启动多个消费者线程

  3. 刚开始任务容器是空的,所以线程都在wait

  4. 直到一个外部线程向这个任务容器中扔了一个"任务",就会有一个消费者线程被唤醒

  5. 这个消费者线程取出"任务",并且执行这个任务,执行完毕后,继续等待下一次任务的到来

线程池的创建

我们可以使用 Executors类 中所提供的静态方法来创建线程池

static ExecutorService newCachedThreadPool() 创建一个默认的线程池 ​

static newFixedThreadPool(int nThreads) 创建一个指定最多线程数量的线程池

默认线程池的创建

static ExecutorService newCachedThreadPool() 创建一个默认的线程池

代码实现:

思路:

  1. 创建一个池子,刚创建时池子中是空的。

  2. 有任务需要执行时,如果池子里没有空闲的线程,则创建线程对象(有则直接拿来用)。ExecutorService 的 submit 方法(池子会自动的帮我们创建对象,任务执行完毕,线程对象归还给池子。)

  3. 所有任务全部执行完毕,关闭线程池。ExecutorService 的 shutdown 方法

public class Test {
    public static void main(String[] args) throws InterruptedException {
        //创建一个默认的线程池对象.池子中默认是空的.默认最多可以容纳int类型的最大值.
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        //Executors --- 可以帮助我们创建线程池对象
        //ExecutorService --- 可以帮助我们控制线程池
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + "在执行了");
        });
        //Thread.sleep(2000); 如果加上此句,即不让主线程这么块去执行下面这句,
        // 则上面那句创建的线程已经执行好了,放入池中,执行到下面这句的时候就不会创建新的线程了
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + "在执行了");
        });

        Thread.sleep(100);
        //我们可以通过ExecutorService的子类的getPoolSize()获取池中的线程数
        ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;
        System.out.println(pool.getPoolSize());
        
        // 当线程池不再使用时,关闭线程池
        executorService.shutdown();
    }
}

创建指定上限的线程池

static ExecutorService newFixedThreadPool(int nThreads) : 创建一个指定最多线程数量的线程池

相比于上面的代码只是换了个方法并且指定了线程中线程数量的最大值而已。

注意:这里指定的是最大值而不是初始值,初始值和上面一样是空的

ThreadPoolExecutor类 及 线程池七大参数 

查看源码就会发现 newCachedThreadPool() 及 newFixedThreadPool(int nThreads) 里面都是通过 new ThreadPoolExecutor(一堆参数); 来实现的

而这需要传递的一堆参数即为线程池的七大参数,分别是:

多线程与高并发基础

  1. corePoolSize 核心线程数目

    • 池中会保留的最多线程数

  2. maximumPoolSize 最大线程数目

    • 核心线程+救急线程的最大数目

      (核心线程:当你这个线程执行完任务后任然需要保留在线程池中的线程;救急线程:当执行完任务后不被保留在线程池中的线程)

  3. keepAliveTime 生存时间 

    • 救急线程的生存时间,生存时间内没有新任务,此线程资源会释放

  4. unit 时间单位

    • 救急线程的生存时间单位,如秒、毫秒等

  5. workQueue 任务队列

    • 当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务

    • 一般会设置其能容的量,不然可能导致内存紧张

  6. threadFactory 线程工厂

    • 可以定制线程对象的创建,例如设置线程名字、是否是守护线程等。也可以直接传入 Executors.defaultThreadFactory()

  7. handler 拒绝策略:当所有线程都在繁忙,workQueue 也放满时,会触发拒绝策略。有四种拒绝策略,分别为:

    1. 丢弃任务并抛出异常  ThreadPoolExecutor.AbortPolicy   这是默认的策略,如果用没有该参数的构造方法,那么就默认为此种策略

    2. 由调用者执行任务 ThreadPoolExecutor.CallerRunsPolicy(线程池已经忙不过来了,调用者你自己去干吧你(就是相当于直接调用任务的run() 方法直接执行))

    3. 直接丢弃任务       ThreadPoolExecutor.DiscardPolicy

    4. 丢弃最早排队任务    ThreadPoolExecutor.DiscardOldestPolicy

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

// corePoolSize:       -------------------不能小于0
// maximumPoolSize      ----------------不能小于 corePoolSize
// keepAliveTime:   --------------------不能小于0
// unit:           时间单位
// workQueue:      ------------------------不能为null
// threadFactory:  创建线程工厂  -----不能为null
// handler:        任务的拒绝策略  -----不能为null 

 具体使用:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger c = new AtomicInteger(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5,
                10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                r -> new Thread(r, "myThread" + c.getAndIncrement()),  //--------定制线程名
                new ThreadPoolExecutor.AbortPolicy());

    }
}

class Test1 {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5,
                10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                Executors.defaultThreadFactory(),       //-------------------使用默认的ThreadFactory
                new ThreadPoolExecutor.AbortPolicy());

    }
}

volatile 关键字解决可见性问题

线程安全主要需要考虑的有三个问题:原子性、可见性、有序性

可见性

  • 起因:由于编译器优化、或缓存优化、或 CPU 指令重排序优化导致的对共享变量所做的修改另外的线程看不到

  • 解决:用 volatile 修饰共享变量,能够防止编译器等优化发生,让一个线程对共享变量的修改对另一个线程可见。即强制线程每次在使用的时候,都会看一下共享区域最新的值

如下案例:

public class Test {

    public static void main(String[] args) throws InterruptedException {
        Boy boy = new Boy();
        boy.setName("小明");
        Girl girl = new Girl();
        girl.setName("小红");
        girl.start();
        boy.start();
    }
}

class Money {
    public static int money = 100000;
}

class Boy extends Thread {
    @Override
    public void run() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Money.money = 90000;  //  花了1万
        System.out.println("小明花了一万块钱");
    }
}

class Girl extends Thread {
    @Override
    public void run() {
        while (Money.money == 100000) {
            //盯着这钱看还是不是十万,如果还是那就啥也不干
        }
        System.out.println("钱已经不是十万了");
    }
}

上面的代码运行后发现,代码在小明花了一万块钱后卡住不动了,说明小红线程在循环中没有出来,说明小明线程对共享数据是不可见的。

但如果先启动 boy 再启动 girl ,并且在启动 girl 的时候先 sleep 10 毫秒让 boy 先执行完就不会这样了。为啥呢?就是因为编译器的优化,如果 girl 先启动,那么运行一段时间后,girl 循环的那段代码会变成会被 JIT 即时编译器当成热点代码,money就直接放到cpu缓存了,然后就不会再重新去读共享数据了。所以这里是编译器的优化导致的可见性问题,要如何解决呢?在money的数据类型前加上 volatile 关键字即可。

原子性问题

  • 起因:多线程下,不同线程的指令发生了交错导致的共享变量的读写混乱

  • 解决:用悲观锁或乐观锁解决,volatile 并不能解决原子性

 举个例子:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        MyAtomThread atom = new MyAtomThread();
        for (int i = 0; i < 100; i++) {
            new Thread(atom).start();
        }
    }
}


class MyAtomThread implements Runnable {
    private volatile int count = 0; //送冰淇淋的数量
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            count++;
            System.out.println("已经送了" + count + "个冰淇淋");
        }
    }
}

执行代码,发现本来最后的一次输出应该为 10000,结果有时候是,有时候不是,这里就是因为原子性问题的产生。因为这里的 count++ 并不是一个原子性操作,在这里,可以把这里的整个过程看成三步:

  1. 从共享数据中读取数据到本线程栈中.
  2. 修改本线程栈中变量副本的值
  3. 会把本线程栈中变量副本的值赋值给共享数据.

 当一个线程执行完第1步读取到数为100时,这时候,另一个线程执行了两次循环中的操作,把数据改成了102,然后该线程继续执行2、3步,又把数据改成了101,那岂不是就因此产生错误了。

那么该如何解决?

方法一:悲观锁

class MyAtomThread implements Runnable {
    private volatile int count = 0; //送冰淇淋的数量
    private final Object obj = new Object();

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            //1,从共享数据中读取数据到本线程栈中.
            //2,修改本线程栈中变量副本的值
            //3,会把本线程栈中变量副本的值赋值给共享数据.
            synchronized (obj) {
                count++;
                System.out.println("已经送了" + count + "个冰淇淋");
            }
        }
    }
}

方法二:乐观锁

这里需要用到 AtomicInteger 类,这里先只写解决方法,下节再详谈 AtomicInteger 和乐观锁

public class Test {
    public static void main(String[] args) throws InterruptedException {
        MyAtomThread atom = new MyAtomThread();
        for (int i = 0; i < 100; i++) {
            new Thread(atom).start();
        }
    }
}


class MyAtomThread implements Runnable {
    private AtomicInteger ac = new AtomicInteger(0);

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            int count = ac.incrementAndGet();
            System.out.println("已经送了" + count + "个冰淇淋");
        }
    }
}

AtomicInteger 及 乐观锁(CAS)

java从JDK1.5开始提供了java.util.concurrent.atomic包(简称Atomic包),这个包中的原子操作类提供了一种用法简单,性能高效,线程安全地更新一个变量的方式。

因为变量的类型有很多种,所以在Atomic包里一共提供了13个类,属于4种类型的原子更新方式,分别是原子更新基本类型、原子更新数组、原子更新引用和原子更新属性(字段)。

本次我们只讲解使用原子的方式更新基本类型,使用原子的方式更新基本类型Atomic包提供了以下3个类:

AtomicBoolean: 原子更新布尔类型

AtomicInteger: 原子更新整型

AtomicLong: 原子更新长整型

以上3个类提供的方法几乎一模一样,所以本节仅以AtomicInteger为例进行讲解,AtomicInteger的常用方法如下:

public AtomicInteger():	   			    初始化一个默认值为0的原子型Integer
public AtomicInteger(int initialValue):    初始化一个指定值的原子型Integer

int get():   			 				 获取值
int getAndIncrement():      			 以原子方式将当前值加1,注意,这里返回的是自增前的值。
int incrementAndGet():     				 以原子方式将当前值加1,注意,这里返回的是自增后的值。
int addAndGet(int data):				 以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果。
int getAndSet(int value):   			 以原子方式设置为newValue的值,并返回旧值。

减则是 decrement

CAS:

有3个操作数(内存值V,旧的预期值A,要修改的值B)

在即将修改的时候读到原本的内存值,然后将 旧的预期值A 设为该值,

然后将修改的值设为B,在要提交更新的时候再次读取内存值,然后进行判断

  • 当旧的预期值A == 内存值 此时修改成功,将V改为B
  • 当旧的预期值A != 内存值 此时修改失败,不会做出修改

如果没有修改的话,会重新获取现在的最新值然后将旧的预期值A设为此次获取到的值,然后继续按照整个步骤操作,这里的操作就叫自选

简单概括此过程为:开始获取到值的时候保存一份,然后要修改的时候再读取一次内存中的值看跟刚刚保存的是否一致,如果不是,说明有人已经改过,需要重新获取再来一遍,如果还出现这样的问题,继续自旋

悲观锁和乐观锁的对比

悲观锁与乐观锁的相同点:在多线程情况下,都可以保证共享数据的安全性。那他们具体有什么不同的呢?以下就是悲观锁和乐视锁各自的特点:

  • 悲观锁的代表是 synchronized 和 Lock 锁

    • 其核心思想是【线程只有占有了锁,才能去操作共享变量,每次只有一个线程占锁成功,获取锁失败的线程,都得停下来等待】

    • 线程从运行到阻塞、再从阻塞到唤醒,涉及线程上下文切换,如果频繁发生,影响性能

    • 实际上,线程在获取 synchronized 和 Lock 锁时,如果锁已被占用,都会做几次重试操作,减少阻塞的机会

  • 乐观锁的代表是 AtomicInteger,使用 cas 来保证原子性

    • 其核心思想是:无需加锁,每次只有一个线程能成功修改共享变量,其它失败的线程不需要停止,不断重试直至成功

    • 由于线程一直运行,不需要阻塞,因此不涉及线程上下文切换

    • 它需要多核 cpu 支持,且线程数不应超过 cpu 核数(如果没有多核CPU支持,虽然不会有阻塞状态,但还是会涉及线程上下文切换)

为啥叫 “悲观” “乐观” 呢?

  • 悲观锁总是从最坏的角度出发,认为每次获取数据的时候,别人都有可能修改。所以在每次操作共享数据之前,都会上锁。

  • 乐观锁是从乐观的角度出发,假设每次获取数据别人都不会修改,所以不会上锁。只不过在修改共享数据的时候,会检查一下,别人有没有修改过这个数据。

并发工具类 —— Hashtable

Hashtable出现的原因 : 在集合类中HashMap是比较常用的集合对象,但是HashMap是线程不安全的(多线程环境下可能会存在问题)。为了保证数据的安全性我们可以使用Hashtable,但是Hashtable的效率低下。原因如下:

  1. Hashtable采取悲观锁synchronized的形式保证数据的安全性

  2. 只要有线程访问,会将整张表全部锁起来,同一时刻,只能有一个线程操作它

也正因为它效率低的原因,所以它很少被使用到

并发工具类 —— ConcurrentHashMap

ConcurrentHashMap出现的原因 : 在集合类中 HashMap 是比较常用的集合对象,但是HashMap是线程不安全的(多线程环境下可能会存在问题)。为了保证数据的安全性我们可以使用Hashtable,但是Hashtable的效率低下。

基于以上两个原因我们可以使用JDK1.5以后所提供的 ConcurrentHashMap。

ConcurrentHashMap 1.7原理

  • 数据结构:Segment(大数组) + HashEntry(小数组) + 链表,每个 Segment 对应一把锁,如果多个线程访问不同的 Segment,则不会冲突。

  • 并发度:Segment 数组大小即并发度,决定了同一时刻最多能有多少个线程并发访问。Segment 数组不能扩容,意味着并发度在 ConcurrentHashMap 创建时就固定了

  • 扩容:每个小数组的扩容相对独立,小数组在超过扩容因子时会触发扩容,每次扩容翻倍

  • Segment[0] 原型:首次创建其它小数组时,会以此原型为依据,数组长度,扩容因子都会以原型为准

具体的一些细节在这里就不讲了,毕竟它已经过时了,如果有兴趣网上找ConcurrentHashMap1.7的帖子学习即可。

ConcurrentHashMap 1.8原理

  • 数据结构:Node 数组 + 链表或红黑树,数组的每个头节点作为锁,如果多个线程访问的头节点不同,则不会冲突。

  • 并发度:Node 数组有多大,并发度就有多大,与 1.7 不同,Node 数组可以扩容

  • 扩容条件:Node 数组 3/4 时就会扩容(1.7是要超过factory ,这个满3/4就行)

  • 扩容单位:以链表为单位从后向前迁移链表,迁移完成的将旧数组头节点替换为 ForwardingNode

  • 扩容时并发 get

    • 根据是否为 ForwardingNode 来决定是在新数组查找还是在旧数组查找,不会阻塞

    • 如果链表长度超过 1,则需要对节点进行复制(创建新节点),怕的是节点迁移后 next 指针改变

    • 如果链表最后几个元素扩容后索引不变,则节点无需复制

  • 扩容时并发 put

    • 如果 put 的线程与扩容线程操作的链表是同一个,put 线程会阻塞

    • 如果 put 的线程操作的链表还未迁移完成,即链表头节点不是 ForwardingNode,则可以并发执行

    • 如果 put 的线程操作的链表已经迁移完成,即链表头结点是 ForwardingNode,则可以协助扩容

    • 在迁移时是节点是会重新计算哈希值的

  • 与 1.7 相比是懒惰初始化

  • capacity 代表预估的元素个数,capacity / factory 来计算出初始数组大小,需要贴近 。由于它是满就扩容,所以如果 capacity = 12,factory = 3 / 4 那么 初始数组大小会是32

  • loadFactor 只在计算初始数组大小时被使用,之后扩容固定为 3/4

  • 超过树化阈值时的扩容问题,如果容量已经是 64,直接树化,否则在原来容量基础上做 3 轮扩容

并发工具类——CountDownLatch

使用场景: 让某一条线程等待其他线程执行完毕之后再执行

相关方法:

方法 解释
public CountDownLatch(int count) 参数传递线程数,表示等待线程数量
public void await() 让线程等待,当计数器变成0的时候,会自动唤醒这里等待的线程
public void countDown() 当前线程执行完毕,让计数器 -1

以下这个案例能很好的帮助我们理解,里面的妈妈线程在等三个孩子线程执行完吃饺子的代码后才执行自己收拾碗筷的代码

public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(3);
        MotherThread motherThread = new MotherThread(countDownLatch);
        ChileThread1 chileThread1 = new ChileThread1(countDownLatch);
        ChileThread2 chileThread2 = new ChileThread2(countDownLatch);
        ChileThread3 chileThread3 = new ChileThread3(countDownLatch);
        motherThread.start();
        chileThread1.start();
        chileThread2.start();
        chileThread3.start();
    }
}

class MotherThread extends Thread {
    private CountDownLatch countDownLatch;
    public MotherThread(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        //1.等待
        try {
            //当计数器变成0的时候,会自动唤醒这里等待的线程。
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //2.收拾碗筷
        System.out.println("妈妈在收拾碗筷");
    }
}

class ChileThread1 extends Thread {

    private CountDownLatch countDownLatch;
    public ChileThread1(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        //1.吃饺子
        for (int i = 1; i <= 10; i++) {
            System.out.println(getName() + "在吃第" + i + "个饺子");
        }
        //2.吃完说一声
        //每一次countDown方法的时候,就让计数器-1
        countDownLatch.countDown();
    }
}

class ChileThread2 extends Thread {
    private CountDownLatch countDownLatch;
    public ChileThread2(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        //1.吃饺子
        for (int i = 1; i <= 15; i++) {
            System.out.println(getName() + "在吃第" + i + "个饺子");
        }
        //2.吃完说一声
        //每一次countDown方法的时候,就让计数器-1
        countDownLatch.countDown();
    }
}

class ChileThread3 extends Thread {
    private CountDownLatch countDownLatch;
    public ChileThread3(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void run() {
        //1.吃饺子
        for (int i = 1; i <= 20; i++) {
            System.out.println(getName() + "在吃第" + i + "个饺子");
        }
        //2.吃完说一声
        //每一次countDown方法的时候,就让计数器-1
        countDownLatch.countDown();
    }
}

并发工具类——Semaphore

使用场景 : 可以控制访问特定资源的线程数量

方法 解释
public Semaphore(int permits) 创建管理员对象,设置最多的通行证数(即最多允许多少线程执行)
public void  acquire() 获取通行证
public void release() 当前线程执行完毕,释放通行证

例子:某通道一次性只允许两辆车通过

public class Test {
    public static void main(String[] args) throws InterruptedException {
        MyRunnable myRunnable = new MyRunnable();
        for (int i = 0; i < 10; i++) {
            new Thread(myRunnable).start();
        }
    }
}

class MyRunnable implements Runnable {
    //1.创建通行证管理员对象
    private Semaphore semaphore = new Semaphore(2);

    @Override
    public void run() {
        //2.获得通行证
        try {
            semaphore.acquire();
            //3.开始行驶
            System.out.println("获得了通行证开始行驶");
            Thread.sleep(2000);
            System.out.println("归还通行证");
            //4.归还通行证
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

本文就暂时先到这里,多线程与高并发的内容是非常多也非常有用的,接下来还得继续深入学习,大家一起加油!!

多线程与高并发基础

 

版权声明:程序员胖胖胖虎阿 发表于 2022年10月2日 下午8:00。
转载请注明:多线程与高并发基础 | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...