JUC并发编程

2年前 (2022) 程序员胖胖胖虎阿
294 0 0

文章目录

  • JUC
    • 1、什么是JUC
    • 2、线程和进程
    • 3、Lock锁
      • 3.1、传统Synchronized
      • 3.2、Lock 接口
      • 3.3、Synchronized 和 Lock 区别
    • 4、生产者和消费者问题
      • 生产者和消费者问题 Synchronized 版
      • JUC版的生产者和消费者问题
      • Condition 精准的通知和唤醒线程
    • 5、8锁现象
    • 6、集合类不安全
    • 7、Callable
    • 8、常用的辅助类(必会)
      • 8.1、CountDownLatch
      • 8.2、CyclicBarrier
      • 8.3、Semaphore
    • 9、读写锁
    • 10、阻塞队列
      • **BlockingQueue**
      • SynchronousQueue 同步队列
    • 11、线程池(重点)
      • 11.1、池化技术
      • 11.2、线程池:三大方法
      • 11.3、7大参数
      • 11.4、4种拒绝策略
      • 11.5、小结和拓展
    • 12、四大函数式接口(必需掌握)
      • 12.1、函数式接口:
      • 12.2、断定型接口:
      • 12.3、Consumer 消费型接口
      • 12.4、Supplier 供给型接口
    • 13、Stream流式计算
      • 什么是Stream流式计算
    • 14、ForkJoin
    • 15、异步回调
    • 16、JMM
      • 什么是JMM
    • 17、Volatile
      • 17.1、保证可见性
      • 17.2、不保证原子性
      • 17.3、指令重排
    • 18、彻底玩转单例模式
      • 饿汉式
      • 懒汉式单例
      • 静态内部类
      • 枚举
    • 19、深入理解CAS
      • 19.1、什么是 CAS
      • 19.2、Unsafe 类
      • 19.3、CAS : ABA 问题(狸猫换太子)
    • 20、原子引用
    • 21、各种锁的理解
      • 21.1、公平锁、非公平锁
      • 21.2、可重入锁
      • 21.3、自旋锁
      • 21.4、死锁

JUC

1、什么是JUC

java.util.concurrent

JUC并发编程

Runnable 没有返回值、效率相比入 Callable 相对较低!

JUC并发编程

JUC并发编程

2、线程和进程

进程:一个程序,QQ.exe,music.exe 程序的集合,

一个进程往往可以包含多个线程,至少包含一个!

Java默认有几个线程? 2个,mian、GC

开启线程,对于Java而言:Thread、Runnable、Callable

Java 真的可以开启线程吗? 开不了

    public synchronized void start() {
        if (this.threadStatus != 0) {
            throw new IllegalThreadStateException();
        } else {
            this.group.add(this);
            boolean var1 = false;

            try {
                this.start0();
                var1 = true;
            } finally {
                try {
                    if (!var1) {
                        this.group.threadStartFailed(this);
                    }
                } catch (Throwable var8) {
                }

            }

        }
    }
	//本地方法, 底层的C++, Java无法直接操作硬件
    private native void start0();

并发、并行

并发编程:并发、并行

  • CPU一核,模拟出来多条线程,天下武功,为快不破,快速交替

并行(多个人一起行走)

  • CPU多核,多个线程可以同时执行;线程池
public class Test1 {
    public static void main(String[] args) {
        //获取cpu的核数
        // CPU 密集型,IO密集型
        System.out.println(Runtime.getRuntime().availableProcessors());
    }
}

并发编程的本质:充分利用CPU的资源

线程有几个状态

public static enum State {
    //新生
    NEW,
    //运行
    RUNNABLE,
    //阻塞
    BLOCKED,
    //等等,死死地等待
    WAITING,
    //超时等待
    TIMED_WAITING,
    //终止
    TERMINATED;

    private State() {
    }
}

wait/sleep 区别

1、来自不同的类

  • wait = > Object

  • sleep => Thread

2、关于锁的释放

wait会释放锁,sleep睡觉了,抱着锁睡觉,不会释放!

3、使用的范围是不同的

  • wait 必须在同步代码块中

  • sleep:可以在任何敌方睡

4、是否需要捕获异常

  • wait 不需要捕获异常
  • sleep 必须要捕获异常

3、Lock锁

3.1、传统Synchronized

package com.su.demo01;

// 基本的买票例子

/**
 * 真正的多线程开发,公司中的开发
 * 线程就是一个单独的资源类,没有任何附属的操作!
 * 1、 属性、方法
 */
public class SaleTicketDemo01 {
    // 并发:多线程操作同一个资源类
    public static void main(String[] args) {
        Ticket ticket = new Ticket();

        new Thread(()->{
            for (int i = 0; i < 40; i++) {
                ticket.sale();
            }
        }, "A").start();
        new Thread(()->{
            for (int i = 0; i < 40; i++) {
                ticket.sale();
            }
        }, "B").start();
        new Thread(()->{
            for (int i = 0; i < 40; i++) {
                ticket.sale();
            }
        }, "C").start();
    }
}

// 资源类  OOP
class Ticket{
    // 属性、方法
    private int number = 50;

    // 卖票的方式
    public synchronized void sale(){
        if (number > 0){
            System.out.println(Thread.currentThread().getName()+"卖出了"+ (number--) + "票,剩余:"+number);
        }
    }
}

3.2、Lock 接口

JUC并发编程

JUC并发编程

JUC并发编程

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 真正的多线程开发,公司中的开发
 * 线程就是一个单独的资源类,没有任何附属的操作!
 * 1、 属性、方法
 */
public class SaleTicketDemo02 {
    // 并发:多线程操作同一个资源类
    public static void main(String[] args) {
        Ticket ticket = new Ticket();

        new Thread(()->{for (int i = 1; i < 40 ; i++) ticket.sale();},"A").start();
        new Thread(()->{for (int i = 1; i < 40 ; i++) ticket.sale();},"B").start();
        new Thread(()->{for (int i = 1; i < 40 ; i++) ticket.sale();},"C").start();
    }
}

// Lock三部曲
// 1、 new ReentrantLock();
// 2、 lock.lock(); // 加锁
// 3、 finally=> lock.unlock(); // 解锁
class Ticket2 {
    // 属性、方法
    private int number = 30;
    Lock lock = new ReentrantLock();
    public void sale(){
        lock.lock(); // 加锁
        try {
			// 业务代码
            if (number>0){
                System.out.println(Thread.currentThread().getName()+"卖出了"+
                        (number--)+"票,剩余:"+number);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock(); // 解锁
        }
    }
}

3.3、Synchronized 和 Lock 区别

1、Synchronized 内置的Java关键字,Lock是一个java类

2、Synchronized 无法判断获取锁的状态,Lock 可以判断是获取到了锁

3、Synchronized 会自动释放锁,Lick必须要手动释放锁!如果不释放就会 死锁

4、Synchronized 线程1(获得锁,阻塞)、线程2(等待,傻傻的等);Lock锁就不一定会等待下去

5、Synchronized 可重入锁,不可以中段的,非公平;Lock,可重入锁,可以判断锁,非公平(可以自己设置)

6、Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码!

锁是什么,如何判断锁的是谁?

4、生产者和消费者问题

生产者和消费者问题 Synchronized 版

import java.util.Date;
/**
 * 线程之间的通信问题:生产者和消费者问题
 *  线程交替执行 A B 操作同一个变量
 */
public class A {
    public static void main(String[] args) {
        Data data = new Data();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
    }
}


// 判断等待,业务,通知
class Data{
    private int number = 0;

    //+1
    public synchronized void increment() throws InterruptedException {
        if (number !=0){
            // 等待
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        // 通知其他线程,我+1完毕了
        this.notifyAll();
    }

    //-1
    public synchronized void decrement() throws InterruptedException {
        if (number == 0){
            //等待
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        //通知其他线程,我-1完毕了
        this.notifyAll();
    }
}

问题存在,A B C D 4 个线程! 虚假唤醒

public static void main(String[] args) {
        Data data = new Data();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
        
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }
}

JUC并发编程

JUC并发编程

解决方法:if 改为 while 判断

public class A {
    public static void main(String[] args) {
        Data data = new Data();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }
}


// 判断等待,业务,通知
class Data{
    private int number = 0;

    //+1
    public synchronized void increment() throws InterruptedException {
        while (number !=0){
            // 等待
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        // 通知其他线程,我+1完毕了
        this.notifyAll();
    }

    //-1
    public synchronized void decrement() throws InterruptedException {
        while (number == 0){
            //等待
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        //通知其他线程,我-1完毕了
        this.notifyAll();
    }
}

JUC版的生产者和消费者问题

JUC并发编程

通过Lock 找到 Condition

JUC并发编程

代码实现:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class B {
    public static void main(String[] args) {
        Data2 data2 = new Data2();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data2.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data2.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data2.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data2.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }
}


class Data2{
    private int number = 0;

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    //+1
    public  void increment() throws InterruptedException {
        lock.lock();
        try {
            while (number !=0){
                // 等待
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName()+"=>"+number);
            // 通知其他线程,我+1完毕了
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }

    //-1
    public  void decrement() throws InterruptedException {
        lock.lock();
        try {
            while (number == 0){
                //等待
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName()+"=>"+number);
            //通知其他线程,我-1完毕了
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

问题:JUC并发编程

任何一个新的技术,绝对不是仅仅只是覆盖了原来的技术,优势和补充!

Condition 精准的通知和唤醒线程

代码实现:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/*
  A 执行完调用B ,B执行完调用C, C执行完调用A
 */
public class C {
    public static void main(String[] args) {
        Data3 data = new Data3();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data.printA();
            }
        }).start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data.printB();
            }
        }).start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data.printC();
            }
        }).start();

    }
}

class Data3{
    private Lock lock = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();
    private int number =1;

    public void printA(){
        lock.lock();
        try {
            while (number!=1){
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName()+"=>AAAAAAAAAAA");
            number = 2;
            condition2.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

    public void printB(){
        lock.lock();
        try {
            while (number!=2){
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName()+"=>BBBBBBBBBBB");
            number = 3;
            condition2.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

    public void printC(){
        lock.lock();
        try {
            while (number!=3){
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName()+"=>CCCCCCCCCCCCC");
            number = 1;
            condition3.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

5、8锁现象

如何判断锁的是谁!永远的知道什么锁,锁到底锁的是谁!

深刻理解我们的锁

import java.util.concurrent.TimeUnit;

/*
    8锁, 就是关于锁的8个问题
    1、 标准情况下,两个线程先打印 发短信还是打电话?  ---> 1发短信 2打电话
    2、sendSms延迟4s,两线线程先打印?
 */
public class Test1 {
    public static void main(String[] args) {
        Phone phone = new Phone();

        new Thread(()->{
            phone.sendSms();
        },"A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone.call();
        },"B").start();
    }
}

class Phone{

    // Synchronized  锁的对象是方法的调用者
    // 两个方法用的是用一个锁,谁先拿到谁执行

    public synchronized void sendSms(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }

    public synchronized void call(){
        System.out.println("打电话");
    }
}
import java.util.concurrent.TimeUnit;

/*
    8锁, 就是关于锁的8个问题
    1、 标准情况下,两个线程先打印 发短信还是打电话?  ---> 1发短信 2打电话
    2、 sendSms延迟4s,两线线程先打印?  1发短信
    3、 增加了一个普通方法后!先执行发短息还是Hello? hello
    4、 两个对象,两个同步方法?  先执行?  打电话
 */
public class Test2 {
    public static void main(String[] args) {
        Phone phone = new Phone();
        Phone2 phone2 = new Phone2();

        new Thread(()->{
            phone.sendSms();
        },"A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone2.call();
        },"B").start();
    }
}

class Phone2{

    // Synchronized  锁的对象是方法的调用者
    // 两个方法用的是用一个锁,谁先拿到谁执行

    public synchronized void sendSms(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }

    public synchronized void call(){
        System.out.println("打电话");
    }

    // 这里没有锁!不受锁的影响
    public void hello(){
        System.out.println("hello");
    }
}
import java.util.concurrent.TimeUnit;

/*
    8锁, 就是关于锁的8个问题
    1、 标准情况下,两个线程先打印 发短信还是打电话?  ---> 1发短信 2打电话
    2、 sendSms延迟4s,两线线程先打印?  1发短信
    3、 增加了一个普通方法后!先执行发短息还是Hello? hello
    4、 两个对象,两个同步方法?  先执行?  打电话
    5、 增加两个静态的同步方法,只有一个对象 先?  发短信
    6、 两个对象,两个静态的同步方法,先?  打电话
 */
public class Test3 {
    public static void main(String[] args) {
        // 两个对象的Class类模板只有一个,static,锁的就是Class
        Phone3 phone = new Phone3();
        Phone3 phone3 = new Phone3();

        new Thread(()->{
            phone.sendSms();
        },"A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone3.call();
        },"B").start();
    }
}

class Phone3{

    // Synchronized  锁的对象是方法的调用者
    // 两个方法用的是用一个锁,谁先拿到谁执行
    // static 静态方法
    // 类一加载就有了! 锁的是Class
    public static synchronized void sendSms(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }

    public static synchronized void call(){
        System.out.println("打电话");
    }

    // 这里没有锁!不受锁的影响
    public void hello(){
        System.out.println("hello");
    }
}
import java.util.concurrent.TimeUnit;

/*
    8锁, 就是关于锁的8个问题
    1、 标准情况下,两个线程先打印 发短信还是打电话?  ---> 1发短信 2打电话
    2、 sendSms延迟4s,两线线程先打印?  1发短信
    3、 增加了一个普通方法后!先执行发短息还是Hello? hello
    4、 两个对象,两个同步方法?  先执行?  打电话
    5、 增加两个静态的同步方法,只有一个对象 先?  发短信
    6、 两个对象,两个静态的同步方法,先?  打电话
    7、 1个静态的同步方法, 1个普通的同步方法,一个对象,先?  打电话
    8、 1个静态的同步方法, 1个普通的同步方法,两个对象,先?   打电话
 */
public class Test4 {
    public static void main(String[] args) {
        Phone4 phone1 = new Phone4();
        Phone4 phone2 = new Phone4();

        new Thread(()->{
            phone1.sendSms();
        },"A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone2.call();
        },"B").start();
    }
}

class Phone4{

    // Synchronized  锁的对象是方法的调用者
    // 两个方法用的是用一个锁,谁先拿到谁执行
    // static 静态方法
    // 类一加载就有了! 锁的是Class类模板
    public static synchronized void sendSms(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }

    //普通的同步方法,锁的是调用者
    public synchronized void call(){
        System.out.println("打电话");
    }

    // 这里没有锁!不受锁的影响
    public void hello(){
        System.out.println("hello");
    }
}

小结:

new this 具体的一个手机

static Class 唯一的模板

6、集合类不安全

public class ListTest {   
    public static void main(String[] args) {       
        List<String> list = new ArrayList<>();        
        for (int i = 0; i < 10; i++) {            
            new Thread(()->{                
                list.add(UUID.randomUUID().toString().substring(0, 5));               
                System.out.println(list);            
            },String.valueOf(i)).start();       
        }    
    }
}

JUC并发编程

java.util.ConcurrentModificationException 并发修改异常

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListTest {
    public static void main(String[] args) {
        // 并发下 ArrayList 不安全,Synchronized

        /**
         * 解决方案:
         * 1、List<String> list = new Vector<>();
         * 2、List<String> list = Collections.synchronizedList(new ArrayList<>());
         * 3、List<String> list = new CopyOnWriteArrayList<>();
         */

        // CopyOnWrite 写入时复制   COW  计算机程序设计领域的一种优化策略
        // 多个线程调用的时候,list,读取的时候,固定的,写入(覆盖)
        // 在写入的时候避免覆盖,造成数据问题
        // 读写分离
        // CopyOnWriteArrayList  比 Vector  nb在哪里?
        List<String> list = new CopyOnWriteArrayList<>();

        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println(list);
            },String.valueOf(i)).start();
        }
    }
}

Set不安全

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;


/**
 * 同理可证   ConcurrentModificationException
 * 1、 Set<String> set = Collections.synchronizedSet(new HashSet<>());
 * 2、 Set<String> set = new CopyOnWriteArraySet<>();
 */
public class SetTest {

    public static void main(String[] args) {
//        Set<String> set = new HashSet<>();
//        Set<String> set = Collections.synchronizedSet(new HashSet<>());
        Set<String> set = new CopyOnWriteArraySet<>();

        for (int i = 0; i < 20; i++) {
            new Thread(()->{
                set.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println(set);
            }).start();
        }
    }
}

HashSet底层是什么?

public HashSet(){    
    map = new HashMap<>();
}
public boolean add(E var1) {        
    return this.map.put(var1, PRESENT) == null;
}
private static final Object PRESENT = new Object();  // 不变的值!

Map 不安全

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// ConcurrentModificationException
public class MapTest {
    public static void main(String[] args) {
        // map 是这样用的吗?  不是,工作中不用 HashMap
        // 默认等价于什么?   new  HashMap<>(16, 0.75)

//        Map<String, String> map = new HashMap<>();

        Map<String, String> map = new ConcurrentHashMap<>();

        for (int i = 0; i < 20; i++) {
            new Thread(()->{
                map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
                System.out.println(map);
            }).start();
        }
    }
}

7、Callable

JUC并发编程

1、可以有返回值

2、可以抛出异常

3、方法不同,run()/ call()

JUC并发编程

JUC并发编程

JUC并发编程

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableTest {
	public static void main(String[] args) throws ExecutionException, InterruptedException {
		// new Thread(new Runnable()).start();
		// new Thread(new FutureTask<V>()).start();
		// new Thread(new FutureTask<V>( Callable )).start();

		new Thread().start(); // 怎么启动Callable

		MyThread thread = new MyThread();
		FutureTask<String> futureTask = new FutureTask<>(thread); // 适配类
		new Thread(futureTask, "A").start();
		new Thread(futureTask, "B").start(); // 结果会被缓存,效率高
		// 获取callable 的返回结果
		String o = (String) futureTask.get(); //这个get 方法可能会产生阻塞!把他放到最后
		// 或者使用异步通信来处理!

		System.out.println(o);

	}
}

/**
 * 传统的方式
 */
//class MyThread implements Runnable{
//	@Override
//	public void run() {
//	}
//}

class MyThread implements Callable<String> {

	@Override
	public String call() throws Exception {
		System.out.println("call()");
		return "1024";
	}
}

JUC并发编程

细节:

1、有缓存

2、结果可能需要等待,会阻塞!

8、常用的辅助类(必会)

8.1、CountDownLatch

JUC并发编程

// 计数器
public class CountDownLatchDemo {
	public static void main(String[] args) throws InterruptedException {
		// 总数是6,必须要执行任务的时候,再使用!
		CountDownLatch countDownLatch = new CountDownLatch(6);

		for (int i = 0; i < 6; i++) {
			new Thread(() -> {
				System.out.println(Thread.currentThread().getName() + " GO out");
				countDownLatch.countDown();  // 数量-1
			}, String.valueOf(i)).start();
		}

		countDownLatch.await(); //等待计时器归零,然后再向下执行

		System.out.println("Close door");
	}
}

JUC并发编程

原理:
countDownLatch.countDown(); // 数量-1

countDownLatch.await(); // 等待计数器归零,然后再向下执行

每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续执行!

8.2、CyclicBarrier

JUC并发编程

加法计数器

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
	public static void main(String[] args) {
		/*
		 * 集齐7颗龙珠召唤神龙
		 */
		// 召唤神龙的线程
		CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
			System.out.println("召唤神龙成功!");
		});

		for (int i = 0; i < 7; i++) {
			// lambda 能操作到 i 吗
			final int temp = i;
			new Thread(() -> {
				System.out.println(Thread.currentThread().getName() + "收集" + temp + "个龙珠");

				try {
					cyclicBarrier.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
			}).start();
		}
	}
}

JUC并发编程

8.3、Semaphore

Semaphore:信号量

JUC并发编程

抢车位!

6车—3个停车位置

public class SemaphoreDemo {
	public static void main(String[] args) {
		// 线程数量: 停车位! 限流!
		Semaphore semaphore = new Semaphore(3);

		for (int i = 0; i < 6; i++) {
			new Thread(() -> {
				// acquire() 得到
				try {
					semaphore.acquire();
					System.out.println(Thread.currentThread().getName() + " 抢到车位");
					TimeUnit.SECONDS.sleep(2);
					System.out.println(Thread.currentThread().getName() + " 离开车位");
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					semaphore.release();
				}
				// release() 释放

			}, String.valueOf(i)).start();
		}
	}
}

JUC并发编程

原理:

semaphore.acquire() 获得,假设如果已经满了,等待,等待被释放为止!

semaphore.release(); 释放,会将当前的信号量释放 + 1,然后唤醒等待的线程!

作用: 多个共享资源互斥的使用!并发限流,控制最大的线程数!

9、读写锁

ReadWriteLock

JUC并发编程

/** * ReadWriteLock */
public class ReadWriteLockDemo {   
    public static void main(String[] args) {      
        MyCache myCache = new MyCache();      
        for (int i = 0; i < 5; i++) {         
            final int temp = i;         
            new Thread(() -> {            
                myCache.put(temp + "", temp + "");         
            }, String.valueOf(i)).start();      
        }      
        for (int i = 0; i < 5; i++) {         
            final int temp = i;         
            new Thread(() -> {            
                myCache.get(temp + "");         
            }, String.valueOf(i)).start();      
        }   
    }
}

/** * 自定义缓存 */
class MyCache {   
    private volatile Map<String, Object> map = new HashMap<>();   
    // 存,写   
    public void put(String key, Object value) {   
        System.out.println(Thread.currentThread().getName() + "写入" + key);      
        map.put(key, value);      
        System.out.println(Thread.currentThread().getName() + "写入ok");   
    }   
    // 取,读   
    public void get(String key) {     
        System.out.println(Thread.currentThread().getName() + "读取" + key);      
        Object o = map.get(key);      
        System.out.println(Thread.currentThread().getName() + "读取ok");   
    }
}

JUC并发编程

发现写入的时候被插队了

加上读写锁

/**
 * 独占锁(写锁) 一次只能被一个线程占有
 * 共享锁(读锁) 多个线程可以同时占有
 * ReadWriteLock
 * 读-读 可以共存!
 * 读-写 不能共存!
 * 写-写 不能共存!
 */
public class ReadWriteLockDemo {
	public static void main(String[] args) {
		MyCacheLock myCache = new MyCacheLock();

		for (int i = 0; i < 5; i++) {
			final int temp = i;
			new Thread(() -> {
				myCache.put(temp + "", temp + "");
			}, String.valueOf(i)).start();
		}

		for (int i = 0; i < 5; i++) {
			final int temp = i;
			new Thread(() -> {
				myCache.get(temp + "");
			}, String.valueOf(i)).start();
		}
	}
}


/**
 * 加锁的
 */
class MyCacheLock {

	private volatile Map<String, Object> map = new HashMap<>();
	// 读写锁:更加细粒度的控制
	private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

	// 存,写入的时候,只希望同时只有一个线程写
	public void put(String key, Object value) {
		readWriteLock.writeLock().lock();
		try {
			System.out.println(Thread.currentThread().getName() + "写入" + key);
			map.put(key, value);
			System.out.println(Thread.currentThread().getName() + "写入ok");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			readWriteLock.writeLock().unlock();
		}

	}

	// 取,读, 所有人都可以读
	public void get(String key) {
		readWriteLock.readLock().lock();
		try {
			System.out.println(Thread.currentThread().getName() + "读取" + key);
			Object o = map.get(key);
			System.out.println(Thread.currentThread().getName() + "读取ok");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			readWriteLock.readLock().unlock();
		}

	}
}

/**
 * 自定义缓存
 */
class MyCache {

	private volatile Map<String, Object> map = new HashMap<>();

	// 存,写
	public void put(String key, Object value) {
		System.out.println(Thread.currentThread().getName() + "写入" + key);
		map.put(key, value);
		System.out.println(Thread.currentThread().getName() + "写入ok");
	}

	// 取,读
	public void get(String key) {
		System.out.println(Thread.currentThread().getName() + "读取" + key);
		Object o = map.get(key);
		System.out.println(Thread.currentThread().getName() + "读取ok");
	}
}

JUC并发编程

10、阻塞队列

JUC并发编程

阻塞队列:

JUC并发编程

JUC并发编程

BlockingQueue

BlockingQueue 不是新的东西

JUC并发编程

什么情况下我们会使用 阻塞队列:多线程并发处理,线程池!

学会使用队列

添加、移除

四组API

方式 抛出异常 有返回值,不抛出异常 阻塞 等待 超时等待
添加 add offer() put() offer(,)
移除 remove poll() take() poll(,)
检测队首元素 element peek - -
/**
 * 抛出异常
 */
public static void test1() {

   ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);

   System.out.println(blockingQueue.add("a"));
   System.out.println(blockingQueue.add("b"));
   System.out.println(blockingQueue.add("c"));
   // IllegalStateException: Queue full 抛出异常!
   // System.out.println(blockingQueue.add("d"));

   System.out.println("=-===========");

   System.out.println(blockingQueue.remove());
   System.out.println(blockingQueue.remove());
   System.out.println(blockingQueue.remove());
   // java.util.NoSuchElementException 抛出异常!
   // System.out.println(blockingQueue.remove());
}
/**
 * 有返回值,没有异常
 */
public static void test2() {
   // 队列的大小
   ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
   System.out.println(blockingQueue.offer("a"));
   System.out.println(blockingQueue.offer("b"));
   System.out.println(blockingQueue.offer("c"));
   // System.out.println(blockingQueue.offer("d")); // false 不抛出异常
   // !
   System.out.println("============================");
   
   System.out.println(blockingQueue.poll());
   System.out.println(blockingQueue.poll());
   System.out.println(blockingQueue.poll());
   System.out.println(blockingQueue.poll()); // null 不抛出异常!
}
/**
 * 等待,阻塞(等待超时)
 */
public static void test4() throws InterruptedException {
   // 队列的大小
   ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
   blockingQueue.offer("a");
   blockingQueue.offer("b");
   blockingQueue.offer("c");
   // blockingQueue.offer("d",2,TimeUnit.SECONDS); // 等待超过2秒就退出
   
   System.out.println("===============");
   
   System.out.println(blockingQueue.poll());
   System.out.println(blockingQueue.poll());
   System.out.println(blockingQueue.poll());
   blockingQueue.poll(2, TimeUnit.SECONDS); // 等待超过2秒就退出
}

SynchronousQueue 同步队列

没有容量,

进去一个元素,必须等待取出来之后,才能再往里面放一个元素!

put、take

/**
 * 同步队列
 * 和其他的BlockingQueue 不一样,SynchronousQueue 不存储元素
 * put了一个元素,必须从里面先take取出来,否则不能再put进去值
 */
public class SynchronousQueueDemo {
   public static void main(String[] args) {
      SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(); //同步队列

      new Thread(() -> {

         try {
            System.out.println(Thread.currentThread().getName() + " put 1");
            synchronousQueue.put("1");
            System.out.println(Thread.currentThread().getName() + " put 2");
            synchronousQueue.put("2");
            System.out.println(Thread.currentThread().getName() + " put 3");
            synchronousQueue.put("3");
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }, "T1").start();

      new Thread(() -> {
         try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println(Thread.currentThread().getName() + "=>" + synchronousQueue.take());
            TimeUnit.SECONDS.sleep(3);
            System.out.println(Thread.currentThread().getName() + "=>" + synchronousQueue.take());
            TimeUnit.SECONDS.sleep(3);
            System.out.println(Thread.currentThread().getName() + "=>" + synchronousQueue.take());
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }, "T2").start();
   }
}

JUC并发编程

11、线程池(重点)

线程池:三大方法、7大参数、4种拒绝策略

11.1、池化技术

程序的运行,本质:占用系统的资源! 优化资源的使用!=>池化技术

线程池、连接池、内存池、对象池///… 创建、销毁。十分浪费资源

池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我。

线程池的好处:

1、降低资源的消耗

2、提高响应的速度

3、方便管理。

线程复用、可以控制最大并发数、管理线程

11.2、线程池:三大方法

JUC并发编程

// Executors 工具类、3大方法
public class Demo01 {
   public static void main(String[] args) {
      ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单个线程
//    ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小
//    ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸缩的,遇强则强,遇弱则弱

      try {
         for (int i = 0; i < 100; i++) {
            // 使用了线程池之后,使用线程池来创建线程
            threadPool.execute(() -> {
               System.out.println(Thread.currentThread().getName() + " ok");
            });
         }
      } catch (Exception e) {
         e.printStackTrace();
      } finally {
         // 线程池用完,程序结束,关闭线程池
         threadPool.shutdown();
      }
   }
}

11.3、7大参数

源码分析

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}

public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    }

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
    }

本质:ThreadPoolExecutor
    
    /**
	 * @param corePoolSize    // 核心线程池大小
	 * @param maximumPoolSize // 最大核心线程池大小
	 * @param keepAliveTime   // 超市了没有人调用就会释放
	 * @param unit            // 超市单位
	 * @param workQueue       // 阻塞队列
	 * @param threadFactory   // 线程工厂:创建线程的,一般不用动
	 * @param handler         // 拒绝策略
	 */
    public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7, RejectedExecutionHandler var8) {
        this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
        this.mainLock = new ReentrantLock();
        this.workers = new HashSet();
        this.termination = this.mainLock.newCondition();
        if (var1 >= 0 && var2 > 0 && var2 >= var1 && var3 >= 0L) {
            if (var6 != null && var7 != null && var8 != null) {
                this.corePoolSize = var1;
                this.maximumPoolSize = var2;
                this.workQueue = var6;
                this.keepAliveTime = var5.toNanos(var3);
                this.threadFactory = var7;
                this.handler = var8;
            } else {
                throw new NullPointerException();
            }
        } else {
            throw new IllegalArgumentException();
        }
    }

JUC并发编程

JUC并发编程

手动创建线程池

/**
 * 手动创建线程池
 * new ThreadPoolExecutor.AbortPolicy()        // 银行满了,还有人进来,不处理这个人的,抛出异常
 * new ThreadPoolExecutor.CallerRunsPolicy()    // 哪来的去哪里!
 * new ThreadPoolExecutor.DiscardPolicy()       // 队列满了,丢掉任务,不会抛出异常!
 * new ThreadPoolExecutor.DiscardOldestPolicy()  // 队列满了,尝试去和最早的竞争,也不会抛出异常!
 */
public class Demo02 {
   public static void main(String[] args) {
      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            2,
            5,
            3,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
      );

      try {
         // 最大承载:Deque + max
         // 超过 RejectedExecutionException
         for (int i = 1; i <= 8; i++) {
            // 使用了线程池之后,使用线程池来创建线程
            threadPool.execute(() -> {
               System.out.println(Thread.currentThread().getName() + " ok");
            });
         }
      } catch (Exception e) {
         e.printStackTrace();
      } finally {
         // 线程池用完,程序结束,关闭线程池
         threadPool.shutdown();
      }
   }
}

11.4、4种拒绝策略

JUC并发编程

/**
 * 
 * new ThreadPoolExecutor.AbortPolicy()        // 银行满了,还有人进来,不处理这个人的,抛出异常
 * new ThreadPoolExecutor.CallerRunsPolicy()    // 哪来的去哪里!
 * new ThreadPoolExecutor.DiscardPolicy()       // 队列满了,丢掉任务,不会抛出异常!
 * new ThreadPoolExecutor.DiscardOldestPolicy()  // 队列满了,尝试去和最早的竞争,也不会抛出异常!
 */

11.5、小结和拓展

池的最大的大小如何去设置!

了解:IO密集型,CPU密集型:(调优)

public class Demo02 {
   public static void main(String[] args) {
      // 自定义线程池!工作 ThreadPoolExecutor

      // 最大线程到底该如何定义
      // 1、CPU 密集型,几核,就是几,可以保持CPu的效率最高!
      // 2、IO 密集型 > 判断你程序中十分耗IO的线程,
      // 程序 15个大型任务 io十分占用资源!

      // 获取cpu的核数
      System.out.println(Runtime.getRuntime().availableProcessors());

      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            2,
            Runtime.getRuntime().availableProcessors(),
            3,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
      );

      try {
         // 最大承载:Deque + max
         // 超过 RejectedExecutionException
         for (int i = 1; i <= 8; i++) {
            // 使用了线程池之后,使用线程池来创建线程
            threadPool.execute(() -> {
               System.out.println(Thread.currentThread().getName() + " ok");
            });
         }
      } catch (Exception e) {
         e.printStackTrace();
      } finally {
         // 线程池用完,程序结束,关闭线程池
         threadPool.shutdown();
      }
   }
}

12、四大函数式接口(必需掌握)

新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算

12.1、函数式接口:

只有一个方法的接口

@FunctionalInterface
public interface Runnable {
	public abstract void run();
}

// 泛型、枚举、反射
// lambda表达式、链式编程、函数式接口、Stream流式计算
// 超级多FunctionalInterface
// 简化编程模型,在新版本的框架底层大量应用!
// foreach(消费者类型的函数式接口)

JUC并发编程

代码测试:

Function函数式接口

JUC并发编程

/**
 * function 函数型接口,有一个输入参数,有一个返回类型
 * 只要是 函数型接口 可以 用 lambda表达式简化
 */
public class Demo01 {
   public static void main(String[] args) {

//    Function<String, String> function = new Function<String, String>() {
//       @Override
//       public String apply(String str) {
//          return str;
//       }
//    };

//    Function<String, String> function = (Function<String, String>) str -> {
//       return str;
//    };

//    Function<String, String> function = (str) -> {
//       return str;
//    };

      Function<String, String> function = str -> str;

      System.out.println(function.apply("asd"));
   }
}

12.2、断定型接口:

有一个输入参数,返回值只能是 布尔值!

JUC并发编程

/**
 * 断定型接口:有一个输入参数,返回值只能时 布尔值!
 */
public class Demo02 {
   public static void main(String[] args) {
      // 判断字符串是否为空
//    Predicate<String> predicate = new Predicate<String>() {
//       @Override
//       public boolean test(String str) {
//          return str.isEmpty();
//       }
//    };

      Predicate<String> predicate = str -> str.isEmpty();

      System.out.println(predicate.test(""));
   }
}

12.3、Consumer 消费型接口

只有输入,没有返回值

JUC并发编程

/**
 * Consumer 消费型接口: 只有输入,没有返回值
 */
public class Demo03 {
   public static void main(String[] args) {
//    Consumer<String> consumer = new Consumer<String>() {
//       @Override
//       public void accept(String str) {
//          System.out.println(str);
//       }
//    };

      Consumer<String> consumer = str -> System.out.println(str);

      consumer.accept("consumer");
   }
}

12.4、Supplier 供给型接口

没有参数,只有返回值

JUC并发编程

/**
 * Supplier 供给型接口 没有参数,只有返回值
 */
public class Demo04 {
   public static void main(String[] args) {
//    Supplier<String> supplier = new Supplier<String>() {
//       @Override
//       public String get() {
//          return "1024";
//       }
//    };

      Supplier<String> supplier = () -> "1024";

      System.out.println(supplier.get());
   }
}

13、Stream流式计算

什么是Stream流式计算

Stream(流)是一个来自数据源的元素队列并支持聚合操作

大数据时代:存储 + 计算

集合、MySQL 本质就是存储东西的;

计算都应该交给流来操作!

JUC并发编程

/**
 * 题目要求:一分钟内完成此题,只能用一行代码实现!
 * 现在有5个用户!筛选:
 * 1、ID 必须是偶数
 * 2、年龄必须大于23岁
 * 3、用户名转为大写字母
 * 4、用户名字母倒着排序
 * 5、只输出一个用户!
 */
public class Test {
	public static void main(String[] args) {
		User u1 = new User(1, "a", 21);
		User u2 = new User(2, "b", 22);
		User u3 = new User(3, "c", 23);
		User u4 = new User(4, "d", 24);
		User u5 = new User(5, "e", 25);
		// 集合就是存储
		List<User> list = Arrays.asList(u1, u2, u3, u4, u5);

		// 计算交给stream流
		// lambda表达式、链式编程、函数式接口、Stream流式计算
		
//		list.stream()
//				.filter(u -> u.getId() % 2 == 0)
//				.filter(u -> u.getAge() > 23)
//				.peek(user -> user.setName(user.getName().toUpperCase()))
//				.sorted((user1, user2) -> user1.getName().compareTo(user2.getName()))
//				.limit(1)
//				.forEach(System.out::println);
		
		list.stream()
				.filter(u -> u.getId() % 2 == 0)
				.filter(u -> u.getAge() > 23)
				.peek(user -> user.setName(user.getName().toUpperCase()))
				.sorted(Comparator.comparing(User::getName))
				.limit(1)
				.forEach(System.out::println);

	}
}

14、ForkJoin

什么是 ForkJoin

ForkJoin 在 JDK 1.7 , 并行执行任务!提高效率。大数据量!

大数据:Map Reduce (把大任务拆分为小任务)

JUC并发编程

ForkJoin 特点:工作窃取

这个里面维护的都是双端队列

JUC并发编程

ForkJoin

JUC并发编程

JUC并发编程

/**
 * 求和计算的任务!
 * 3000 6000(ForkJoin) 9000(Stream并行流)
 * // 如何使用 forkjoin
 * // 1、forkjoinPool 通过它来执行
 * // 2、计算任务 forkjoinPool.execute(ForkJoinTask task)
 * // 3. 计算类要继承 ForkJoinTask
 */
public class ForkJoinDemo extends RecursiveTask<Long> {

   private Long start;
   private Long end;

   // 临界值
   private Long temp = 10000L;

   public ForkJoinDemo(Long start, Long end) {
      this.start = start;
      this.end = end;
   }

   // 计算方法
   @Override
   protected Long compute() {
      if ((end - start) > 0) {
         Long sum = 0L;
         for (Long i = start; i <= end; i++) {
            sum += i;
         }
         return sum;
      } else { // forkjoin 递归
         long middle = (start + end) / 2; // 中间值
         ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
         task1.fork(); // 拆分任务,把任务压入线程队列
         ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
         task2.fork(); // 拆分任务,把任务压入线程队列
         return task1.join() + task2.join();
      }

   }
}

测试

/**
 * 同一个任务,别人效率高你几十倍!
 */
public class Test {
   public static void main(String[] args) throws ExecutionException, InterruptedException {
      test1(); // 5650
      test2(); // 4971
      test3(); // 153
   }

   // 普通程序员
   public static void test1() {
      Long sum = 0L;
      long start = System.currentTimeMillis();
      for (Long i = 1L; i <= 10_0000_0000; i++) {
         sum += i;
      }
      long end = System.currentTimeMillis();
      System.out.println("sum=" + sum + " 时间:" + (end - start));
   }

   // 会使用ForkJoin
   public static void test2() throws ExecutionException, InterruptedException {
      long start = System.currentTimeMillis();
      ForkJoinPool forkJoinPool = new ForkJoinPool();
      ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
      ForkJoinTask<Long> submit = forkJoinPool.submit(task);
      Long sum = submit.get();
      long end = System.currentTimeMillis();

      System.out.println("sum=" + sum + " 时间:" + (end - start));
   }

   public static void test3() {
      long start = System.currentTimeMillis();
      // Stream并行流 () (]
      long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
      long end = System.currentTimeMillis();

      System.out.println("sum=" + sum + " 时间:" + (end - start));
   }

JUC并发编程

15、异步回调

Future 设计的初衷: 对将来的某个事件的结果进行建模

JUC并发编程

/**
 * 异步调用: CompletableFuture
 * // 异步执行
 * // 成功回调
 * // 失败回调
 */
public class Demo01 {
   public static void main(String[] args) throws ExecutionException, InterruptedException {
      // 没有返回值的 runAsync 异步回调
//    CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
//       try {
//          TimeUnit.SECONDS.sleep(3);
//       } catch (InterruptedException e) {
//          e.printStackTrace();
//       }
//       System.out.println(Thread.currentThread().getName() + "runAsync=>Void");
//    });
//
//    System.out.println("111111");
//    completableFuture.get();   // 获取阻塞执行结果

      // 有返回值的 supplyAsync 异步回调
      // ajax,成功和失败的回调
      // 返回的是错误信息;
      CompletableFuture<Object> completableFuture = CompletableFuture.supplyAsync(() -> {
         System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
         int i = 10 / 0;
         return 1024;
      });

      System.out.println(completableFuture.whenComplete((t, u) -> {
         System.out.println("t=>" + t);  // 正常的返回结果
         System.out.println("u=>" + u);  // 错误信息 java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
      }).exceptionally((e) -> {
         e.printStackTrace();
         return 233;  // 可以获取到错误的返回信息
      }).get());
   }
}

16、JMM

请你谈谈你对 Volatile 的理解

Volatile 是 Java 虚拟机提供的轻量级的同步机制

1、保证可见性

2、不保证原子性

3、禁止指令重排

什么是JMM

JMM(Java memory model): Java内存模型。不存在的东西,概念!约定!

关于JMM的一些同步的约定:

1、线程解锁前,必须把共享变量 立刻 刷回主存。

2、线程加锁前,必须读取主存中的最新值到工作内存中!

3、加锁和解锁是同一把锁

线程 工作内存 、主内存

8种操作:

JUC并发编程

JUC并发编程

内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)

  • lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
  • unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
  • read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
  • load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
  • use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
  • assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
  • store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
  • write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中

JMM对这八种指令的使用,制定了如下规则:

  • 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
  • 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
  • 不允许一个线程将没有assign的数据从工作内存同步回主内存
  • 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
  • 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
  • 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
  • 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
    对一个变量进行unlock操作之前,必须把此变量同步回主内存

问题: 程序不知道主内存的值已经被修改过了

public class JMMDemo {
   private static int num = 0;
   public static void main(String[] args) throws InterruptedException {
      new Thread(() -> {
         while (num == 0) {

         }
      }).start();
      num = 1;
      TimeUnit.SECONDS.sleep(3);
      System.out.println(num);
   }
}

输出结果了,但是程序没有停

JUC并发编程

17、Volatile

17.1、保证可见性

public class JMMDemo {
   // 不加 volatile 程序就会死循环
   // 加 volatile 可以保证可见性
   private volatile static int num = 0;

   public static void main(String[] args) throws InterruptedException {
      new Thread(() -> { // 线程1 对主内存的变化不知道的
         while (num == 0) {

         }
      }).start();
      num = 1;
      TimeUnit.SECONDS.sleep(3);
      System.out.println(num);
   }
}

17.2、不保证原子性

原子性 : 不可分割

线程A在执行任务的时候,不能被打扰的,也不能被分割。要么同时成功,要么同时失败。

// volatile 不保证原子性
public class VDemo02 {

   // volatile 不保证原子性
   private volatile static int num = 0;

   public static void add() {
      num++;  // 不是一个原子性操作
   }

   public static void main(String[] args) {
      for (int i = 0; i < 20; i++) {
         new Thread(() -> {
            for (int j = 0; j < 1000; j++) {
               add();
            }
         }).start();
      }

      while (Thread.activeCount() > 2) {  // main gc
         Thread.yield();
      }

      System.out.println(Thread.currentThread().getName() + " " + num);
   }
}

如果不加 lock 和 synchronized ,怎么样保证原子性

JUC并发编程

使用原子类,解决 原子性问题

JUC并发编程

public class VDemo03 {
   // volatile 不保证原子性
   // 原子类的 Integer
   private volatile static AtomicInteger num = new AtomicInteger();

   public static void add() {
//    num++;  // 不是一个原子性操作
      num.getAndIncrement(); // AtomicInteger + 1 方法, CAS
   }

   public static void main(String[] args) {
      for (int i = 0; i < 20; i++) {
         new Thread(() -> {
            for (int j = 0; j < 1000; j++) {
               add();
            }
         }).start();
      }

      while (Thread.activeCount() > 2) {  // main gc
         Thread.yield();
      }

      System.out.println(Thread.currentThread().getName() + " " + num);
   }
}

这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类是一个很特殊的存在!

17.3、指令重排

什么是 指令重排:你写的程序,计算机并不是按照你写的那样去执行的。

源代码–>编译器优化的重排–> 指令并行也可能会重排–> 内存系统也会重排—> 执行

处理器在进行指令重排的时候,考虑:数据之间的依赖性!

int x = 1; // 1
int y = 2; // 2
x = x + 5; // 3
y = x * x; // 4

我们所期望的:1234 但是可能执行的时候回变成 2134 1324
可不可能是 4123

可能造成影响的结果: a b x y 这四个值默认都是 0;

线程A 线程B
x=a y=b
b=1 a=2

正常的结果: x = 0;y = 0;但是可能由于指令重排

线程A 线程B
b=1 a=2
x=a y=b

指令重排导致的诡异结果: x = 2;y = 1;

非计算机专业

volatile可以避免指令重排:

内存屏障。CPU指令。作用:

1、保证特定的操作的执行顺序!

2、可以保证某些变量的内存可见性 (利用这些特性volatile实现了可见性)

JUC并发编程

Volatile 是可以保持 可见性。不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生!

18、彻底玩转单例模式

饿汉式 DCL懒汉式 ,探究!

饿汉式

// 饿汉式单例
public class Hungry {

   // 可能会浪费空间
   private byte[] data1 = new byte[1024 * 1024];
   private byte[] data2 = new byte[1024 * 1024];
   private byte[] data3 = new byte[1024 * 1024];
   private byte[] data4 = new byte[1024 * 1024];

   private Hungry() {

   }

   private final static Hungry HUNGRY = new Hungry();

   public static Hungry getInstance() {
      return HUNGRY;
   }
}

懒汉式单例

//懒汉式单例
// 道高一尺魔高一丈
public class LazyMan {

   private static boolean qinjiang = false;

   public LazyMan() {
      synchronized (LazyMan.class) {
         if (qinjiang == false) {
            qinjiang = true;
         } else {
            throw new RuntimeException("不要试图使用反射破坏异常");
         }
      }
   }

   private volatile static LazyMan lazyMan;

   // 双重检测锁模式的 懒汉式单例 DCL 懒汉式
   public static LazyMan getInstance() {
      if (lazyMan == null) {
         synchronized (LazyMan.class) {
            if (lazyMan == null) {
               lazyMan = new LazyMan(); // 不是一个原子性操作
               /**
                * 1. 分配内存空间
                * 2. 执行构造方法,初始化对象
                * 3. 把这个对象指向这个空间
                */
            }
         }
      }
      return lazyMan;
   }

   // 反射!
   public static void main(String[] args) throws Exception {
//    LazyMan instance = LazyMan.getInstance();

      Field qinjiang = LazyMan.class.getDeclaredField("qinjiang");
      qinjiang.setAccessible(true);


      Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);
      declaredConstructor.setAccessible(true);
      LazyMan instance = declaredConstructor.newInstance();

      qinjiang.set(instance, false);

      LazyMan instance2 = declaredConstructor.newInstance();

      System.out.println(instance);
      System.out.println(instance2);
   }
}

静态内部类

public class Holder {

   private Holder() {
      System.out.println(Thread.currentThread().getName() + "ok");
   }

   public static Holder getInstance() {
      return InnerClass.HOLDER;
   }

   public static class InnerClass {
      private static final Holder HOLDER = new Holder();
   }
}

单例不安全,有反射

枚举

// enum 是一个什么? 本身也是一个Class类
public enum EnumSingle {

   INSTANCE;

   public EnumSingle getInstance() {
      return INSTANCE;
   }

}

class Test {
   public static void main(String[] args) throws Exception {
      EnumSingle instance1 = EnumSingle.INSTANCE;

      Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class, int.class);
      declaredConstructor.setAccessible(true);
      EnumSingle instance2 = declaredConstructor.newInstance();

      System.out.println(instance1);
      // NoSuchMethodException: com.kuang.single.EnumSingle.<init>()
      System.out.println(instance2);
   }
}

JUC并发编程

枚举类型的最终反编译源码:

// Decompiled by Jad v1.5.8g. Copyright 2001 Pavel Kouznetsov.
// Jad home page: http://www.kpdus.com/jad.html
// Decompiler options: packimports(3)
// Source File Name: EnumSingle.java
package com.kuang.single;
public final class EnumSingle extends Enum
{
    public static EnumSingle[] values()
    {
        return (EnumSingle[])$VALUES.clone();
    }
    public static EnumSingle valueOf(String name)
    {
        return (EnumSingle)Enum.valueOf(com/kuang/single/EnumSingle, name);
    }
    private EnumSingle(String s, int i)
    {
        super(s, i);
    }
    public EnumSingle getInstance()
    {
        return INSTANCE;
    }
    public static final EnumSingle INSTANCE;
    private static final EnumSingle $VALUES[];
    static
    {
        INSTANCE = new EnumSingle("INSTANCE", 0);
        $VALUES = (new EnumSingle[] {
            INSTANCE
        });
    }
}

19、深入理解CAS

19.1、什么是 CAS

大厂你必须要深入研究底层!有所突破! 修内功,操作系统,计算机网络原理

public class CASDemo {
   public static void main(String[] args) {
      AtomicInteger atomicInteger = new AtomicInteger(2020);

      // 期望、更新
      // public final boolean compareAndSet(int expect, int update)
      // 如果我期望的值达到了,那么就更新,否则,就不更新, CAS 是CPU的并发原语!
      System.out.println(atomicInteger.compareAndSet(2020, 2021));
      System.out.println(atomicInteger.get());
      atomicInteger.getAndIncrement();
      System.out.println(atomicInteger.compareAndSet(2020, 2021));
      System.out.println(atomicInteger.get());
   }
}

19.2、Unsafe 类

JUC并发编程

JUC并发编程

JUC并发编程

CAS : 比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环!

缺点:

1、 循环会耗时

2、一次性只能保证一个共享变量的原子性

3、ABA问题

19.3、CAS : ABA 问题(狸猫换太子)

JUC并发编程

public class CASDemo {
   public static void main(String[] args) {
      AtomicInteger atomicInteger = new AtomicInteger(2020);

      // 期望、更新
      // public final boolean compareAndSet(int expect, int update)
      // 如果我期望的值达到了,那么就更新,否则,就不更新, CAS 是CPU的并发原语!
      // ============== 捣乱的线程 ==================
      System.out.println(atomicInteger.compareAndSet(2020, 2021));
      System.out.println(atomicInteger.get());

      System.out.println(atomicInteger.compareAndSet(2021, 2020));
      System.out.println(atomicInteger.get());

      // ============== 期望的线程 ==================
      System.out.println(atomicInteger.compareAndSet(2020, 6666));
      System.out.println(atomicInteger.get());
   }
}

20、原子引用

解决ABA 问题,引入原子引用! 对应的思想:乐观锁!

带版本号 的原子操作!

public class CASDemo {
   public static void main(String[] args) {
      // AtomicStampedReference  注意,如果泛型是一个包装类,注意对象的引用问题
       
       // 正常在业务操作中,这里面比较的都是一个个对象
      AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1);

      new Thread(() -> {
         int stamp = atomicStampedReference.getStamp();  // 获得版本号
         System.out.println("a1=>" + stamp);

         try {
            TimeUnit.SECONDS.sleep(1);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }

         System.out.println(atomicStampedReference.compareAndSet(1, 2,
               atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
         System.out.println("a2=>" + stamp);
         System.out.println(atomicStampedReference.compareAndSet(2, 1,
               atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
         System.out.println("a3=>" + atomicStampedReference.getStamp());
      }, "a").start();

      // 乐观锁的原理相同
      new Thread(() -> {
         int stamp = atomicStampedReference.getStamp();  // 获得版本号
         System.out.println("b1=>" + atomicStampedReference.getStamp());

         try {
            TimeUnit.SECONDS.sleep(2);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }

         System.out.println(atomicStampedReference.compareAndSet(1, 6, stamp, stamp + 1));
         System.out.println("b2=>" + atomicStampedReference.getStamp());
      }, "b").start();
   }
}

注意:

Integer 使用了对象缓存机制,默认范围是 -128 ~ 127 ,推荐使用静态工厂方法 valueOf 获取对象实例,而不是 new,因为 valueOf 使用缓存,而 new 一定会创建新的对象分配新的内存空间;

JUC并发编程

21、各种锁的理解

21.1、公平锁、非公平锁

公平锁: 非常公平, 不能够插队,必须先来后到!

非公平锁:非常不公平,可以插队 (默认都是非公平)

public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

21.2、可重入锁

可重入锁(递归锁)

JUC并发编程

Synchronized

public class Demo01 {
   public static void main(String[] args) {
      Phone phone = new Phone();

      new Thread(() -> {
         phone.sms();
      }, "A").start();

      new Thread(() -> {
         phone.sms();
      }, "B").start();
   }
}

class Phone {
   public synchronized void sms() {
      System.out.println(Thread.currentThread().getName() + "sms");
      call(); // 这里也有锁
   }

   public synchronized void call() {
      System.out.println(Thread.currentThread().getName() + "call");
   }
}

JUC并发编程

Lock 版

public class Demo02 {
   public static void main(String[] args) {
      Phone2 phone = new Phone2();

      new Thread(() -> {
         phone.sms();
      }, "A").start();

      new Thread(() -> {
         phone.sms();
      }, "B").start();
   }
}

class Phone2 {
   Lock lock = new ReentrantLock();

   public void sms() {
      lock.lock(); // 细节问题:lock.lock(); lock.unlock(); // lock 锁必须配对,否则就会死在里面
      lock.lock();
      try {
         System.out.println(Thread.currentThread().getName() + "sms");
         call(); // 这里也有锁
      } catch (Exception e) {
         e.printStackTrace();
      } finally {
         lock.unlock();
         lock.unlock();
      }

   }

   public void call() {
      lock.lock();
      try {
         System.out.println(Thread.currentThread().getName() + "call");
      } catch (Exception e) {
         call(); // 这里也有锁
         e.printStackTrace();
      } finally {
         lock.unlock();
      }
   }
}

JUC并发编程

21.3、自旋锁

spinlock

JUC并发编程

我们来自定义一个锁测试

/**
 * 自旋锁
 */
public class SpinlockDemo {

   AtomicReference<Thread> atomicReference = new AtomicReference<>();

   // 加锁
   public void myLock() {
      Thread thread = Thread.currentThread();
      System.out.println(Thread.currentThread().getName() + "==> myLock");

      // 自旋锁
      while (!atomicReference.compareAndSet(null, thread)) {

      }
   }

   // 解锁
   public void myUnLock() {
      Thread thread = Thread.currentThread();
      System.out.println(Thread.currentThread().getName() + "==> myUnLock");
      atomicReference.compareAndSet(thread, null);
   }
}
public class TestSpinLock {

   public static void main(String[] args) {
//    Lock lock = new ReentrantLock();
//    lock.lock();
//    lock.unlock();

      // 底层使用的自旋锁CAS
      SpinlockDemo lock = new SpinlockDemo();

      new Thread(() -> {
         lock.myLock();
         try {
            TimeUnit.SECONDS.sleep(3);
         } catch (InterruptedException e) {
            e.printStackTrace();
         } finally {
            lock.myUnLock();
         }
      }, "T1").start();

      new Thread(() -> {
         lock.myLock();
         try {
            TimeUnit.SECONDS.sleep(1);
         } catch (InterruptedException e) {
            e.printStackTrace();
         } finally {
            lock.myUnLock();
         }
      }, "T2").start();
   }
}

JUC并发编程

21.4、死锁

死锁是什么

JUC并发编程

死锁测试,怎么排除死锁:

public class DeadLockDemo {

   public static void main(String[] args) {
      String lockA = "lockA";
      String lockB = "lockB";
      
      new Thread(new MyThread(lockA, lockB), "T1").start();
      new Thread(new MyThread(lockB, lockA), "T1").start();
   }
}

class MyThread implements Runnable {

   private String lockA;
   private String lockB;

   public MyThread(String lockA, String lockB) {
      this.lockA = lockA;
      this.lockB = lockB;
   }

   @Override
   public void run() {
      synchronized (lockA) {
         System.out.println(Thread.currentThread().getName() + "lock:" + lockA + "=>get" + lockB);

         try {
            TimeUnit.SECONDS.sleep(2);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         
         synchronized (lockB) {
            System.out.println(Thread.currentThread().getName() + "lock:" + lockB + "=>get" + lockA);
         }
      }
   }
}

解决问题

1、使用 jps -l 定位进程号

JUC并发编程

2、使用 jstack 进程号 找到死锁问题

JUC并发编程

面试,工作中! 排查问题:

1、日志 9

2、堆栈 1

版权声明:程序员胖胖胖虎阿 发表于 2022年11月6日 上午1:56。
转载请注明:JUC并发编程 | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...