Java Thread

Java Thread,第1张

Java Thread 一、进程和线程

进程: 一个运行的程序

线程: 进程中的一个执行任务

一个进程中至少有一个线程,也可以运行多个线程

并行: 同一时刻多任务交替执行(单核CPU)

并发: 同一时刻多任务同时执行(多核CPU)

二、原始创建线程方式

使用**jconsole**命令观察线程状态

1.继承Thread类,重写run()方法
class Test01 extends Thread{

    int count = 0;

    @Override
    public void run() {
        System.out.println("线程" + Thread.currentThread().getName() + "开始!");
        while(true)
        {
            System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (++count));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if(count == 10) {
                System.out.println("线程" + Thread.currentThread().getName() + "结束!");
                break;
            }
        }
    }
}
public static void main(String[] args) {
    /*
        *   public synchronized void start() {
        *       start0();    --->真正创建线程函数
        *   }
        *
        *   private native void start0(); 本地方法 有JVM调用
        */
    System.out.println("线程" + Thread.currentThread().getName() + "开始!");
    Test01 test = new Test01();
    //test.run();//!!!不能直接调用run() --->没有创建新的线程,由主线程完成,整个程序串行执行
    test.start();
    for (int i = 0; i < 10; i++) {
        System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (i + 1));
    }
    System.out.println("线程" + Thread.currentThread().getName() + "结束!");
}

/*
某次运行结果:
线程main开始!
线程main正在运行 1
线程main正在运行 2
线程main正在运行 3
线程main正在运行 4
线程main正在运行 5
线程main正在运行 6
线程Thread-0开始!
线程Thread-0正在运行 1
线程main正在运行 7
线程main正在运行 8
线程main正在运行 9
线程main正在运行 10
线程main结束!
线程Thread-0正在运行 2
线程Thread-0正在运行 3
线程Thread-0正在运行 4
线程Thread-0正在运行 5
线程Thread-0正在运行 6
线程Thread-0正在运行 7
线程Thread-0正在运行 8
线程Thread-0正在运行 9
线程Thread-0正在运行 10
线程Thread-0结束!
*/

注意: Java不能真正创建线程,底层由C++创建

2.实现Runnable接口,放入Thread对象中
class Test02 implements Runnable{

    int count = 0;

    @Override
    public void run() {
        System.out.println("线程" + Thread.currentThread().getName() + "开始!");
        while(true)
        {
            System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (++count));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if(count == 10) {
                System.out.println("线程" + Thread.currentThread().getName() + "结束!");
                break;
            }
        }
    }
}
public static void main(String[] args) {
    System.out.println("线程" + Thread.currentThread().getName() + "开始!");
    Test02 test = new Test02();
    //test.start(); //test类没有start()
    //使用代理模式 ---> thread类帮你调用start()创建一个线程
    Thread thread = new Thread(test);
    thread.start();
    for (int i = 0; i < 10; i++) {
        System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (i + 1));
    }
    System.out.println("线程" + Thread.currentThread().getName() + "结束!");
}

/*
某次运行结果:
线程main开始!
线程main正在运行 1
线程main正在运行 2
线程main正在运行 3
线程main正在运行 4
线程main正在运行 5
线程main正在运行 6
线程main正在运行 7
线程main正在运行 8
线程main正在运行 9
线程main正在运行 10
线程Thread-0开始!
线程main结束!
线程Thread-0正在运行 1
线程Thread-0正在运行 2
线程Thread-0正在运行 3
线程Thread-0正在运行 4
线程Thread-0正在运行 5
线程Thread-0正在运行 6
线程Thread-0正在运行 7
线程Thread-0正在运行 8
线程Thread-0正在运行 9
线程Thread-0正在运行 10
线程Thread-0结束!
*/
三、线程 *** 作 1.通知线程
class Test03 implements Runnable{

    private int count;

    private boolean loop = true;

    public void setLoop(boolean loop) {
        this.loop = loop;
    }

    @Override
    public void run() {
        System.out.println("线程" + Thread.currentThread().getName() + "开始!");
        while(loop)
        {
            System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (++count));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
2.中断线程休眠

使用Thread.interrupt()方法中断线程休眠

class Test04 implements Runnable{

    private int count;

    private int second = 20;

    @Override
    public void run() {

        System.out.println("线程" + Thread.currentThread().getName() + "开始!");

        for (int i = 0; i < 5; i++) {
            System.out.println("线程" + Thread.currentThread().getName() + " " + (++count));
        }

        try {
            while (second > 0) {
                //休眠20s
                System.out.println("线程" + Thread.currentThread().getName() + "休眠" + (second--) + "s");
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            System.out.println("线程" + Thread.currentThread().getName() + "中断休眠");
        }


        for (int i = 0; i < 5; i++) {
            System.out.println("线程" + Thread.currentThread().getName() + " " + (++count));
        }
        
        System.out.println("线程" + Thread.currentThread().getName() + "结束!");
    }
}
public static void main(String[] args) throws InterruptedException {
    Test04 test04 = new Test04();
    Thread thread = new Thread(test04, "4");
    thread.start();

    //3s后中断线程4休眠
    Thread.sleep(3000);
    thread.interrupt();
}

/*
线程4开始!
线程4 1
线程4 2
线程4 3
线程4 4
线程4 5
线程4休眠20s
线程4休眠19s
线程4休眠18s
线程4休眠17s
线程4中断休眠
线程4 6
线程4 7
线程4 8
线程4 9
线程4 10
线程4结束!
*/
3.线程插队
class Test05 implements Runnable{

    int count = 0;

    @Override
    public void run() {
        System.out.println("线程" + Thread.currentThread().getName() + "开始!");
        while(true)
        {
            System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (++count));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if(count == 10) {
                System.out.println("线程" + Thread.currentThread().getName() + "结束!");
                break;
            }
        }
    }
}
public static void main(String[] args) throws InterruptedException {
    Test05 test05 = new Test05();
    Thread A = new Thread(test05, "A");
    A.start();

    for (int i = 1; i <= 10; i++) {
        System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + i);
        //让A线程执行完毕后,再执行Main线程
        if(i == 5)
        {
            //A.join(); //插队 一定成功
            Thread.yield(); //礼让A线程执行,不一定成功
        }
    }
}

/*
join():
线程main正在运行 1
线程A开始!
线程main正在运行 2
线程main正在运行 3
线程main正在运行 4
线程main正在运行 5
线程A正在运行 1
线程A正在运行 2
线程A正在运行 3
线程A正在运行 4
线程A正在运行 5
线程A正在运行 6
线程A正在运行 7
线程A正在运行 8
线程A正在运行 9
线程A正在运行 10
线程A结束!
线程main正在运行 6
线程main正在运行 7
线程main正在运行 8
线程main正在运行 9
线程main正在运行 10
-----------------
yield():
线程main正在运行 1
线程A开始!
线程main正在运行 2
线程A正在运行 1
线程main正在运行 3
线程main正在运行 4
线程main正在运行 5
线程main正在运行 6
线程main正在运行 7
线程main正在运行 8
线程main正在运行 9
线程main正在运行 10
线程A正在运行 2
线程A正在运行 3
线程A正在运行 4
线程A正在运行 5
线程A正在运行 6
线程A正在运行 7
线程A正在运行 8
线程A正在运行 9
线程A正在运行 10
线程A结束!
*/
4.守护线程

用户线程: 工作线程

守护线程: 为工作线程服务,当所有工作线程结束,守护线程自动结束,例如垃圾回收机制 gc

class Test06 implements Runnable{

    private int count;

    @Override
    public void run() {
        System.out.println("线程" + Thread.currentThread().getName() + "开始!");
        //线程无限循环执行
        while(true)
        {
            System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (++count));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public static void main(String[] args) {
    Test06 test06 = new Test06();
    Thread thread = new Thread(test06, "Daemon");
    //将Daemon线程设置为守护线程
    thread.setDaemon(true);
    thread.start();

    System.out.println("线程" + Thread.currentThread().getName() + "开始!");
    for (int i = 1; i < 10; i++) {
        System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + i);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    System.out.println("线程" + Thread.currentThread().getName() + "结束!");
}

/*
线程main开始!
线程Daemon开始!
线程main正在运行 1
线程Daemon正在运行 1
线程main正在运行 2
线程Daemon正在运行 2
线程main正在运行 3
线程Daemon正在运行 3
线程main正在运行 4
线程Daemon正在运行 4
线程Daemon正在运行 5
线程main正在运行 5
线程main正在运行 6
线程Daemon正在运行 6
线程main正在运行 7
线程Daemon正在运行 7
线程main正在运行 8
线程Daemon正在运行 8
线程main正在运行 9
线程Daemon正在运行 9
线程main结束!
线程Daemon正在运行 10
*/
四、线程六大状态
public enum State {
    
    //创建
    NEW,

    //可运行
    RUNNABLE,

    //阻塞
    BLOCKED,

    //等待
    WAITING,

    //超时等待
    TIMED_WAITING,

    //终止
    TERMINATED;
}

五、线程同步

同步: 在同一时刻,对于同一数据最多只用一个线程访问

1.Synchronized

1.同步代码块

//对象锁
synchronized(对象) {
	//同步代码块
}

2.同步方法

private synchronized void f(){}

执行顺序问题:

1.同一个对象调用两个普通同步方法执行顺序

    public static void main(String[] args) {

        ThreadTest threadTest = new ThreadTest();

        new Thread(()->{
            threadTest.f1();
        }).start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            threadTest.f2();
        }).start();
    }
class ThreadTest{

    public synchronized void f1(){
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("f1()执行...");
    }

    public synchronized void f2(){
        System.out.println("f2()执行...");
    }
}

/*
Synchronized修饰普通方法时,锁的对象是方法的调用者(本例中为threadTest)
本例中f1()、f2()方法被同一个对象调用,谁先调用,谁先执行
延时3s
f1()执行...
延时1s
f2()执行...
*/

2.两个不同对象调用两个static同步方法执行顺序

    public static void main(String[] args) {

        ThreadTest threadTest1 = new ThreadTest();
        ThreadTest threadTest2 = new ThreadTest();

        new Thread(()->{
            threadTest1.f1();
        }).start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            threadTest2.f2();
        }).start();
    }
}
class ThreadTest{

    public static synchronized void f1(){
        try {
            TimeUnit.SECONDS.sleep(3 );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("f1()执行...");
    }

    public static synchronized void f2(){
        System.out.println("f2()执行...");
    }
}

/*
Synchronized修饰static方法时,锁的对象是该类的classduix(本例中为ThreadTest.class)
本例中f1()、f2()方法被不同对象调用,但是由于锁的是同一个类class,谁先调用,谁先执行
延时3s
f1()执行...
延时1s
f2()执行...
*/

3.同一对象调用一个static同步方法和一个普通同步方法执行顺序

public static void main(String[] args) {

    ThreadTest threadTest1 = new ThreadTest();

    new Thread(()->{
        threadTest1.f1();
    }).start();

    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    new Thread(()->{
        threadTest1.f2();
    }).start();
}
class ThreadTest{

    public static synchronized void f1(){
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("f1()执行...");
    }

    public synchronized void f2(){
        System.out.println("f2()执行...");
    }
}

/*
两个方法锁的对象不同,延时后f2()先执行
f2()执行...
f1()执行...
*/

卖票:

class Sell implements Runnable{

    private static int total = 50;

    private Boolean loop = true;

    @Override
    public void run() {
        System.out.println("窗口" + Thread.currentThread().getName() +"开始售票!");
        while(loop)
        {
            sellTicket();
        }
    }

    /**
     * synchronized加在方法上,变成同步方法,默认锁为 --->  this对象互斥锁
     */
    private synchronized void sellTicket() {
//        synchronized(this) { //synchronized加在代码块上,变成同步代码块,同一时刻只能有一个线程运行代码块
            if (total <= 0) {
                System.out.println("票已售完!!!");
                loop = false;
                return;
            }
            System.out.println("窗口" + Thread.currentThread().getName() + "卖掉一张票,剩余票数: " + (--total));
            try {
                //买完一张票休息一秒
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
//        }
    }
}
public static void main(String[] args) {
    Sell sell = new Sell();
    Thread A = new Thread(sell, "A");
    Thread B = new Thread(sell, "B");
    Thread C = new Thread(sell, "C");
    A.start();
    B.start();
    C.start();
}

/*
窗口B开始售票!
窗口C开始售票!
窗口A开始售票!
窗口B卖掉一张票,剩余票数: 49
窗口B卖掉一张票,剩余票数: 48
窗口B卖掉一张票,剩余票数: 47
窗口B卖掉一张票,剩余票数: 46
窗口B卖掉一张票,剩余票数: 45
窗口B卖掉一张票,剩余票数: 44
窗口B卖掉一张票,剩余票数: 43
窗口B卖掉一张票,剩余票数: 42
窗口B卖掉一张票,剩余票数: 41
窗口B卖掉一张票,剩余票数: 40
窗口B卖掉一张票,剩余票数: 39
窗口B卖掉一张票,剩余票数: 38
窗口B卖掉一张票,剩余票数: 37
窗口B卖掉一张票,剩余票数: 36
窗口B卖掉一张票,剩余票数: 35
窗口B卖掉一张票,剩余票数: 34
窗口B卖掉一张票,剩余票数: 33
窗口B卖掉一张票,剩余票数: 32
窗口B卖掉一张票,剩余票数: 31
窗口B卖掉一张票,剩余票数: 30
窗口B卖掉一张票,剩余票数: 29
窗口B卖掉一张票,剩余票数: 28
窗口B卖掉一张票,剩余票数: 27
窗口A卖掉一张票,剩余票数: 26
窗口A卖掉一张票,剩余票数: 25
窗口A卖掉一张票,剩余票数: 24
窗口A卖掉一张票,剩余票数: 23
窗口A卖掉一张票,剩余票数: 22
窗口A卖掉一张票,剩余票数: 21
窗口A卖掉一张票,剩余票数: 20
窗口A卖掉一张票,剩余票数: 19
窗口A卖掉一张票,剩余票数: 18
窗口A卖掉一张票,剩余票数: 17
窗口A卖掉一张票,剩余票数: 16
窗口A卖掉一张票,剩余票数: 15
窗口A卖掉一张票,剩余票数: 14
窗口A卖掉一张票,剩余票数: 13
窗口A卖掉一张票,剩余票数: 12
窗口A卖掉一张票,剩余票数: 11
窗口A卖掉一张票,剩余票数: 10
窗口A卖掉一张票,剩余票数: 9
窗口A卖掉一张票,剩余票数: 8
窗口A卖掉一张票,剩余票数: 7
窗口A卖掉一张票,剩余票数: 6
窗口A卖掉一张票,剩余票数: 5
窗口A卖掉一张票,剩余票数: 4
窗口A卖掉一张票,剩余票数: 3
窗口A卖掉一张票,剩余票数: 2
窗口C卖掉一张票,剩余票数: 1
窗口C卖掉一张票,剩余票数: 0
票已售完!!!
票已售完!!!
票已售完!!!
*/
2.Lock

卖票:

class Sell1 {

    private int total;

    public Sell1(int total) {
        this.total = total;
    }

    private Lock lock = new ReentrantLock(true);

    public void sellTicket() {
        //上锁
        lock.lock();
        try {
            if (total > 0) {
                System.out.println("窗口" + Thread.currentThread().getName() + "卖掉一张票,剩余票数: " + (--total));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //解锁
            lock.unlock();
        }
    }
}
public static void main(String[] args) {

    Scanner scanner = new Scanner(System.in);
    System.out.println("请输入总票数:");
    int count = scanner.nextInt();
    Sell1 sell = new Sell1(count);
    new Thread(()->{
        for (int i = 0; i < count; i++) {
            sell.sellTicket();
        }
    }, "A").start();
    new Thread(()->{
        for (int i = 0; i < count; i++) {
            sell.sellTicket();
        }
    }, "B").start();
    new Thread(()->{
        for (int i = 0; i < count; i++) {
            sell.sellTicket();
        }
    }, "C").start();
}

/*
请输入总票数:
20
窗口A卖掉一张票,剩余票数: 19
窗口A卖掉一张票,剩余票数: 18
窗口A卖掉一张票,剩余票数: 17
窗口A卖掉一张票,剩余票数: 16
窗口B卖掉一张票,剩余票数: 15
窗口A卖掉一张票,剩余票数: 14
窗口B卖掉一张票,剩余票数: 13
窗口A卖掉一张票,剩余票数: 12
窗口C卖掉一张票,剩余票数: 11
窗口B卖掉一张票,剩余票数: 10
窗口A卖掉一张票,剩余票数: 9
窗口C卖掉一张票,剩余票数: 8
窗口B卖掉一张票,剩余票数: 7
窗口A卖掉一张票,剩余票数: 6
窗口C卖掉一张票,剩余票数: 5
窗口B卖掉一张票,剩余票数: 4
窗口A卖掉一张票,剩余票数: 3
窗口C卖掉一张票,剩余票数: 2
窗口B卖掉一张票,剩余票数: 1
窗口A卖掉一张票,剩余票数: 0
*/

SynchronizedLock区别:

  1. Synchronized是Java关键字,Lock是一个Java类
  2. Synchronized自动释放锁,Lock需要手动释放锁
  3. Synchronized适合锁少量的同步代码,Lock适合锁大量的同步代码

3.生产者消费者问题:

Synchronized实现:

class Resource{

    private int count = 0;

    public synchronized void product() throws InterruptedException {
        if(count > 0)
        {
            this.wait();
        }
        count++;
        System.out.println("生产者" + Thread.currentThread().getName() + "生产了一个产品,count = " + count);
        this.notifyAll();
    }

    public synchronized void consume() throws InterruptedException {
        if(count < 1)
        {
            this.wait();
        }
        count--;
        System.out.println("消费者" + Thread.currentThread().getName() + "消费了一个产品,count = " + count);
        this.notifyAll();
    }
}
public static void main(String[] args) {

    Resource resource = new Resource();

    new Thread(()->{
        try {
            for (int i = 0; i < 10; i++) {
                resource.product();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"A").start();
    new Thread(()->{
        try {
            for (int i = 0; i < 10; i++) {
                resource.consume();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"B").start();
}

/*
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
*/

多个生产者消费者线程出现虚假唤醒问题:

    Resource resource = new Resource();

    new Thread(()->{
        try {
            for (int i = 0; i < 10; i++) {
                resource.product();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"A").start();

    new Thread(()->{
        try {
            for (int i = 0; i < 10; i++) {
                resource.consume();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"B").start();

    new Thread(()->{
        try {
            for (int i = 0; i < 10; i++) {
                resource.product();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"C").start();

    new Thread(()->{
        try {
            for (int i = 0; i < 10; i++) {
                resource.consume();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"D").start();
}

/*
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者A生产了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者C生产了一个产品,count = 1
生产者A生产了一个产品,count = 2
生产者C生产了一个产品,count = 3
消费者B消费了一个产品,count = 2
消费者B消费了一个产品,count = 1
消费者B消费了一个产品,count = 0
生产者C生产了一个产品,count = 1
生产者A生产了一个产品,count = 2
生产者C生产了一个产品,count = 3
消费者B消费了一个产品,count = 2
消费者D消费了一个产品,count = 1
消费者D消费了一个产品,count = 0
生产者C生产了一个产品,count = 1
生产者A生产了一个产品,count = 2
生产者C生产了一个产品,count = 3
消费者D消费了一个产品,count = 2
消费者D消费了一个产品,count = 1
生产者C生产了一个产品,count = 2
生产者A生产了一个产品,count = 3
生产者C生产了一个产品,count = 4
消费者D消费了一个产品,count = 3
消费者D消费了一个产品,count = 2
消费者D消费了一个产品,count = 1
消费者D消费了一个产品,count = 0
生产者C生产了一个产品,count = 1
消费者D消费了一个产品,count = 0
生产者C生产了一个产品,count = 1
消费者D消费了一个产品,count = 0
*/

this.wait()方法应放在while循环中判断

class Resource{

    private int count = 0;

    public synchronized void product() throws InterruptedException {
        while(count > 0)
        {
            this.wait();
        }
        count++;
        System.out.println("生产者" + Thread.currentThread().getName() + "生产了一个产品,count = " + count);
        this.notifyAll();
    }

    public synchronized void consume() throws InterruptedException {
        while(count < 1)
        {
            this.wait();
        }
        count--;
        System.out.println("消费者" + Thread.currentThread().getName() + "消费了一个产品,count = " + count);
        this.notifyAll();
    }
}

Lock实现:

class Resource1{

    private int count = 0;

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    public void product() throws InterruptedException {

        lock.lock();

        try {
            while(count > 0)
            {
                condition.await();
            }
            count++;
            System.out.println("生产者" + Thread.currentThread().getName() + "生产了一个产品,count = " + count);

            condition.signalAll();
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        } finally {
            lock.unlock();
        }
    }

    public void consume() throws InterruptedException {

        lock.lock();

        try {
            while(count < 1)
            {
                condition.await();
            }
            count--;
            System.out.println("消费者" + Thread.currentThread().getName() + "消费了一个产品,count = " + count);

            condition.signalAll();
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        } finally {
            lock.unlock();
        }
    }
}
public static void main(String[] args) {

    Resource1 resource = new Resource1();

    new Thread(()->{
        try {
            for (int i = 0; i < 10; i++) {
                resource.product();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"A").start();

    new Thread(()->{
        try {
            for (int i = 0; i < 10; i++) {
                resource.consume();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"B").start();

    new Thread(()->{
        try {
            for (int i = 0; i < 10; i++) {
                resource.product();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"C").start();

    new Thread(()->{
        try {
            for (int i = 0; i < 10; i++) {
                resource.consume();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"D").start();
}
六、集合类并发问题 1.List
public static void main(String[] args) {

    ArrayList<String> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        new Thread(()->{
            list.add(UUID.randomUUID().toString().substring(0, 8));
            System.out.println(list);
        }, String.valueOf(i)).start();
    }
}
/*
ConcurrentModificationException 集合类并发不安全(并发修改异常)
*/

解决方法:

  1. 使用Collections.synchronizedList(),将List方法上添加Sychronized关键字变成同步方法
List<String> list = Collections.synchronizedList(new ArrayList<>());

​ 2.使用CopyOnWriteArrayList,当 List 需要被修改的时候,并不直接修改原有数组对象,而是对原有数据进行一次拷贝,将修改的内容写入副本中。写完之后,再将修改完的副本替换成原来的数组,这样就可以保证写 *** 作不会影响读 *** 作了。

List<String> list = new CopyOnWriteArrayList<>();
//源码
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    //上锁
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        //拷贝原数组,增加一个元素
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        //添加新元素
        newElements[len] = e;
        //将副本替换成原来的数组
        setArray(newElements);
        return true;
    } finally {
        //解锁
        lock.unlock();
    }
}
2.Set
public static void main(String[] args) {

    HashSet<String> set = new HashSet<>();

    for (int i = 0; i < 20; i++) {
        new Thread(()->{
            set.add(UUID.randomUUID().toString().substring(0, 8));
            System.out.println(set);
        }, String.valueOf(i)).start();
    }
}
/*
ConcurrentModificationException 集合类并发不安全(并发修改异常)
*/

解决方法:

1.使用Collections.synchronizedSet(),将Set方法上添加Sychronized关键字变成同步方法

Set<String> set = Collections.synchronizedSet(new HashSet<>());

2.使用CopyOnWriteArraySet

Set<String> set = new CopyOnWriteArraySet<>();
3.Map
public static void main(String[] args) {

    Map<String, String> map = new ConcurrentHashMap<>();

    for (int i = 0; i < 20; i++) {
        final String key = String.valueOf(i);
        new Thread(()->{
            map.put(key, UUID.randomUUID().toString().substring(0, 8));
            System.out.println(map);
        }, String.valueOf(i)).start();
    }
}
/*
ConcurrentModificationException 集合类并发不安全(并发修改异常)
*/

解决方法:

1.使用Collections.synchronizedMap(),将Map方法上添加Sychronized关键字变成同步方法

Map<String, String> map = Collections.synchronizedMap(new HashMap<>());

2.使用ConcurrentHashMap

Map<String, String> map = new ConcurrentHashMap<>();
七、Callable
//Callable接口
// V --> 返回值
@FunctionalInterface
public interface Callable<V> {
    
    V call() throws Exception;
}
//Runnabel子类 --> RunnableFuture实现类 ---> FutureTask(也是一个Runnable)
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
//通过Thread启动
public static void main(String[] args) throws ExecutionException, InterruptedException {

    FutureTask<String> futureTask = new FutureTask<String>(() -> {
        return "1111";
    });

    new Thread(futureTask).start();

    //获取返回值(可能会阻塞)
    String str = futureTask.get();

    System.out.println(str);
}

CallableRunnabel区别:

  1. Callable执行call()方法,Runnable执行run()方法
  2. Callable执行后有返回值(可通过FutureTask获取),而Runnable没有返回值
  3. Callable可以抛出异常,Runnable不行
八、常用辅助类 1.CountDownLatch

CountDownLatch: 倒计时计数器,允许一个或多个线程等待直到其他线程执行 *** 作完成。

public static void main(String[] args) {

    //倒数次数
    int count = 5;
    CountDownLatch countDownLatch = new CountDownLatch(count);

    for (int i = 0; i < count; i++) {
        new Thread(()->{
            System.out.println(Thread.currentThread().getName() + "执行...");
            //每当一个线程执行完毕,计数器减一
            countDownLatch.countDown();
        }).start();
    }

    //当所有线程执行完后输出结束语句,已经执行完的线程等待计数器归零被唤醒
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("结束!");
}

/*
Thread-0执行...
Thread-3执行...
Thread-2执行...
Thread-1执行...
Thread-4执行...
结束!
*/
2.CyclicBarrier

CyclicBarrier: 加法计数器,允许一组线程全部等待彼此达到共同屏障。

public static void main(String[] args) {

    int count = 5;
    //CyclicBarrier(int parties, Runnable barrierAction) 
    //加法计数器,直到计数器到达count后执行Runnable线程
    CyclicBarrier cyclicBarrier = new CyclicBarrier(count, () -> {
        System.out.println("结束!");
    });
    for (int i = 1; i <= count; i++) {

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "执行...");
            //等待计数器到达5
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }, String.valueOf(i)).start();
    }
}
/*
1执行...
5执行...
3执行...
4执行...
2执行...
结束!
*/
3.Semaphore

Semaphore: 信号量,信号量维持一组许可证

public static void main(String[] args) {

    //两个许可证
    Semaphore semaphore = new Semaphore(2);

    //10个线程每次执行2个(2个线程拿到许可证,执行完后释放)
    for (int i = 1; i <= 10; i++) {

        new Thread(() -> {
            try {
                //线程被阻塞直到获取许可证
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "执行...");

                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //释放许可证
                semaphore.release();
            }
        }, String.valueOf(i)).start();
    }
}

/*
1执行...
2执行...
----------
4执行...
3执行...
----------
5执行...
6执行...
----------
7执行...
8执行...
----------
9执行...
10执行...
*/
4.ReadWriteLock

ReadWriteLock: 读写锁,读读共享,读写互斥,写写互斥

class MyStorage{

    //读写锁
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public void read(){
        lock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "正在读取...");
            System.out.println(Thread.currentThread().getName() + "读取成功!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.readLock().unlock();
        }
    }

    public void write(){
        lock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "正在写入...");
            System.out.println(Thread.currentThread().getName() + "写入成功!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.writeLock().unlock();
        }
    }

}
public static void main(String[] args) {

    MyStorage myStorage = new MyStorage();

    for (int i = 1; i <= 5; i++) {
        new Thread(() -> {
            myStorage.read();
        }, "read" + i).start();
    }

    for (int i = 1; i <= 5; i++) {
        new Thread(() -> {
            myStorage.write();
        }, "write" + i).start();
    }
}

/*
read2正在读取...
read5正在读取...
read5读取成功!
read3正在读取...
read3读取成功!
read1正在读取...
read4正在读取...
read4读取成功!
read1读取成功!
read2读取成功!
write4正在写入...
write4写入成功!
write1正在写入...
write1写入成功!
write2正在写入...
write2写入成功!
write3正在写入...
write3写入成功!
write5正在写入...
write5写入成功!
*/
九、线程相关队列 1.BlockingQueue

BlockingQueue: 阻塞队列

方式抛出异常返回值阻塞等待超时等待
添加元素addofferputoffer(…)
移除元素removepolltakepoll(…)
查看队首元素elementpeek--
public static void main(String[] args) throws InterruptedException {

    //队列容量为3
    BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

    //add ---> 队列满时抛出异常
    //System.out.println(blockingQueue.add("A"));
    //System.out.println(blockingQueue.add("B"));
    //System.out.println(blockingQueue.add("C"));
    //System.out.println(blockingQueue.add("D"));
    
    //查看队首元素 队列为空时抛出异常
    //System.out.println(blockingQueue.element());

    //remove ---> 队列空时抛出异常
    //System.out.println(blockingQueue.remove());
    //System.out.println(blockingQueue.remove());
    //System.out.println(blockingQueue.remove());
    //System.out.println(blockingQueue.remove());

    //---------------------------------------

    //offer ---> 队列满时返回false,加入元素失败
    //System.out.println(blockingQueue.offer("A"));
    //System.out.println(blockingQueue.offer("B"));
    //System.out.println(blockingQueue.offer("C"));
    //System.out.println(blockingQueue.offer("D"));
    //查看队首元素, 队列为空时返回null
    //System.out.println(blockingQueue.peek());

    //poll ---> 队列空时返回null
    //System.out.println(blockingQueue.poll());
    //System.out.println(blockingQueue.poll());
    //System.out.println(blockingQueue.poll());
    //System.out.println(blockingQueue.poll());

    //---------------------------------------

    //blockingQueue.put("A");
    //blockingQueue.put("B");
    //blockingQueue.put("C");
    //队列满时,一直等待
    //blockingQueue.put("D");

    //System.out.println(blockingQueue.take());
    //System.out.println(blockingQueue.take());
    //System.out.println(blockingQueue.take());
    //队列为空时,一直等待
    //System.out.println(blockingQueue.take());

    //---------------------------------------

    blockingQueue.offer("A", 1, TimeUnit.SECONDS);
    blockingQueue.offer("B", 1, TimeUnit.SECONDS);
    blockingQueue.offer("C", 1, TimeUnit.SECONDS);
    //队列满时,等待1s,超时退出
    //blockingQueue.offer("D", 1, TimeUnit.SECONDS);

    System.out.println(blockingQueue.poll(1, TimeUnit.SECONDS));
    System.out.println(blockingQueue.poll(1, TimeUnit.SECONDS));
    System.out.println(blockingQueue.poll(1, TimeUnit.SECONDS));
    //队列空时,等待1s,超时返回null
    System.out.println(blockingQueue.poll(1, TimeUnit.SECONDS));

}
2.SynchronousQueue

SynchronousQueue: 同步队列,只能存放一个元素

public static void main(String[] args) {

    SynchronousQueue<String> queue = new SynchronousQueue<>();

    new Thread(() -> {
        try {
            System.out.println(Thread.currentThread().getName() + "添加元素" + 1);
            queue.put("1");
            System.out.println(Thread.currentThread().getName() + "添加元素" + 2);
            queue.put("2");
            System.out.println(Thread.currentThread().getName() + "添加元素" + 3);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();


    new Thread(() -> {
        String value = null;
        try {
            TimeUnit.SECONDS.sleep(1);
            value = queue.take();
            System.out.println(Thread.currentThread().getName() + "取出元素" + value);
            value = queue.take();
            System.out.println(Thread.currentThread().getName() + "取出元素" + value);
            value = queue.take();
            System.out.println(Thread.currentThread().getName() + "取出元素" + value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }).start();
}

/*
Thread-0添加元素1
Thread-1取出元素1
Thread-0添加元素2
Thread-1取出元素2
Thread-0添加元素3
Thread-1取出元素3
*/
十、线程池 1.通过Executors创建线程池(不推荐,可能会导致Out Of Memory)
public static void main(String[] args) {

    //创建单一线程池
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    try {
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "执行...");
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        executorService.shutdown();
    }
}

/*
pool-1-thread-1执行...
pool-1-thread-1执行...
pool-1-thread-1执行...
pool-1-thread-1执行...
pool-1-thread-1执行...
pool-1-thread-1执行...
pool-1-thread-1执行...
pool-1-thread-1执行...
pool-1-thread-1执行...
pool-1-thread-1执行...
*/
public static void main(String[] args) {

    //创建固定大小线程池
    ExecutorService executorService = Executors.newFixedThreadPool(5);

    try {
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "执行...");
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        executorService.shutdown();
    }
}

/*
pool-2-thread-1执行...
pool-2-thread-5执行...
pool-2-thread-1执行...
pool-2-thread-4执行...
pool-2-thread-4执行...
pool-2-thread-4执行...
pool-2-thread-2执行...
pool-2-thread-3执行...
pool-2-thread-1执行...
pool-2-thread-5执行...
*/
public static void main(String[] args) {

    //创建大小可变的线程池(根据CPU情况)
    ExecutorService executorService = Executors.newCachedThreadPool();

    try {
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "执行...");
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        executorService.shutdown();
    }
}

/*
pool-3-thread-1执行...
pool-3-thread-5执行...
pool-3-thread-4执行...
pool-3-thread-6执行...
pool-3-thread-7执行...
pool-3-thread-3执行...
pool-3-thread-2执行...
pool-3-thread-9执行...
pool-3-thread-8执行...
pool-3-thread-10执行...
*/

源码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
2.使用ThreadPool(推荐):
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
                          int maximumPoolSize, //最大线程池大小
                          long keepAliveTime, //存活时间
                          TimeUnit unit, //超时
                          BlockingQueue<Runnable> workQueue, //阻塞队列
                          ThreadFactory threadFactory, //线程工厂
                          RejectedExecutionHandler handler) //拒绝策略

RejectedExecutionHandler(四种拒绝策略):

public static void main(String[] args) {

    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
                                        5,
                                        3,
                                        TimeUnit.SECONDS,
                                        new LinkedBlockingQueue<>(3),
                                        //使用默认的线程工厂
                                        Executors.defaultThreadFactory(),
                                                           
                                        //多余的线程不处理,抛出异常
                                        //new ThreadPoolExecutor.AbortPolicy());
                                                           
                                        //哪里来的去哪里,交给原来的线程代理
                                        //new ThreadPoolExecutor.CallerRunsPolicy());
                                                           
                                        //多余的线程不处理,直接丢弃,不会抛出异常
                                        //new ThreadPoolExecutor.DiscardPolicy());
                                                           
                                        //多余的线程尝试和最先运行的线程竞争,如果原来的线程运行完,则可以运行,不会抛出异常
                                        new ThreadPoolExecutor.DiscardOldestPolicy());

    try {
        for (int i = 1; i <= 9; i++) {
            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "执行...");
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        threadPool.shutdown();
    }
}

/*
AbortPolicy:
pool-1-thread-1执行...
pool-1-thread-5执行...
pool-1-thread-4执行...
pool-1-thread-3执行...
pool-1-thread-2执行...
pool-1-thread-1执行...
pool-1-thread-4执行...
pool-1-thread-5执行...
RejectedExecutionException
---------------------------
CallerRunsPolicy:
pool-1-thread-1执行...
main执行...
pool-1-thread-3执行...
pool-1-thread-2执行...
pool-1-thread-5执行...
pool-1-thread-3执行...
pool-1-thread-4执行...
pool-1-thread-1执行...
pool-1-thread-2执行...
---------------------------
DiscardPolicy:
pool-1-thread-1执行...
pool-1-thread-2执行...
pool-1-thread-1执行...
pool-1-thread-5执行...
pool-1-thread-4执行...
pool-1-thread-3执行...
pool-1-thread-1执行...
pool-1-thread-2执行...
---------------------------
DiscardOldestPolicy:
pool-1-thread-1执行...
pool-1-thread-5执行...
pool-1-thread-4执行...
pool-1-thread-4执行...
pool-1-thread-3执行...
pool-1-thread-2执行...
pool-1-thread-5执行...
pool-1-thread-1执行...
*/

设置线程池最大线程数

//CPU密集型 
//设置线程池最大线程数为计算机CPU核数
Runtime.getRuntime().availableProcessors()

//IO密集型
//设置线程池最大线程数大于程序中十分消耗IO的线程个数即可
十一、函数式接口(lambda表达式) 1.Function
//T ---> 参数类型  R ---> 返回值
@FunctionalInterface
public interface Function<T, R> {

    R apply(T t);
		
    ...
}
public static void main(String[] args) {

    Scanner scanner = new Scanner(System.in);

    Function<String, String> function = (str) -> {
        System.out.println("请输入一个字符串:");
        return scanner.nextLine();
    };

    System.out.println("你输入的是:" + function.apply("1"));
}

/*
请输入一个字符串:
dssad
你输入的是:dssad
*/
2.Predicate
// T ---> 参数类型 返回值为boolean
@FunctionalInterface
public interface Predicate<T> {

    boolean test(T t);
    
    ...
}
public static void main(String[] args) {

    Predicate<String> predicate = (str) -> {
        return str.isEmpty();
    };

    System.out.println(predicate.test(""));
}

//true
3.Consumer
//T ---> 参数类型 没有返回值
@FunctionalInterface
public interface Consumer<T> {

    void accept(T t);
    
    ...
}
public static void main(String[] args) {

    Consumer<String> consumer = (str) -> {
        System.out.println("str = " + str);
    };

    consumer.accept("asdasxax");
}

//str = asdasxax
4.Supplier
//T ---> 返回值类型 没有参数
@FunctionalInterface
public interface Supplier<T> {

    T get();
}
public static void main(String[] args) {

    Supplier<String> supplier = () -> {return "asd";};

    System.out.println(supplier.get());
}

//asd
十二、Forkjoin

并行执行任务的框架,把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果

工作窃取:所有空闲线程尝试去执行其他线程未执行的子任务

计算1 + ··· + 1000000000:

//普通方式
public class Normal {

    private long start;

    private long end;

    public Normal(long start, long end) {
        this.start = start;
        this.end = end;
    }

    public long getStart() {
        return start;
    }

    public long getEnd() {
        return end;
    }

    public long compute(){

        long sum = 0;

        for (long i = start; i <= end; i++) {
            sum += i;
        }

        return sum;
    }
}
//ForkJoin方式:
//1.继承ForkJoinTask的子类:
//RecursiveAction: 用于没有返回结果的任务。
//RecursiveTask: 用于有返回结果的任务。
public class ForkJoin_ extends RecursiveTask<Long> {

    private long start;

    private long end;

    private long temp = 10_000L;

    public ForkJoin_(long start, long end) {
        this.start = start;
        this.end = end;
    }

    public long getStart() {
        return start;
    }

    public long getEnd() {
        return end;
    }

    @Override
    protected Long compute() {

        long sum = 0;

        if ((end - start) < temp) {
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {

            long middle = (start + end) / 2;

            //按平均值划分成小任务
            ForkJoin_ task1 = new ForkJoin_(start, middle);

            //计算小任务
            task1.fork();

            //按平均值划分成小任务
            ForkJoin_ task2 = new ForkJoin_(middle + 1, end);

            //计算小任务
            task2.fork();

            //返回计算结果
            sum = task1.join() + task2.join();

            return sum;
        }
    }
}
//测试
public static void main(String[] args) {

    long start = 0L;

    long end = 1_000_000_000L;

    Normal normal = new Normal(start, end);

    long t1 = System.currentTimeMillis();

    System.out.println(normal.compute());

    long t2 = System.currentTimeMillis();

    System.out.println("normal耗时:" + (t2 - t1));

    System.out.println("=======================================");

    t1 = System.currentTimeMillis();

    ForkJoinPool forkJoinPool = new ForkJoinPool();

    ForkJoinTask<Long> task = new ForkJoin_(start, end);

    ForkJoinTask<Long> submit = forkJoinPool.submit(task);

    try {
        System.out.println(submit.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

    t2 = System.currentTimeMillis();

    System.out.println("forkJoin耗时:" + (t2 - t1));

    System.out.println("=======================================");

    t1 = System.currentTimeMillis();

    System.out.println(LongStream.rangeClosed(start, end).parallel().reduce(0, Long::sum));

    t2 = System.currentTimeMillis();

    System.out.println("stream耗时:" + (t2 - t1));

}

/*
500000000500000000
normal耗时:570ms
=======================================
500000000500000000
forkJoin耗时:206ms
=======================================
500000000500000000
stream耗时:165ms
*/
十三、异步回调 1.CompletableFuture.runAsync(Runnable):
public static void main(String[] args) throws ExecutionException, InterruptedException {

    //没有返回值的异步回调(程序不会阻塞而是继续向下执行)
    CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName());
    });

    System.out.println("--------------");

    //阻塞等待异步结果
    completableFuture.get();
}

/*
--------------
ForkJoinPool.commonPool-worker-1
*/
2.CompletableFuture.supplyAsync(Supplier):
public static void main(String[] args) throws ExecutionException, InterruptedException {
    //有返回值的异步回调
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //制造异常异步执行失败
        //int i = 1 / 0;

        System.out.println(Thread.currentThread().getName());

        return "success";
    });
	
    //whenComplete(BiConsumer)  执行成功回调
    //exceptionally(Function)   执行失败回调
    System.out.println(completableFuture.whenComplete((t, u) -> {
        //返回值
        System.out.println("t = " + t);
        //错误信息
        System.out.println("u = " + u);
    }).exceptionally((e) -> {
        System.out.println(e.getMessage());
        return "fail";
    }).get());
}

/*
执行成功:
ForkJoinPool.commonPool-worker-1
t = success
u = null
success
------------------------
执行失败:
t = null
u = java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
java.lang.ArithmeticException: / by zero
fail
*/
十四、JMM

JMM:Java内存模型

规定:

  1. 线程解锁前,必须把共享变量立刻刷新到主存
  2. 线程加锁前,必须将主存中的最新值读取到工作内存
  3. 加锁和解锁是同一把锁


(三组 *** 作成对出现)

public class Volatile_ {

    //输出sum = 1 但是线程不会退出 sum不可见
    //private static int num = 0;
    private volatile static int num = 0; //volatile保证可见性

    public static void main(String[] args) {

        new Thread(() -> {
            while(num == 0)
            {

            }
        }).start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        num = 1;
        System.out.println("num = " + num);
    }
}

十五、Volatile

Volatile:轻量级同步机制

  1. 保证可见性
  2. 不保证原子性
  3. 禁止指令重排
public class Volatile01_ {

    private volatile static int num = 0;

    //volatile不保证原子性
    //最后输出num可能小于20000,线程同时 *** 作
    public /*synchronized*/ static void add(){
        num++;
    }

    public static void main(String[] args) {

        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    add();
                }
            }, String.valueOf(i)).start();
        }

        while(Thread.activeCount() > 2) //main gc 线程
        {
            Thread.yield();
        }

        System.out.println("num = " + num);
    }
}

通过反编译发现:

使用原子类AtomicInteger:

public class Volatile01_ {

    private volatile static AtomicInteger num = new AtomicInteger();

    public /*synchronized*/ static void add(){
        num.getAndIncrement(); //num++
    }

    public static void main(String[] args) {

        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    add();
                }
            }, String.valueOf(i)).start();
        }

        while(Thread.activeCount() > 2) //main gc 线程
        {
            Thread.yield();
        }

        System.out.println("num = " + num);
    }
}

//20000

指令重排: 是对指令的优化,程序编写的顺序和实际上执行的顺序不一致。程序有万分之一的概率会产生指令重排。在多线程执行相同代码的前提下,产生指令重排,就会导致多个线程获取到了不同的结果。因此在多线程下要禁止指令重排。

x,y,a,b默认值为0

预期结果: x = 0,y = 0

线程1线程2
x = ay = b
b = 1a = 2

如果发生指令重排,结果变成: x = 2,y = 1

线程1线程2
b = 1a = 2
x = ay = b

使用volatile可以禁止指令重排(通过在 *** 作前后添加内存屏障)

Java屏障类型:

1.LoadLoad Barriers

两个线程同时从主存中加载数据到工作内存(Load) *** 作互斥

2.StoreStore Barriers

两个线程同时将工作内存中的数据刷新到主存(Store) *** 作互斥

3.LoadStore Barriers

当一个线程从主存中加载数据到工作内存(Load),同时另一个线程将工作内存中的数据刷新到主存(Store)时,要先保证先Load成功,Store才开始

4.StoreLoad Barriers(全能屏障)

当一个线程将工作内存中的数据刷新到主存(Store),同时另一个线程从主存中加载数据到工作内存(Load)时,要先保证Store已经成功,并且数据所有人可见,Load才开始

十六、CAS

CAS(compareAndSet): CPU的并发原语

public static void main(String[] args) {

    AtomicInteger atomicInteger = new AtomicInteger(1);

    System.out.println(atomicInteger.compareAndSet(1, 2));
    System.out.println(atomicInteger.get());
    System.out.println(atomicInteger.compareAndSet(1, 2));
    System.out.println(atomicInteger.get());
}

/*
true
2
false
2
*/

源码分析:

//源码 如果和预期结果(expect)一样则进行更新(update)
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

//atomicInteger中字段 Unsafe类对象(包含一系列native方法,底层使用C/C++) ---> 对内存进行 *** 作
private static final Unsafe unsafe = Unsafe.getUnsafe();

//atomicInteger中方法 原子性加一 *** 作
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

-----------------------------------------------------------------------------------

//Unsafe中方法
//var1 = this , var2 = valueOffset , var4 = 1 
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    //自旋锁 直到内存中该值加一为止
    do {
        //获取当前对象在内存中的值
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

//Unsafe中方法
public native int getIntVolatile(Object var1, long var2);

//Unsafe中方法
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

CAS无法解决**ABA**问题

**ABA**问题: 一个线程修改共享数据后又将该数据改回原值,另一个线程无法发现共享数据被修改过

public static void main(String[] args) {

    AtomicInteger atomicInteger = new AtomicInteger(1);

    new Thread(() -> {
        System.out.println("A:" + atomicInteger.compareAndSet(1, 2));
        System.out.println("A:" + atomicInteger.get());
        System.out.println("A:" + atomicInteger.compareAndSet(2, 1));
        System.out.println("A:" + atomicInteger.get());
    },"A").start();

    new Thread(() -> {

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //无法发现线程A修改过数据(ABA问题)
        System.out.println("B:" + atomicInteger.compareAndSet(1, 2));
        System.out.println("B:" + atomicInteger.get());
    },"B").start();
}

/*
A:true
A:2
A:true
A:1
B:true
B:2
*/

解决方法:

使用**AtomicStampedReference**每次修改更新stamp(版本号/时间戳)

public static void main(String[] args) {

    AtomicStampedReference<Integer> reference = new AtomicStampedReference<>(1, 1);

    new Thread(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("A:stamp = " + reference.getStamp());

        System.out.println("A:" + reference.compareAndSet(1, 2, reference.getStamp(), reference.getStamp() + 1));

        System.out.println("A:stamp = " + reference.getStamp());

        System.out.println("A:" + reference.compareAndSet(2, 1, reference.getStamp(), reference.getStamp() + 1));

        System.out.println("A:stamp = " + reference.getStamp());

    }, "A").start();

    new Thread(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("B:stamp = " + reference.getStamp());

        System.out.println("B:" + reference.compareAndSet(1, 3, reference.getStamp(), reference.getStamp() + 1));

        System.out.println("B:stamp = " + reference.getStamp());
    }, "B").start();
}

/*
A:stamp = 1
A:true
A:stamp = 2
A:true
A:stamp = 3
B:stamp = 3
B:true
B:stamp = 4
*/
十七、锁 1.公平锁、非公平锁

公平锁: 先来后到、不能插队

非公平锁: 可以插队

Synchronized非公平锁、Lock默认非公平锁

Lock lock = new ReentrantLock(); //非公平锁
Lock lock = new ReentrantLock(true); //公平锁

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
2.可重入锁 3.自旋锁

使用CAS *** 作自定义自旋锁:

public class SpinLock {

    private AtomicReference<Thread> atomicReference = new AtomicReference(null);

    //加锁
    public void lock(){

        Thread thread = Thread.currentThread();

        System.out.println(Thread.currentThread().getName() + " lock");

        while(!atomicReference.compareAndSet(null, thread)) {

        }
    }

    //解锁
    public void unlock(){
        Thread thread = Thread.currentThread();

        System.out.println(Thread.currentThread().getName() + " unlock");

        while(!atomicReference.compareAndSet(thread, null)) {

        }
    }
}
public static void main(String[] args) {

    SpinLock spinLock = new SpinLock();

    new Thread(() -> {
        spinLock.lock();
        try {
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName() + "执行");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            spinLock.unlock();
        }
    }, "A").start();

    new Thread(() -> {
        spinLock.lock();
        try {
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName() + "执行");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            spinLock.unlock();
        }
    }, "B").start();
}

/*
A lock
A执行
A执行
A执行
A执行
B lock
A执行
A unlock
B执行
B执行
B执行
B执行
B执行
B unlock
*/
4.死锁

两个线程试图获取对方的锁,相互等待造成死锁

class MyThread implements Runnable{

    private static Object lock1 = new Object();

    private static Object lock2 = new Object();

    private boolean flag;

    public MyThread(boolean flag) {
        this.flag = flag;
    }

    @Override
    public void run() {
        //互相等待获取对方的锁
        if(flag) {
            synchronized (lock1) {
                System.out.println(Thread.currentThread().getName() + "获取lock1...");
                synchronized (lock2) {
                    System.out.println(Thread.currentThread().getName() + "获取lock2...");
                }
            }
        }
        else
        {
            synchronized (lock2) {
                System.out.println(Thread.currentThread().getName() + "获取lock2...");
                synchronized (lock1) {
                    System.out.println(Thread.currentThread().getName() + "获取lock1...");
                }
            }
        }
    }
}
public static void main(String[] args) {

    new Thread(new MyThread(true), "A").start();
    new Thread(new MyThread(false), "B").start();
}

/*
A获取lock1...
B获取lock2...
......
*/

排查方式:

1.在终端中输入jsp -l命令,查看程序对应的进程号

2.使用jstack 进程号查看堆栈信息找到死锁

最后显示死锁信息:

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/720833.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-25
下一篇 2022-04-25

发表评论

登录后才能评论

评论列表(0条)

保存