侧边栏壁纸
博主头像
王一川博主等级

努力成为一个不会前端的全栈工程师

  • 累计撰写 70 篇文章
  • 累计创建 20 个标签
  • 累计收到 40 条评论

目 录CONTENT

文章目录

JUC

王一川
2021-10-06 / 2 评论 / 0 点赞 / 1,300 阅读 / 15,677 字
温馨提示:
本文最后更新于 2022-11-17,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

所有代码以开源在 gitee:https://gitee.com/uhope/juc

一、多线程回顾

1.1 什么是进程?什么是线程?

进程:是程序的一次执行,是系统进行资源分配和调度的独立单位,每个进程都有自己的内存空间和系统资源。即:程序运行一次就起一个进程。

线程:在一个进程内可以执行多个任务,每个任务就是一个线程。即:一个进程至少有一个线程。

1.2 什么是并发?什么是并行?

并发:同一时刻多个线程访问同一个资源,如:12306抢票。

并行:多个线程共同执行互补影响,如:泡面,一边烧水一边撕调料包。

1.3 什么是同步?什么是异步?

背景:一个线程在执行某个请求的时候,该请求需要一段时间的处理才能返回结果

同步:该线程会一直等待下去,直到获得结果才离去,同时还会影响其他线程。即:你今天必须给我一个说法,否则我就不走了。

异步:不需要一直等下去,当系统返回结果通知该线程进行处理。即:前台点餐,后厨烧菜,顾客座位等候,菜烧好通知顾客取餐。

1.4 Java 和 线程有必然关系吗?

进程和线程是和操作系统有关系,java本身不能创建线程,通过本地方法调用。如:没有安装 jdk 的时候操作系统任然存在进程和线程

1.5 线程的状态

Thread.State 定义了六种线程状态

线程状态:

  • NEW:new Thread()
  • RUNNABLE:start()
  • BLOCKED
  • WATING: 一直死等,不见不散
  • TIMED_WATING:不要死等,过时不候
  • TERMINATED

1.6 什么是 JUC

java.util.concurrent在并发编程中使用得工具类,即:java并发包。包含三个部分

  1. java.util.concurrent
  2. java.util.concurrent.atomic
  3. java.util.concurrent.locks

二、Lock 接口

2.1 synchronized 回顾

实现一个卖票案例,三个售票员卖50张票;遵循高内聚、低耦合的编程思想,通过线程操作资源类。

编写资源类

package tech.kpretty.bean;

public class Ticket {
    private int ticketNum = 50;

    public synchronized void sale() {
        if (ticketNum > 0) {
            System.out.println(Thread.currentThread().getName() + "卖出一张票,还剩" + (ticketNum--) + "张");
        }
    }
}

三个售票员即三个线程卖同一类型的50张票

package tech.kpretty;

import tech.kpretty.bean.Ticket;

public class SaleTicketDemo {
    public static void main(String[] args) {
        Ticket ticket = new Ticket();
        new Thread(() -> {for (int i = 0; i < 51; i++) ticket.sale();}, "售票员1").start();
        new Thread(() -> {for (int i = 0; i < 51; i++) ticket.sale();}, "售票员2").start();
        new Thread(() -> {for (int i = 0; i < 51; i++) ticket.sale();}, "售票员3").start();
    }
}

2.2 lock

Lock implementations provide more extensive locking operations than can be obtained using synchronized methods and statements. They allow more flexible structuring, may have quite different properties, and may support multiple associated Condition objects. (lock 实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象)

lock 本身是一个接口,其实现类有:ReentrantLock、ReentrantReadWriteLock.ReadLock、ReentrantReadWriteLock.WriteLock,即:可重入锁,读写锁。其使用方式官方 api 提供一种模板

class X {
   private final ReentrantLock lock = new ReentrantLock();
   // ...

   public void m() {
     lock.lock();  // block until condition holds
     try {
       // ... method body
     } finally {
       lock.unlock()
     }
   }
 }

通过 lock() 方法加锁,unlock() 方法解锁;官方推荐通过 try…finally 方式,在 try之前加锁,try中实现具体的业务逻辑,在finally中释放锁,下面通过 lock 方式改写卖票案例。

public void saleForLock() {
  // 加锁
  lock.lock();
  try {
    // 业务逻辑
    if (ticketNum > 0) {
      System.out.println(Thread.currentThread().getName() + "卖出一张票,还剩" + (--ticketNum) + "张");
    }
  } finally {
    // 解锁
    lock.unlock();
  }
}

注:lock() 一定要写在 try 外面,防止被异常吞掉

2.3 synchronized VS lock

已经有了 synchronized 为什么还要加入 lock ?举个例子

package tech.kpretty.lock;

import java.util.concurrent.TimeUnit;

class Phone {
    public synchronized void sendEmail() {
        System.out.println(Thread.currentThread().getName() + "--> 发送邮件...");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class SynchronizedVsLock {
    public static void main(String[] args) {
        Phone phone = new Phone();
        new Thread(phone::sendEmail, "t1").start();
        new Thread(phone::sendEmail, "t2").start();
    }
}

两个线程,都需要执行发送邮件的功能,该功能需要耗时 5s,假设 t1 抢到了锁开始执行业务逻辑,t2 就被挡在外面必须等待 t1 完成之后才能执行;也就是说只要一个线程获取了 synchronized,中途不可以取消,不可以释放。

现实中我们往往需要这样,当这个锁已经被人持有允许等待一个固定的时间,或者直接不执行,防止系统产生大规模的等待;即:小明出门买煎饼果子,此时煎饼果子排的对太长了,小明可以随时不排队去买其它的。 lock 便可以实现

package tech.kpretty.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Phone {
    private final Lock lock = new ReentrantLock();

    public synchronized void sendEmail() {
        System.out.println(Thread.currentThread().getName() + "--> 发送邮件...");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void sendMSN() {
        // 尝试抢锁
        if (lock.tryLock()) {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + "--> 发送信息...");
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        } else {
            System.out.println("超时...");
        }

    }
}

public class SynchronizedVsLock {
    public static void main(String[] args) {
        Phone phone = new Phone();
        new Thread(phone::sendMSN, "t1").start();
        new Thread(phone::sendMSN, "t2").start();
    }
}

此时 t2 准备执行发送信息时,发现锁已经被 t1 持有,直接走 else,即允许线程临时中断,临时退出;同时 tryLock() 有一个重载方法 tryLock(long time, TimeUnit unit),二者区别在于 tryLock() 只要锁被人持有直接走,tryLock(long time, TimeUnit unit) 允许等待一定时间,在这个时间内抢到锁执行,没有抢到锁走,即:过时不候,官方源码推荐最佳实践

Lock lock = ...;
if (lock.tryLock()) {
  try {
    // manipulate protected state
  } finally {
    lock.unlock();
  }
} else {
  // perform alternative actions
}

总结:

synchronized 是 java 内置的关键字,它提供了一种独占的加锁方式。synchronized 的获取和释放锁由 jvm 实现,用户不需要显示的释放锁,非常方便,然而 synchronized 也有一定的局限性:

  1. 当线程尝试获取锁的时候,如果获取不到锁会一直阻塞,这个阻塞的过程,用户无法控制
  2. 如果获取锁的线程进入休眠或者阻塞,除非当前线程异常,否则其他线程尝试获取锁必须一直等待

Lock 弥补了 synchronized 的局限,提供了更加细粒度的加锁功能;同时 Lock 是一个对象,将锁的获取释放交由用户。

2.4 虚假唤醒

2.4.1 生产者消费者问题

实现两个线程,一个生产者一个消费者,生产者生产一个商品,消费者消费一个商品,商品为 1 消费者消费,商品为 0 生产者生产

package tech.kpretty;

class Produce {
    // 商品个数
    private int num = 0;

    // 生产
    public synchronized void produce() {
        if (num != 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + "--> " + (++num));
        this.notifyAll();
    }

    // 消费
    public synchronized void consumer() {
        if (num == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + "--> " + (--num));
        this.notifyAll();
    }
}

public class ProducerAndConsumer {
    public static void main(String[] args) {
        Produce produce = new Produce();
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                produce.produce();
            }
        }, "生产者").start();

        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                produce.consumer();
            }
        }, "消费者").start();
    }
}

结果一切正常,1、0 交替出现

考虑:若四个线程呢?两个生产者两个消费者

public class ProducerAndConsumer {
    public static void main(String[] args) {
        Produce produce = new Produce();
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                produce.produce();
            }
        }, "生产者").start();

        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                produce.consumer();
            }
        }, "消费者").start();

        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                produce.produce();
            }
        }, "生产者").start();

        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                produce.consumer();
            }
        }, "消费者").start();
    }
}
点击查看
出现了数据异常现象

2.4.2 虚假唤醒

问题分析:假设线程AC为生产者,BD为消费者,在正常运行一段时间后,线程A进来,发现商品为1,则执行 wait(),释放锁程序停止在这里;锁被C抢到了,进来发现商品为1,继续执行wait(),释放锁程序停止在这里;锁被B抢到了,商品为1,则进行消费->0,唤醒所有等待的线程,根据时间片轮转的规则等待时间长的线程获取锁的概率高,此时锁被A抢到,A从上次睡眠的地方继续执行,生产一个商品->1,唤醒所有等待线程;此时锁被C抢到,C从上次睡眠的地方继续执行,生产一个商品->2,唤醒所有等待的线程,这个时候就回到程序最开始的地方,若持续进行上述操作则商品数量会一直增加,商品为负数同理。

原理解释:当线程从等待已发出信号的条件变量中唤醒,却发现它正在等待的条件未得到满足时,就会发生虚假唤醒。它之所以被称为虚假的,是因为线程似乎无缘无故地被唤醒了。但是虚假唤醒并不是无缘无故发生的:它们通常是因为在用信号通知条件变量和等待线程最终运行之间的这段时间内,另一个线程运行并更改了条件。线程之间存在竞争条件,典型的结果是,有时,在条件变量上唤醒的线程首先运行,赢得竞争,有时它第二次运行,输掉竞争。即AC等待过程中,B唤醒了A,A修改了AC等待的条件,同时A继续唤醒C,导致C在不能被唤醒的条件下被唤醒了,称为虚假唤醒。

问题解决: 用 while 替换 if,即在被唤醒时再次判断是否达到此次唤醒条件,若被虚假唤醒则继续wait

class Produce {
    // 商品个数
    private int num = 0;

    // 生产
    public synchronized void produce() {
        while (num != 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + "--> " + (++num));
        this.notifyAll();
    }

    // 消费
    public synchronized void consumer() {
        while (num == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + "--> " + (--num));
        this.notifyAll();
    }
}

2.4.3 lock

使用 lock 替换 synchronized,因为 wait、notifAll 只能在 synchronized 中使用,因此 lock 提供相似的方法,通过 Condition 对象的 await(wait)、singal(notify)、singalAll(notifyAll)

class Produce {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    // 商品个数
    private int num = 0;

    // 生产
    public void produce() {
        lock.lock();
        try {
            while (num != 0) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + "--> " + (++num));
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // 消费
    public void consumer() {
        lock.lock();
        try {
            while (num == 0) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + "--> " + (--num));
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

2.5 线程间精准通信

从上面虚假唤醒的案例看不出 synchronized 和 lock 的区别,无非就是换了几个关键字,提供线程可中断可退出的功能;那 lock 还提供什么更加神奇的功能,使得 Lock 能以接口的形式被添加的 jdk1.5 中。

仔细分析虚假唤醒的原因其本质是因为 notifyAll,它会将所有等待的线程全部唤醒,使得本不该被唤醒的线程被错误的唤醒;而 notify 则随机唤醒一个,这会造成信号丢失本该被唤醒的线程没有被唤醒,因此必须使用 notifyAll 为了防止虚假唤醒将if换成while,线程被唤醒后再回去判断一下,但while和notifyAll的存在,增加了线程切换的次数和不必要的损耗;若是能够做到精准通信那么这个问题就不存在了。

在 synchronized 中,通过 wait 会使线程进入一个队列中等待,一把锁只维护一个队列,通过 notify/notifyAll 来唤醒队列一个或全部线程;而 condition 和 synchronized 一样维护一个队列,但是 condition 可以有多个,假如实现三个线程A、B、C,依次打印出ABCABCABC…,我们只需要创建三个 condition,保证 A 执行完通过 B 所在的 condition 来唤醒B,C同理,代码如下:

package tech.kpretty.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Func {
    private final Lock lock = new ReentrantLock();
    private final Condition condition1 = lock.newCondition();
    private final Condition condition2 = lock.newCondition();
    private final Condition condition3 = lock.newCondition();
    private int flag = 1;

    public void m1() {
        lock.lock();
        try {
            while (flag != 1) {
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName());
            flag = 2;
            condition2.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void m2() {
        lock.lock();
        try {
            while (flag != 2) {
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName());
            flag = 3;
            condition3.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void m3() {
        lock.lock();
        try {
            while (flag != 3) {
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName());
            flag = 1;
            condition1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class ConditionDemo {
    public static void main(String[] args) {
        Func func = new Func();
        new Thread(()-> {for (int i = 0; i < 50; i++) func.m1();},"A").start();
        new Thread(()-> {for (int i = 0; i < 50; i++) func.m2();},"B").start();
        new Thread(()-> {for (int i = 0; i < 50; i++) func.m3();},"C").start();
    }
}

分析如下:程序运行 flag 为 1,无论谁抢到锁,一定是 A 执行操作,BC会进入各自的队列中等待,A 将 flag 改成 2,同时通过 condition2.singal 只会唤醒 B,之后一直循环下去;flag 的作用是防止首次运行BC先抢到锁,或者 A 执行的时候,BC没有处于等待状态,防止此时C抢到锁执行操作。while 同样是为了方式虚假唤醒

注意:condition 本质上不能解决虚假唤醒,因为 singal,singalAll 功能不变,若 condition 维护的队列有多个线程同样会存在虚假唤醒的情况,只是上面案例每个condition最多只有一个线程而已

三、8锁案例

3.1 案例一

标准访问,先打印邮件还是短信?

package tech.kpretty.lock;

import java.util.concurrent.TimeUnit;

class Phone_ {
    public synchronized void sendEmail() {
        System.out.println(Thread.currentThread().getName() + "-->发送邮件");
    }

    public synchronized void sendMSN() {
        System.out.println(Thread.currentThread().getName() + "-->发送短息");
    }
}

public class EightLockDemo {
    public static void main(String[] args) throws InterruptedException {
        Phone_ phone = new Phone_();
        new Thread(phone::sendEmail, "A").start();
        TimeUnit.MILLISECONDS.sleep(100);
        new Thread(phone::sendMSN, "B").start();
    }
}
点击查看 A-->发送邮件
B-->发送短信

3.2 案例二

邮件发送用时4s,先打印邮件还是短信?

package tech.kpretty.lock;

import java.util.concurrent.TimeUnit;

class Phone_ {
    public synchronized void sendEmail() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "-->发送邮件");
    }

    public synchronized void sendMSN() {
        System.out.println(Thread.currentThread().getName() + "-->发送短信");
    }
}

public class EightLockDemo {
    public static void main(String[] args) throws InterruptedException {
        Phone_ phone = new Phone_();
        new Thread(phone::sendEmail, "A").start();
        TimeUnit.MILLISECONDS.sleep(100);
        new Thread(phone::sendMSN, "B").start();
    }
}
点击查看 A-->发送邮件
B-->发送短信

3.3 案例三

新增普通发QQ方法,先打印邮件还是QQ?

package tech.kpretty.lock;

import java.util.concurrent.TimeUnit;

class Phone_ {
    public synchronized void sendEmail() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "-->发送邮件");
    }

    public synchronized void sendMSN() {
        System.out.println(Thread.currentThread().getName() + "-->发送短信");
    }

    public void sendQQ(){
        System.out.println(Thread.currentThread().getName() + "-->发送QQ");
    }
}

public class EightLockDemo {
    public static void main(String[] args) throws InterruptedException {
        Phone_ phone = new Phone_();
        new Thread(phone::sendEmail, "A").start();
        TimeUnit.MILLISECONDS.sleep(100);
        new Thread(phone::sendQQ, "B").start();
    }
}
点击查看 B-->发送QQ
A-->发送邮件

3.4 案例四

两部手机,先打印邮件还是短信?

package tech.kpretty.lock;

import java.util.concurrent.TimeUnit;

class Phone_ {
    public synchronized void sendEmail() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "-->发送邮件");
    }

    public synchronized void sendMSN() {
        System.out.println(Thread.currentThread().getName() + "-->发送短信");
    }
}

public class EightLockDemo {
    public static void main(String[] args) throws InterruptedException {
        Phone_ phone = new Phone_();
        Phone_ phone1 = new Phone_();
        new Thread(phone::sendEmail, "A").start();
        TimeUnit.MILLISECONDS.sleep(100);
        new Thread(phone1::sendMSN, "B").start();
    }
}
点击查看 B-->发送短信
A-->发送邮件

3.5 案例五

一部手机,静态方法,先打印邮件还是短信?

package tech.kpretty.lock;

import java.util.concurrent.TimeUnit;

class Phone_ {
    public static synchronized void sendEmail() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "-->发送邮件");
    }

    public static synchronized void sendMSN() {
        System.out.println(Thread.currentThread().getName() + "-->发送短信");
    }
}

public class EightLockDemo {
    public static void main(String[] args) throws InterruptedException {
        Phone_ phone = new Phone_();
        //Phone_ phone1 = new Phone_();
        new Thread(Phone_::sendEmail, "A").start();
        TimeUnit.MILLISECONDS.sleep(100);
        new Thread(Phone_::sendMSN, "B").start();
    }
}
点击查看 A-->发送邮件
B-->发送短信

3.6 案例六

两部手机,静态方法,先打印邮件还是短信?

package tech.kpretty.lock;

import java.util.concurrent.TimeUnit;

class Phone_ {
    public static synchronized void sendEmail() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "-->发送邮件");
    }

    public static synchronized void sendMSN() {
        System.out.println(Thread.currentThread().getName() + "-->发送短信");
    }
}

public class EightLockDemo {
    public static void main(String[] args) throws InterruptedException {
        Phone_ phone = new Phone_();
        Phone_ phone1 = new Phone_();
        new Thread(() -> phone.sendEmail(), "A").start();
        TimeUnit.MILLISECONDS.sleep(100);
        new Thread(() -> phone1.sendMSN(), "B").start();
    }
}
点击查看 A-->发送邮件
B-->发送短信

3.7 案例七

一部手机,一个静态方法,一个普通方法,先打印邮件还是短信?

package tech.kpretty.lock;

import java.util.concurrent.TimeUnit;

class Phone_ {
    public static synchronized void sendEmail() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "-->发送邮件");
    }

    public synchronized void sendMSN() {
        System.out.println(Thread.currentThread().getName() + "-->发送短信");
    }
}

public class EightLockDemo {
    public static void main(String[] args) throws InterruptedException {
        Phone_ phone = new Phone_();
        //Phone_ phone1 = new Phone_();
        new Thread(() -> phone.sendEmail(), "A").start();
        TimeUnit.MILLISECONDS.sleep(100);
        new Thread(() -> phone.sendMSN(), "B").start();
    }
}
点击查看 A-->发送短信
B-->发送邮件

3.8 案例八

两部手机,一个静态方法,一个普通方法,先打印邮件还是短信?

package tech.kpretty.lock;

import java.util.concurrent.TimeUnit;

class Phone_ {
    public static synchronized void sendEmail() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "-->发送邮件");
    }

    public synchronized void sendMSN() {
        System.out.println(Thread.currentThread().getName() + "-->发送短信");
    }

    public void sendQQ() {
        System.out.println(Thread.currentThread().getName() + "-->发送QQ");
    }
}

public class EightLockDemo {
    public static void main(String[] args) throws InterruptedException {
        Phone_ phone = new Phone_();
        Phone_ phone1 = new Phone_();
        new Thread(() -> phone.sendEmail(), "A").start();
        TimeUnit.MILLISECONDS.sleep(100);
        new Thread(() -> phone1.sendMSN(), "B").start();
    }
}
点击查看 A-->发送短信
B-->发送邮件

3.9 分析

synchronized是实现同步的基础:Java中的每一个对象都可以作为锁,具体表现为以下3种形式。

  • 作用于实例方法,当前实例加锁,进入同步代码前要获得当前实例的锁;
  • 作用于静态方法,当前类加锁,进去同步代码前要获得当前类对象的锁;
  • 作用于代码块,对括号里配置的对象加锁。

案例1-2:

一个对象里面如果有多个 synchronized 方法,某一个时刻内,只要一个线程去调用其中的一个 synchronized 方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些 synchronized 方法,锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的 synchronized 方法。

案例3-4:

加个普通 QQ 方法后发现和同步锁无关

换成两个对象后,不是同一把锁了,情况立刻变化。

案例5-6:

都换成静态同步方法后,情况又变化

若是普通同步方法,锁是 this,具体的一部部手机, 所有的普通同步方法用的都是同一把锁——实例对象本身为对象锁。
若是静态同步方法,锁是 Class,唯一的一个模板为类锁。

案例7-8:

当一个线程试图访问同步代码时它首先必须得到锁,退出或抛出异常时必须释放锁。

所有的普通同步方法用的都是同一把锁——实例对象本身,就是new出来的具体实例对象本身
也就是说如果一个实例对象的普通同步方法获取锁后,该实例对象的其他普通同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的普通同步方法因为跟该实例对象的普通同步方法用的是不同的锁,所以不用等待该实例对象已获取锁的普通,同步方法释放锁就可以获取他们自己的锁。

所有的静态同步方法用的也是同一把锁——类对象本身,就是我们说过的唯一模板Class,具体实例对象this和唯一模板Class,这两把锁是两个不同的对象,所以静态同步方法与普通同步方法之间是不会有竞态条件的。但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁。

四、线程安全

面试题:举例说明集合类是不安全的

package tech.kpretty.safe;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class NotSafeDemo {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();

        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0, 6));
                System.out.println(list);
            }, String.valueOf(i)).start();
        }
    }
}

三个线程写,三个线程读,结果不可预见,数据一致性不可保证,但程序不会错

再来一个,三十个线程写,三十个线程读

package tech.kpretty.safe;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class NotSafeDemo {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();

        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0, 6));
                System.out.println(list);
            }, String.valueOf(i)).start();
        }
    }
}

结果如下,报:并发修改异常:

Exception in thread "23" java.util.ConcurrentModificationException
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
	at java.util.ArrayList$Itr.next(ArrayList.java:861)
	at java.util.AbstractCollection.toString(AbstractCollection.java:461)
	at java.lang.String.valueOf(String.java:2994)
	at java.io.PrintStream.println(PrintStream.java:821)
	at tech.kpretty.safe.NotSafeDemo.lambda$main$0(NotSafeDemo.java:14)
	at java.lang.Thread.run(Thread.java:748)

4.1 解决方案

4.1.1 Vector

java 的集合类告诉我们,ArrayList 是线程不安全的,但 Vector 是线程安全的

package tech.kpretty.safe;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.Vector;

public class NotSafeDemo {
    public static void main(String[] args) {
        //List<String> list = new ArrayList<>();
        List<String> list1 = new Vector<>();

        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                list1.add(UUID.randomUUID().toString().substring(0, 6));
                System.out.println(list1);
            }, String.valueOf(i)).start();
        }
    }
}

没有问题,完美解决并发修改的问题,其实现原理是对其读写方法添加 synchronized,因此效率不高,同时 Vector 在 jdk1.0 就出现了,一个老大哥的存在。

4.1.2 集合工具类

集合工具类提供了对线程不安全的集合的解决方案,在线程不安全的集合外添加一层线程安全的机制

package tech.kpretty.safe;

import java.util.*;

public class NotSafeDemo {
    public static void main(String[] args) {
        //List<String> list = new ArrayList<>();
        //List<String> list1 = new Vector<>();
        List<String> list2 = Collections.synchronizedList(new ArrayList<>());

        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                list2.add(UUID.randomUUID().toString().substring(0, 6));
                System.out.println(list2);
            }, String.valueOf(i)).start();
        }
    }
}

4.1.3 CopyOnWrite

上面两种方案都不是现在最常用的方案,为保证数据的一致性写操作一定是要加锁的,为保证效率读操作不能加锁,看 juc 如何解决;

juc 提供了一个新的类实现了线程安全的集合类

package tech.kpretty.safe;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

public class NotSafeDemo {
    public static void main(String[] args) {
        //List<String> list = new ArrayList<>();
        //List<String> list1 = new Vector<>();
        //List<String> list2 = Collections.synchronizedList(new ArrayList<>());
        List<String> list3 = new CopyOnWriteArrayList<>();

        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                list3.add(UUID.randomUUID().toString().substring(0, 6));
                System.out.println(list3);
            }, String.valueOf(i)).start();
        }
    }
}

需要明白:对同一个资源类只要读写操作同时进行就会产生并发修改的问题,只对写操作加锁和不加锁没什么区别,单线程写的时候,其他线程读依然会产生问题;读写都加锁就是 Vector 的做法,效率低;因此解决方案就是写操作加锁,读不加锁,但写操作操作的资源类和读操作操作的资源类不是一个不就行了吗!这就是 CopyOnWrite 即:写时复制技术。

源码分析:

public boolean add(E e) {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    Object[] elements = getArray();
    int len = elements.length;
    Object[] newElements = Arrays.copyOf(elements, len + 1);
    newElements[len] = e;
    setArray(newElements);
    return true;
  } finally {
    lock.unlock();
  }
}

可以看出执行写操作的时候,首先获取一把锁,接着获取当前的数组和数据长度,接着复制一份原数组同时对其进行扩容操作容量 +1,**接着把数据写到新的数组中,**最后替换原数组,所有操作完成释放锁。

CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后新的容器Object[] newElements里添加元素,添加完元素之后,再将原容器的引用指向新的容器 setArray(newElements);。这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

举个例子:小明参加一个会议,需要填写签到表

ArrayList 的做法:所有人随时可以拿过去签到,所有人随时可以拿过去查看,这就乱了。

Vector 的做法:签到、查看只能有一个人在进行,签到的时候只能一个人签,其他人不能签到不能查看,查看同理。

CopyOnWrite 的做法:签到表放墙上,小明需要签到的时候把墙上签到表复制一份在后面签到(此时别人不允许签到),过程中别人想查看签到表就看墙上的,小明完成签到后把手上的签到表粘到墙上覆盖原先的签到表。

4.1.4 读写锁

Lock 接口最后一个部分

在第四节:线程安全中谈到的案例,针对一个线程不安全的集合类,100个线程读,100个线程写,我们可以有下面的代码:

package tech.kpretty.lock;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class Demo {
    private final List<String> list = new ArrayList<>();


    public void read() {
        System.out.println(Thread.currentThread().getName() + "正在读");
        System.out.println(list);
        System.out.println(Thread.currentThread().getName() + "读完了");
    }

    public void write() {
        System.out.println(Thread.currentThread().getName() + "正在写");
        list.add(UUID.randomUUID().toString().substring(0, 5));
        System.out.println(Thread.currentThread().getName() + "写完了");
    }
}

public class ReentrantReadWriteLockDemo {
    public static void main(String[] args) {
        Demo demo = new Demo();
        for (int i = 0; i < 100; i++) {
            new Thread(demo::write, i + "").start();
        }

        for (int i = 0; i < 100; i++) {
            new Thread(demo::read, i + "").start();
        }
    }
}

结果显而易见,报并发修改异常,想要运成功行就必须要加锁,读写都需要加锁(读不加锁依然报并发修改异常),代码如下:

package tech.kpretty.lock;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class Demo {
    private final List<String> list = new ArrayList<>();


    public synchronized void read() {
        System.out.println(Thread.currentThread().getName() + "正在读");
        System.out.println(list);
        System.out.println(Thread.currentThread().getName() + "读完了");
    }

    public synchronized void write() {
        System.out.println(Thread.currentThread().getName() + "正在写");
        list.add(UUID.randomUUID().toString().substring(0, 5));
        System.out.println(Thread.currentThread().getName() + "写完了");
    }
}

public class ReentrantReadWriteLockDemo {
    public static void main(String[] args) {
        Demo demo = new Demo();
        for (int i = 0; i < 100; i++) {
            new Thread(demo::write, i + "").start();
        }

        for (int i = 0; i < 100; i++) {
            new Thread(demo::read, i + "").start();
        }
    }
}

其结果就是读也会被阻塞,这是我们不想要的,最好的模式是写加锁,读共享,这时候需要使用读写锁,代码如下:

package tech.kpretty.lock;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class Demo {
    private final List<String> list = new ArrayList<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();


    public void read() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "正在读");
            System.out.println(list);
            System.out.println(Thread.currentThread().getName() + "读完了");
        } finally {
            readLock.unlock();
        }
    }

    public void write() {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "正在写");
            list.add(UUID.randomUUID().toString().substring(0, 5));
            System.out.println(Thread.currentThread().getName() + "写完了");
        } finally {
            writeLock.unlock();
        }

    }
}

public class ReentrantReadWriteLockDemo {
    public static void main(String[] args) {
        Demo demo = new Demo();
        for (int i = 0; i < 100; i++) {
            new Thread(demo::write, i + "").start();
        }

        for (int i = 0; i < 100; i++) {
            new Thread(demo::read, i + "").start();
        }
    }
}

读时看似加锁,实则可以并行处理。

五、CAS

从上述线程安全的集合类中的实现原理可以看出,所有的写操作一定要加锁,比如下面的方法一定是线程不安全的

public void m1(){
  i++;
}

但是因为锁机制存在如下问题:

  1. 多线程锁的竞争下,会造成过多线程的上下文切换和调度,引发性能问题。
  2. 一个线程抢到锁,其他线程只能被挂起。
  3. 若一个线程优先级高的线程等待线程优先级低的线程会产生优先级倒置,影响业务。

现在的问题是能不能在不加锁的前提下实现数据的原子性;synchronized 本质是一个独占锁,更抽象来说是一个悲观锁,另一种更加高效的方式是乐观锁,而乐观锁的实现机制就是 CAS

5.1 什么是 CAS

CAS:Compare And Swap,中文翻译比较并交换,是实现并发算法时常用到的一种技术,涉及三个操作数 —— 内存位置、预期原值和更新值,当且仅当内存位置的值与预期原值相等时,才会将内存位置的值修改成更新值,否则什么都不做,Java 实现这一功能的类在 java.util.concurrent.atomic 包中,以 AtomicInteger 为例:

package tech.kpretty.atomic;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicDemo {
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(10);
        // CAS
        atomicInteger.compareAndSet(10,11);
        // 第一次比较交换
        System.out.println(atomicInteger.get());
        atomicInteger.compareAndSet(10,11);
        // 第二次比较交换
        System.out.println(atomicInteger.get());
    }
}

创建一个具有原子性的包装类整型 10(内存原值),compareAndSet(预期值,更新值),第一次比较交换,预期值与内存原值相同,内存原值更新为更新值11,第二次比较交换,预期值与内存原值不同什么都不做,因此两次都输出 11。

package tech.kpretty.atomic;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicDemo {
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(10);
        for (int i = 0; i < 10; i++) {
            System.out.println(atomicInteger.incrementAndGet());
        }
    }
}

incrementAndGet(),实现线程安全的具有原子性的 i++ 操作,i–,++i,–i 同理。

原理图解:

5.2 CAS 原理

java实现CAS原理:Unsafe类和自旋锁

查看 compareAndSet 源码

public final boolean compareAndSet(int expect, int update) {
  return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

public final native boolean compareAndSwapInt(Object o, long offset,
                                                  int expected,
                                                  int x);

查看 incrementAndGet 源码

public final int getAndIncrement() {
  return unsafe.getAndAddInt(this, valueOffset, 1);
}

public final int getAndAddInt(Object o, long offset, int delta) {
  int v;
  do {
    v = getIntVolatile(o, offset);
  } while (!compareAndSwapInt(o, offset, v, v + delta));
  return v;
}

可以看出核心的方法是一个来自 Unsafe 的本地方法 compareAndSwapInt

Unsafe类是什么?

Unsafe 类在 rt.jar 中的 sun.misc 包中,其内部所有方法都是本地方法,可以像 C 的指针一样直接操作内存;Unsafe 是 CAS 的核心类,由于 Java 方法无法直接访问底层系统,Unsafe 更像是一个后门。

compareAndSwapInt如何实现

compareAndSwapInt 实现原理就是 CAS,该方法的实现位于 unsafe.cpp 中

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

它是一条 CPU 的并发原语。原语的执行必须是连续的,在执行过程中不允许被中断,也就是说 CAS 是一条 CPU 的原子指令,不会产生所谓的数据不一致问题;若 compareAndSwapInt 不是原子指令会出现什么情况?比较完成后准备进行交换操作时被另一个线程抢到CPU更新了数据,等该线程苏醒继续交换时数据其实已经改变了。

什么是自旋锁?

所谓的自旋锁就是 getAndAddInt 中的 do…while 部分,分析一下这段源码

public final int getAndAddInt(Object o, long offset, int delta) {
  int v;
  do {
    v = getIntVolatile(o, offset);
  } while (!compareAndSwapInt(o, offset, v, v + delta));
  return v;
}

o:当前对象,offset:内存中的偏移量,delta 需要变动的数量

首先通过 o 和 offset 获取当前对象的内存值 v 作为预期值,然后再次获取内存值与预期值 v 进行比较,相同像内存值修改为预期值+变动数量即:v+delta,从 unsafe.cpp 中可以看出若值相同返回 true,取反则循环结束;也就是说 CAS 成功循环结束,CAS 失败继续进行 CAS 操作直到成功,同时 compareAndSwapInt 底层从硬件层面加锁,即形成了一个循环也称:自旋锁。

5.3 CAS 缺点

循环时间长开销大

因为自旋锁的存在,CAS 失败会一直进行尝试,若 CAS 长时间不成功对 CPU 是一个很大的开销

只能保证一个共享变量的原子操作

CAS 无法保证多个共享变量的原子操作,此时只能加锁。java 也实现了引用类型的原子操作,即:原子引用 AtomicReference<T>

@AllArgsConstructor
class User {
    int age;
    String name;

    public static void main(String[] args){
        User user = new User(20,"张三");
        AtomicReference<User> atomicReference = new AtomicReference<>();
        atomicReference.set(user);
    }
}

会产生 ABA 问题

所谓的 ABA 问题就是:线程A准备进行 CAS 操作,假设内存原值为 5,对于 A 来说预期值为 5,修改值为 8;在线程A计算的过程中,线程B通过CAS将数据修改成10,之后线程C通过CAS将数据又修改回5,此时线程A完成操作执行CAS,发现预期值与内存原值一个,将数据修改为8;也就是说虽然结果一样,但此时的内存值已经不是原先的内存值了,CAS只关心前后数据是否一致,无法兼顾过程中的数据变换。

解决方案:对数据增加版本或者时间戳,因此上述的 A-B-A 就变成了 A1-B2-A3。java 自身也实现了解决ABA问题的类 AtomicStampedReference<T>

package tech.kpretty.atomic;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

public class AtomicRef {
    public static void main(String[] args) {
        AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 1);

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);// 睡一秒,让t1线程拿到最初的版本号
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ref.compareAndSet("A", "B", ref.getStamp(), ref.getStamp() + 1);
            ref.compareAndSet("B", "A", ref.getStamp(), ref.getStamp() + 1);
        }, "t2").start();
        new Thread(() -> {
            int stamp = ref.getStamp();//拿到最开始的版本号
            try {
                TimeUnit.SECONDS.sleep(3);// 睡3秒,让t2线程的ABA操作执行完
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(ref.compareAndSet("A", "C", stamp, stamp + 1));
        }, "t1").start();
    }
}

六、Callable

面试题:线程创建有几种方式?

截止目前,创建线程的方式有两种,继承 Thread 重写 run 方法和实现 Runnable 接口;现在增加一个 Callable 接口

package java.util.concurrent;

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

下面是 Runnable 接口

package java.lang;

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

二者的区别如下:

  1. 实现的方法不同,一个是 run,一个是 call
  2. 是否有返回值
  3. 是否可以抛异常

Callable 作为新事物的出现,必然是为了解决 Runnable 无法解决的问题,从上面可以看出来,Runnable 创建的线程在运行过程中相当于静默,没有返回值,无法抛异常无法获取更多的线程信息,如:这个线程是否结束了?运行的状况怎么样等?

6.1 如何创建线程

因为 Java 不能多继承,因此通过集成 Thread 类的方式将杜绝类的拓展,一般创建线程都是通过如下方式

new Thread(Runnable runnable,String threadName);

显然 Callable 无法直接替换 Runnable,因为线程作为jdk的出生而存在,不可能考虑到jdk1.5 需要拓展 Callable 接口,因此 jdk1.5 对此做了一个适配,常见的适配方式为:构造注入,接口关联

FutureTask(Callable<V> callable);

UML类图如下:

FutureTask 携带 Callable,实现 Runnable 完美解决 Callable 的适配

package tech.kpretty.callable;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "hello callable";
            }
        });
        new Thread(futureTask).start();
        System.out.println(futureTask.get());
    }
}

如果仅仅是这样的,那也不能称之为 FutureTask(未来任务)

6.2 最佳实践

现在有一个任务,计算 1+2+3+(12345*678910)+4+5+6,假设中间乘法耗时 10s,抽象为:主线程需要执行比较耗时的操作时,但又不想阻塞主线程,可以另开一个线程把这些作业异步执行,当主线程将来需要时再来获取异步处理的结果,代码如下:

package tech.kpretty.callable;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                TimeUnit.SECONDS.sleep(5);
                return "hello callable";
            }
        });
        new Thread(futureTask).start();
        System.out.println(Thread.currentThread().getName() + "主线程执行中...");
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + "主线程执行完毕...");
        System.out.println(futureTask.get() + "--- result");
    }
}

其结果为主线程跳过耗时的操作,交由 futureTask 线程来异步处理,主线程处理完毕后调用 get() 获取异步处理结果,之后的操作如何是与主线程合并结果还是什么看业务逻辑。上述代码主线程需要5s,同时异步线程也需要5s,主线程结束后获取异步线程的结果,看到的现象就是主线程结束后立马输出异步线程结果,通常单线程执行需要 10s,效率翻倍。

再看下面的代码

package tech.kpretty.callable;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<String> callable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                return Thread.currentThread().getName() + "--hello callable";
            }
        };
        FutureTask<String> futureTask1 = new FutureTask<>(callable);
        new Thread(futureTask1, "A").start();
        new Thread(futureTask1, "B").start();
        new Thread(futureTask1, "C").start();
        System.out.println(Thread.currentThread().getName() + "主线程执行完毕...");
        System.out.println(futureTask1.get() + "--- result");
        System.out.println(futureTask1.get() + "--- result");
        System.out.println(futureTask1.get() + "--- result");
    }
}

执行需要多久?答案是:5s左右,也就是说 futureTask 会复用结果,输出的结果也只会是 A 线程的结果,BC线程复用 A 的结果

main主线程执行完毕...
A--hello callable--- result
A--hello callable--- result
A--hello callable--- result

再看下面代码

package tech.kpretty.callable;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<String> callable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                TimeUnit.SECONDS.sleep(5);
                return Thread.currentThread().getName() + "--hello callable";
            }
        };
        FutureTask<String> futureTask = new FutureTask<>(callable);
        new Thread(futureTask, "A").start();
        futureTask.get();
        System.out.println(Thread.currentThread().getName() + "主线程执行完毕...");
    }
}

结果如下:

A--hello callable
main主线程执行完毕...

现象是主线程被阻塞了,get() 被调用说明当前十分需要异步线程的处理结果,那么主线程就会被挂起,等到get 获取到结果或者抛出异常,因此 get() 没有特殊业务需求一般放到最后。

七、JUC 辅助类

7.1 CountDownLatch

应用场景:秦王扫六合,一统华夏

解释:必须消灭六国后才能一统华夏

抽象:主线程必须等到六个其他线程全部完成后才能继续走

代码实现

package tech.kpretty.other;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        // 初始化计数为 6
        CountDownLatch countDownLatch = new CountDownLatch(6);

        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.println("消灭了一个国家");
                // 必须放到线程最后,线程完成,计数 -1,计数为0 唤醒主线程
                countDownLatch.countDown();
            }).start();
        }
        // 当计数不等于0,主线程阻塞
        countDownLatch.await();
        System.out.println("秦一统天下...");
    }
}

原理:维护一个计数器,通过 await 阻塞一个线程,每当其他线程结束通过 countDown 使计数器 -1,当计数器为 0 时,唤醒阻塞的线程。

7.2 CyclicBarrier

应用场景:集齐七颗龙珠,就能召唤神龙

解释:想要召唤神龙,必须同时持有七颗龙珠

抽象:必须等到所有线程都到齐才能继续干活

代码实现

package tech.kpretty.other;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("召唤神龙");
        });

        for (int i = 0; i < 7; i++) {
            new Thread(() -> {
                System.out.println("集齐了一颗龙珠");
                try {
                    // 先到的线程阻塞在这里,等所有的线程都到才能继续往下走
                    cyclicBarrier.await();
                    System.out.println("神龙出现,冲呀!");
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

原理:让一组线程到达一个同步点后,执行另一个线程任务

7.3 Semaphore

应用场景:停车场争抢车位

解释:停车场车位有限,先到先得,走一个进一个,限流操作

代码实现

package tech.kpretty.other;

import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {
        // 定义 10 信号量
        Semaphore semaphore = new Semaphore(10);

        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " -- 抢到了车位");
                    // 随机停0-4s
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                    System.out.println(Thread.currentThread().getName() + " -- 离开了车位");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }, "" + i).start();
        }
    }
}

acquire:获取一个信号量,release:释放一个信号量,当线程通过 acquire 没有获取到信号量需要一直等下去,要么有其他线程释放信号量要么线程超时,semaphore 一方面控制多个资源类的互斥使用,另一方面控制并发线程数量。

小总结:

  1. CountDownLatch:对计数做减法,减到 0 做特定的事
  2. CyclicBarrier:对计数做加法,加到特定的量做特定的事
  3. Semaphore:即做加法,又做减法
  4. Semaphore 的特殊使用:new Semaphore(1) 等效于 synchronized,acquire 相当于 wait,release 相当于 notify

八、阻塞队列

在上面生产者消费者案例中,我们通过 wait 和 notify 来控制生产者消费者线程的活动,现在考虑商品能不能交给系统来维护,即:我们不关系线程的唤醒与等待,两个线程在开发者角度来看一个线程生产商品,一个线程消费商品,商品满的时候生产者自动阻塞,消费者自动唤醒;商品为空时消费者自动阻塞,生产者自动唤醒,而阻塞队列就能做到这。

8.1 架构

首先,阻塞队列是一个队列,满足先进先出的特征,之所以叫阻塞队列是因为可以定义一个特定长的队列,多个线程操作这个队列有读有写,但是这个队列可以做到:队列满,阻塞写线程;队列空,阻塞读线程。

组织架构(部分)

常用类汇总

解释
ArrayBlockingQueue 数组构成的有界阻塞队列
LinkedBlockingQueue 链表构成的有界阻塞队列
PriorityBlockingQueue 支持优先级排序的无界阻塞队列
DelayQueue 延迟阻塞队列
SynchronousQueue 不存储元素的阻塞队列,即单个元素的队列
LinkedTransferQueue 链表构成的无界阻塞队列
LinkedBlockingDeque 链表构成的双向阻塞队列

其中:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue 最为常用

8.2 API

阻塞队列的 api 分为很多种类,主要是针对队列满时写和队列空时读该做出什么样的回应,如:队列满时写时阻塞,队列空时读时报错等,下面方法来自父接口 BlockingQueue,因此所有落地的阻塞队列都可以调用。

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用
  1. 抛出异常
    1. 当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:Queue full
    2. 当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException
  2. 特殊值
    1. 插入方法,成功ture失败false
    2. 移除方法,成功返回出队列的元素,队列里没有就返回null
  3. 阻塞
    1. 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出
    2. 当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用
  4. 超时
    1. 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出
    2. 当阻塞队列空时,队列会阻塞消费者线程一定时间,超过限时后消费者线程会退出

九、线程池

9.1 池化技术

池化技术的主要作用就是减少大开销对象的创建,提高程序性能;特别是高并发情景下性能提高更加明显,使用池化技术缓存资源对象都有如下特点:

  1. 资源对象创建时间长,开销大
  2. 资源对象需要反复利用

一个标准的池化技术框架需要具备:租用资源对象、归还资源对象、清除过期资源对象;如:数据库连接池、JVM的常量池,以及下面所要谈的线程池等。

9.2 线程池

线程池做的工作就是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过最大数量,超出数量的线程排队等待,等待其他线程执行完毕,再从队列中去出任务来执行,主要特点为:线程复用、控制最大并发数、管理线程

  1. 降低资源消耗。通过重复利用已创建的线程降低线程的创建和销毁造成的资源消耗
  2. 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立刻执行
  3. 提高线程可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统稳定性,使用线程池统一分配、调优和监控

Java 的线程池是通过 Executor 框架实现的,架构如下:

Executors 作为线程池的工具类独立于该架构之外,通过该工具类快速创建 Java 给我们实现好的线程池实例

9.2.1 固定数量线程池

代码如下:

package tech.kpretty.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo {
    public static void main(String[] args) {
        // 固定数量的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        try {
            for (int i = 1; i <= 6; i++) {
                int finalI = i;
                fixedThreadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "处理任务:" + finalI);
                });
            }
        } finally {
            // 关闭线程池
            fixedThreadPool.shutdown();
        }
    }
}

处理逻辑:

若 线程池大小 >= 任务数:一个线程处理一个任务,存在线程空闲的现象

若 线程池大小 < 任务数:部分线程处理多个任务

9.2.2 单个线程的线程池

代码如下:

package tech.kpretty.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo {
    public static void main(String[] args) {
        // 固定数量的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        // 单个线程池
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            for (int i = 1; i <= 4; i++) {
                int finalI = i;
                newSingleThreadExecutor.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "处理任务:" + finalI);
                });
            }
        } finally {
            // 关闭线程池
            newSingleThreadExecutor.shutdown();
        }
    }
}

一个线程处理所有任务,等价于 Executors.newFixedThreadPool(1)

9.2.3 可扩容线程池

代码如下:

package tech.kpretty.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo {
    public static void main(String[] args) {
        // 固定数量的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        // 单个线程池
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        // 可扩容线程池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        try {
            for (int i = 1; i <= 100; i++) {
                int finalI = i;
                cachedThreadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "处理任务:" + finalI);
                });
            }
        } finally {
            // 关闭线程池
            cachedThreadPool.shutdown();
        }
    }
}

该线程池可以实现扩容,扩容上限为 Integer.MAX_VALUE,但并不是一个任务扩一个线程,其处理逻辑为:来一个任务首先查看有没有空闲的线程,若有则复用线程,若没有创建一个线程处理,因此上面代码 100 个任务实际运行线程池仅创建了 30 个左右的线程。

9.2.4 定时线程池

代码如下:

package tech.kpretty.pool;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Demo {
    public static void main(String[] args) {
        // 固定数量的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        // 单个线程池
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        // 可扩容线程池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        //
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
        try {
            for (int i = 1; i <= 10; i++) {
                int finalI = i;
                scheduledThreadPool.schedule(() -> {
                    System.out.println(Thread.currentThread().getName() + "处理任务:" + finalI);
                }, new Random().nextInt(5), TimeUnit.SECONDS);
            }
        } finally {
            // 关闭线程池
            scheduledThreadPool.shutdown();
        }
    }
}

支持定时、延迟、周期调用任务执行

每隔5s调用一次

package tech.kpretty.pool;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Demo {
    public static void main(String[] args) {
        // 固定数量的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        // 单个线程池
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        // 可扩容线程池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        //
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
        try {
            for (int i = 1; i <= 10; i++) {
                int finalI = i;
                scheduledThreadPool.scheduleAtFixedRate(() -> {
                    System.out.println(Thread.currentThread().getName() + "处理任务:" + finalI);
                }, new Random().nextInt(5), 10, TimeUnit.SECONDS);
            }
        } finally {
            // 关闭线程池
            //scheduledThreadPool.shutdown();
        }
    }
}

注:周期调用,一定不要关闭线程池,不然就没了

总结:

  1. newFixedThreadPool:执行长期稳定的任务
  2. newSingleThreadExecutor:指定多个短期异步任务
  3. newSingleThreadExecutor:一池一线程,一个任务一个任务执行
  4. newScheduledThreadPool:执行延迟、定时、周期任务

9.3 底层原理

上面那4个花里胡哨的线程池,那他们的底层到底什么?如何工作的呢?

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  return new ScheduledThreadPoolExecutor(corePoolSize);
}
// -->
public ScheduledThreadPoolExecutor(int corePoolSize) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
        new DelayedWorkQueue());
}
// -->
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
       Executors.defaultThreadFactory(), defaultHandler);
}

发现四个线程池都是基于 ThreadPoolExecutor 实现的,只是实例化时参数不同,因此想要了解线程池是如何工作的,就必须了解 ThreadPoolExecutor 7个参数的含义

corePoolSize

线程池中常驻的核心线程数,可以理解为线程池维护的最小线程数

maximumPoolSize

线程池中能够容纳同时执行的最大线程数,此值必须大于0,可以理解为线程池维护的最大线程数

keepAliveTime

空闲线程存活的时间,涉及缩容;如:当线程池线程数量大于 corePoolSize 时(发生了扩容),当线程空闲时间超过 keepAliveTime 时就会被销毁,直到线程数等于 corePoolSize

unit

keepAliveTime 的单位

workQueue

任务队列,存放被提交且尚未被执行的任务

ThreadFactory

创建线程的线程工厂,一般默认即可

RejectedExecutionHandler

拒绝策略,当线程池数量达到 maximumPoolSize,且 workQueue 满了,之后的任务会执行拒绝策略

线程池的工作原理如下:

  1. 创建线程池,开始等待请求
  2. 调用 execute 添加一个请求任务,如果正在运行的线程数小于 corePoolSize,马上创建线程去运行这个任务
  3. 在来一个任务,此时正在运行的线程数等于 corePoolSize,这个任务会被放到 workQueue 阻塞队列中,此时若有线程处理完会从阻塞队列中取任务
  4. 假设任务一直来,阻塞队列满了,这个时候会创建非核心线程来运行刚来的任务(新来的任务直接执行,并不是去执行等待很久的线程),此时若有线程处理完会从阻塞队列中取任务,若阻塞队列里的任务都被执行完了,线程池会根据设定的 keepAliveTime 来销毁线程直到线程数等于 corePoolSize
  5. 假设任务一直来,阻塞队列满了,同时正在运行的线程数等于 maximumPoolSize,线程池就会执行拒绝策略保护线程池,具体的拒绝策略见后面。

9.4 线程池的选择

上面说的 java 内置的四种线程池,那么我们实际工作中到底选择哪一个呢?

答案是一个都不选,看一段来自阿里巴巴java开发手册里的一段话

代码如下:

public LinkedBlockingQueue() {
  this(Integer.MAX_VALUE);
}

线程池的工作队列默认使用的阻塞队列长度是 Integer.MAX_VALUE,这就会产生一个严重的问题,根据上面线程池工作原理,只有当工作队列满的时候才会创建非核心线程去执行任务,该有多大的请求才会达到 Integer.MAX_VALUE 这个数量级;因此实际开发中必须使用 ThreadPoolExecutor 来创建,根据具体业务配置那 7 个参数

package tech.kpretty.pool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyPool {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                5,
                5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
    }
}

那么这7个参数该如何合理配置:标准答案 —— 压测,但是也有一个经验值来配置线程池大小:

  1. 如果是 CPU 密集型任务:最大线程 = 核数 + 1
  2. 如果是 I/O 密集型任务:最大线程 = 核数 * 2

9.5 拒绝策略

9.5.1 AbortPolicy

java 线程池默认选择的拒绝策略,直接抛出 RejectedExecutionException 异常,阻止系统正常运行(大坑呀)

public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

测试代码

package tech.kpretty.pool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyPool {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                5,
                5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        try {
            for (int i = 1; i <= 9; i++) {
                int finalI = i;
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "处理任务:" + finalI);
                });
            }
        } finally {
            threadPool.shutdown();
        }
    }
}

可以看出自定义线程池最大允许任务数(包括等待的任务)为 5+3 = 8,此时任务数 9,直接报错

9.5.2 CallerRunsPolicy

该策略既不会抛弃任务也不会抛异常,而是将任务退回给调用者,降低新任务的流量

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

测试代码

package tech.kpretty.pool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyPool {
    public static void main(String[] args) {

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                5,
                5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        try {
            for (int i = 1; i <= 10; i++) {
                int finalI = i;
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "处理任务:" + finalI);
                });
            }
        } finally {
            threadPool.shutdown();
        }
    }
}

结果如下,超出的任务被 main 线程执行

9.5.3 DiscardOldestPolicy

抛弃等待时间最长的任务,然后把当前任务加入到队列中

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                // 弹出队列最后一个元素
                e.getQueue().poll();
                // 尝试执行当前任务
                e.execute(r);
            }
        }
    }

测试代码

package tech.kpretty.pool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyPool {
    public static void main(String[] args) {

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                5,
                5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy()
        );
        try {
            for (int i = 1; i <= 100; i++) {
                int finalI = i;
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "处理任务:" + finalI);
                });
            }
        } finally {
            threadPool.shutdown();
        }
    }
}

9.5.4 DiscardPolicy

抛弃当前任务,不报错,就是什么都不做

public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

测试代码

package tech.kpretty.pool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyPool {
    public static void main(String[] args) {

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                5,
                5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy()
        );
        try {
            for (int i = 1; i <= 100; i++) {
                int finalI = i;
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "处理任务:" + finalI);
                });
            }
        } finally {
            threadPool.shutdown();
        }
    }
}

9.6 补充

验证当线程数达到 corePoolSize 且 workQueue 满时,在来的任务需要创建非核心线程来执行新来的任务,而非等待时间长的任务

代码如下:

package tech.kpretty.pool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyPool {
    public static void main(String[] args) {

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                5,
                5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        try {
            for (int i = 1; i <= 8; i++) {
                int finalI = i;
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "处理任务:" + finalI);
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        } finally {
            threadPool.shutdown();
        }
    }
}

可以看出不管怎么运行,3、4、5任务都是最后被执行的,即:1,2任务来开始执行;3,4,5 任务被放入 workQueue;6,7,8 任务来开始扩容;上面任务执行完去执行 3,4,5


线程池的 shutdown() 和 shutdownNow() 区别?

shutdown 会等待线程全部执行完后关闭线程池,shutdownNow 立刻关闭线程池返回未被执行的任务list


线程池提交任务的 execute() 和 submit() 区别?

execute() 没有返回值,submit() 可以有返回值

0

评论区