侧边栏壁纸
博主头像
王一川博主等级

努力成为一个不会前端的全栈工程师

  • 累计撰写 70 篇文章
  • 累计创建 20 个标签
  • 累计收到 39 条评论

目 录CONTENT

文章目录

JUC高级

王一川
2021-10-20 / 2 评论 / 0 点赞 / 995 阅读 / 25,496 字
温馨提示:
本文最后更新于 2022-06-02,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。
  • 本部分的前置章节:juc初级,本章作为前置章节的进阶部分,需要有一定的并发编程基础或能够理解前置章节的内容。

一、juc 回顾

1.1 走马观花

首先膜拜大神,java 并发编程的祖师爷,感谢赏饭吃

总结 juc 初级的四句口诀

  1. 高内聚低耦合的前提下,使用封装思想;线程操作资源类,资源类对外暴露操作接口
  2. 判断、干活、通知
  3. 防止虚假唤醒,wait 方法要注意使用 while 判断
  4. 注意标记位,实现精准通信

多线程为什么这么重要:

  1. 硬件方面:摩尔定律的失效,从 2003 年开始 CPU 的主频已经不再翻倍,而是采用多核的方式;在主频不变核数增加的情况下,想要程序更快就有使用并发编程
  2. 软件方面:愈发复杂的业务、高并发的系统、异步+回调的生产需求

1.2 从start一个线程说起

提问:如何启动一个线程?

答案:调用 start() 方法

是的,java 启动一个线程就是调用它的 start 方法,该方法调用 JNI 的 start0() 方法

public synchronized void start() {
  if (threadStatus != 0)
    throw new IllegalThreadStateException();
  group.add(this);

  boolean started = false;
  try {
    start0();
    started = true;
  } finally {
    try {
      if (!started) {
        group.threadStartFailed(this);
      }
    } catch (Throwable ignore) {
      
    }
  }
}

private native void start0();

众所周知 java 源代码的源代码是 C++,若是 juc 的初级,只要说最终调用的是一个本地方法就结束了,但作为 juc 的高级,需要看一下底层源码。

openjdk源码网站:http://openjdk.java.net

建议下载到本地偷偷看,这里推荐一篇大神博客,这块非常详细:https://www.jianshu.com/p/3ce1b5e5a55e

1.3 线程分类

java的线程分为用户线程和守护线程,默认创建的线程为用户线程;线程的 daemon 属性为 true 表示是守护线程,为 false 表示是用户线程。

  • 用户线程:是系统的工作线程,它会完成这个程序需要完成的业务操作
  • 守护线程:是一种特殊的线程,在后台默默的完成一些系统性服务,如垃圾回收

因此 java 运行一个程序至少需要两个线程:一个 main 线程为用户线程,一个 gc 线程为守护线程

看下面的代码:

package tech.kpretty.advanced.basic;

import java.util.concurrent.TimeUnit;

public class BasicUse {
    public static void main(String[] args) throws InterruptedException {

        Thread thread = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "--" + Thread.currentThread().isDaemon());
            while (true) {
            }
        }, "A");

        thread.start();
        TimeUnit.SECONDS.sleep(3);
        System.out.println(Thread.currentThread().getName() + "--" + Thread.currentThread().isDaemon());
    }
}

可以看出线程 A 和 main 线程都是用户线程,且线程 A 死循环,因此该程序打印完两句话后不会停止。再看下面的程序

package tech.kpretty.advanced.basic;

import java.util.concurrent.TimeUnit;

public class BasicUse {
    public static void main(String[] args) throws InterruptedException {

        Thread thread = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "--" + Thread.currentThread().isDaemon());
            while (true) {
            }
        }, "A");
        thread.setDaemon(true);
        thread.start();
        TimeUnit.SECONDS.sleep(3);
        System.out.println(Thread.currentThread().getName() + "--" + Thread.currentThread().isDaemon());
    }
}

将线程 A 设置为守护线程,此时程序打印完 main 线程的信息后程序停止,说明:当用户线程全部执行完之后,守护线程无论是否结束系统都将自动退出,即守护线程无法单独存在

注:设置守护线程一定要在 start 之前进行

二、CompletableFuture

2.1 从 FutureTask 说起

作为另一种创建线程的方式,FutureTask 适配了 Thread 和 Callable,同时为 java 的异步调用提供了方便,见下面代码

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class FutureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<String>(() -> {
            TimeUnit.SECONDS.sleep(5);
            return "异步做完了";
        });

        new Thread(futureTask).start();
        System.out.println("主线程开始做");
        TimeUnit.SECONDS.sleep(5);
        System.out.println("主线程做完了");
        System.out.println(futureTask.get());
    }
}

主线程启动一个异步线程来处理其他任务,自己继续往下走,当主线程处理完后通过 get 获取异步线程的处理结果,两者属于并行处理;从代码逻辑看主线程耗时 5s,异步线程还是 5s,因此输出结果应该是主线程和异步线程几乎同时打印,理论上 10s 的任务现在只需要 5s,看起来一切安好。

但是我们知道 get 方法一定要最后执行,因为 get 会触发阻塞,调用了 get 方法意味着我现在急需异步线程的处理结果,因此程序阻塞在这里等到异步线程处理完成后才能继续后面的逻辑,见下面这段致命的代码

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class FutureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<String>(() -> {
            TimeUnit.SECONDS.sleep(5);
            return "异步做完了";
        });

        new Thread(futureTask).start();
        System.out.println("主线程开始做");
        // 这段代码是致命的
        System.out.println(futureTask.get());
        TimeUnit.SECONDS.sleep(5);
        System.out.println("主线程做完了");
    }
}

get 方法没有放在最后,那整个系统的高并发将在这里被破坏,这也是 FutureTask 的缺点;在高并发系统中我们需要克服阻塞,少加锁或者不加锁,那么克服阻塞的方式什么?

首先 FutureTask 为了克服 get 导致的阻塞,提供了一个 get 的重载方法,加入超时时间,允许等待一段时间,超过后抛出异常

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<String>(() -> {
            TimeUnit.SECONDS.sleep(5);
            return "异步做完了";
        });

        new Thread(futureTask).start();
        System.out.println("主线程开始做");
        // 这段代码是致命的
        //System.out.println(futureTask.get());
      	// 允许超时 2s,即阻塞 2s
        System.out.println(futureTask.get(2, TimeUnit.SECONDS));
        TimeUnit.SECONDS.sleep(5);
        System.out.println("主线程做完了");
    }
}

这样好吗?这样不好!还是会产生阻塞只不过相对减缓

克服阻塞的第二种方式乐观锁即CAS,我们知道在java的原子操作中为了不加锁引入了CAS并基于自旋锁的方式避免加锁;因此克服阻塞较好的方式是:轮训、自旋替代阻塞,见下面代码:

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<String>(() -> {
            TimeUnit.SECONDS.sleep(5);
            return "异步做完了";
        });

        new Thread(futureTask).start();
        System.out.println("主线程开始做");
        // 这段代码是致命的
        //System.out.println(futureTask.get());
        // 允许等待2s
        //System.out.println(futureTask.get(2, TimeUnit.SECONDS));
        // 自旋,轮序替代阻塞
        while (true) {
            if (futureTask.isDone()) {
                System.out.println(futureTask.get());
                break;
            } else {
                System.out.println("不要催!马上就好了");
            }
        }
        //TimeUnit.SECONDS.sleep(5);
        System.out.println("主线程做完了");
    }
}

这样好吗?看起来还不错,在异步任务没有结束的时候可以加入一些友好的提示信息,至少程序没有阻塞!!!但轮询的方式会消耗无谓的 CPU 资源,且不见得能及时得到计算结果,明面程序没有阻塞,但和阻塞没有太大的什么区别。

上述这些试图从体制内来弥补 FutureTask 的不足,但仅这么简单的需求弥补的方案都不尽人意,那么更复杂的需求呢?比如:

  1. FutureTask 任务完成后能不能主动的告诉我,即异步回调
  2. 多个异步任务相互依赖,我只要最终的结果
  3. 多个异步任务只要有一个结束就返回结果
  4. 多个异步任务全部完成后返回结果
  5. ...

这时候就要请出 FutureTask 的进化版本 CompletableFuture 了!!!

2.2 初步使用

CompletableFuture 提供了非常强大的 Future 的拓展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供转换和组合的方式。

获取 CompletableFuture 的方式通常有两种:runAsync 和 supplyAsync

supplyAsync(Supplier<U> supplier);
supplyAsync(Supplier<U> supplier,Executor executor);
runAsync(Runnable runnable);
runAsync(Runnable runnable,Executor executor);
  • 相同点:二者都有两个重载方法,可以额外传一个线程池,且另一个参数都是一个函数式接口
  • 不同点:Supplier 接口没有参数但有返回值,Runnable 没有参数没有返回值
  • 总结:通常使用 supplyAsync 更加灵活
package tech.kpretty.advanced.completablefuture;


import java.util.concurrent.CompletableFuture;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "---" + Thread.currentThread().isDaemon());
            return 1;
        });
        System.out.println(Thread.currentThread().getName());
    }
}

输出结果为:

ForkJoinPool.commonPool-worker-1---true
main

从结果可以看出:

  1. CompletableFuture 异步任务一定是基于线程池执行,
  2. 默认情况 CompletableFuture 起的异步任务是守护线程

针对守护线程,见下面代码

package tech.kpretty.advanced.completablefuture;


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
          // 休眠 1 s
            try { TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) { e.printStackTrace();}
            System.out.println(Thread.currentThread().getName() + "---" + Thread.currentThread().isDaemon());
            return 1;
        });
        System.out.println(Thread.currentThread().getName());
    }
}

这个时候异步任务会在主线程结束后立刻被停止,若使用自己创建的线程池则异步任务的线程是用户线程,见下面代码

package tech.kpretty.advanced.completablefuture;


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        CompletableFuture.supplyAsync(() -> {
            // 休眠 1 s
            try { TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) { e.printStackTrace();}
            System.out.println(Thread.currentThread().getName() + "---" + Thread.currentThread().isDaemon());
            return 1;
        },threadPool);
        System.out.println(Thread.currentThread().getName());

        threadPool.shutdown();
    }
}

同时由于 CompletableFuture 继承了 Future,它同样有 get 方法,同样会阻塞;但是它的 get 方法会抛出异常,为此额外提供了一个不会抛异常的 join 方法。

异步回调

异于 FutureTask 的一种方式,异步调用任务,任务结束自动回调无需等待,不阻塞主线程,同时异步任务过程中的异常也可以捕获,不会影响系统的健壮性

package tech.kpretty.advanced.completablefuture;


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newCachedThreadPool();

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(1);
            // 休眠 1 s
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "---" + Thread.currentThread().isDaemon());
            return 1;
        }, threadPool);

        // 异步回调
        completableFuture.whenComplete((f, e) -> {
            // 异常为空,执行正常逻辑
            if (e == null) {
                System.out.println(f);
            }
        }).exceptionally(throwable -> {
            // 异常捕获
            throwable.printStackTrace();
            return -1;
        });

        System.out.println(Thread.currentThread().getName());

        threadPool.shutdown();
    }
}

2.3 比价需求

经常会出现等待某条 SQL 执行完再继续执行下一条 SQL,而这两条 SQL 本身并无关系,可以并行执行,比如:我们需要比较同一个商品在各个平台上的价格,要求获取一个清单列表

  • 思路一:step by step,查完京东、查淘宝一个平台一个平台来
  • 思路二:多箭齐发,同时查询多个平台

代码如下:

package tech.kpretty.advanced.completablefuture;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@AllArgsConstructor
@NoArgsConstructor
class Platform {
    @Getter
    private String gName;
    @Getter
    private String pName;
    private double price;

    public double getPrice() {
        // 查询需要 1s
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return price;
    }
}

public class NetMall {
    public static void main(String[] args) {
        List<Platform> platforms = Arrays.asList(new Platform("京东", "Java并发编程", 11.1),
                new Platform("淘宝", "Java并发编程", 21.1),
                new Platform("当当", "Java并发编程", 31.1));

        t1(platforms);
        System.out.println("---------------------");
        t2(platforms);
    }

    // step by step
    private static void t1(List<Platform> platforms) {
        long start = System.currentTimeMillis();
        platforms
                .stream()
                .map(x -> x.getGName() + " 在 " + x.getPName() + " 卖 " + x.getPrice())
                .forEach(System.out::println);
        long end = System.currentTimeMillis();
        System.out.println("时间花销:" + (end - start));
    }

    // 多箭齐发
    private static void t2(List<Platform> platforms) {
        long start = System.currentTimeMillis();
        platforms
                .stream()
                .map(x -> CompletableFuture.supplyAsync(() -> x.getGName() + " 在 " + x.getPName() + " 卖 " + x.getPrice()))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .forEach(System.out::println);
        long end = System.currentTimeMillis();
        System.out.println("时间花销:" + (end - start));
    }

}

结果如下:

京东 在 Java并发编程 卖 11.1
淘宝 在 Java并发编程 卖 21.1
当当 在 Java并发编程 卖 31.1
时间花销:3045
---------------------
京东 在 Java并发编程 卖 11.1
淘宝 在 Java并发编程 卖 21.1
当当 在 Java并发编程 卖 31.1
时间花销:1014

有同学就问了:这不就是多线程并发吗?和异步有什么关系?

确实这就是多线程并发,但需要知道一点就是多线程是异步的一种实现思路,CompletableFuture 支持异步只是其中的一个特点,还有比如上面的函数式编程、异步回调、线程编排等等。

2.4 常用方法

2.4.1 获得结果和触发计算

T get():获取结果阻塞主线程,抛异常

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.*;

public class CompletableFutureMethod {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 实际生产用自定义线程池,不要用java提供的
        ExecutorService threadPool = Executors.newCachedThreadPool();

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) { e.printStackTrace();}
            return 1;
        }, threadPool);
        // 获取结果,阻塞主线程
        System.out.println(completableFuture.get());

        threadPool.shutdown();
    }
}

T get(long timeout, TimeUnit unit):同 get,允许等待一段时间,等待中完成计算返回结果,等待结束没有计算完抛出异常

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.*;

public class CompletableFutureMethod {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        // 实际生产用自定义线程池,不要用java提供的
        ExecutorService threadPool = Executors.newCachedThreadPool();

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) { e.printStackTrace();}
            return 1;
        }, threadPool);
        // 同 get,允许等待一段时间,等待中完成计算返回结果,等待结束没有计算完抛出异常
        System.out.println(completableFuture.get(2,TimeUnit.SECONDS));

        threadPool.shutdown();
    }
}

T getNow(T valueIfAbsent):立刻获取结果,计算完返回计算结果,没有计算完返回默认结果,异步线程继续执行(区别 compelet)

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.*;

public class CompletableFutureMethod {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        // 实际生产用自定义线程池,不要用java提供的
        ExecutorService threadPool = Executors.newCachedThreadPool();

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) { e.printStackTrace();}
            return 1;
        }, threadPool);
        // 立刻获取结果,计算完返回计算结果,没有计算完返回默认结果
        System.out.println(completableFuture.getNow(-1));

        threadPool.shutdown();
    }
}

boolean complete(T value):立刻获取结果,计算完返回计算结果,没有计算完返回默认结果,异步线程中断(区别 getNow)

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.*;

public class CompletableFutureMethod {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        // 实际生产用自定义线程池,不要用java提供的
        ExecutorService threadPool = Executors.newCachedThreadPool();

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) { e.printStackTrace();}
            System.out.println("---");
            return 1;
        }, threadPool);
        // 立刻获取结果,计算完返回计算结果,没有计算完返回默认结果
        System.out.println(completableFuture.complete(-1)? -1 : completableFuture.get());

        threadPool.shutdown();
    }
}

2.4.2 对计算结果进行处理

thenApply:依赖上一步结果,对上一步结果进行二次处理(串行)

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.*;

public class CompletableFutureMethod {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        // 实际生产用自定义线程池,不要用java提供的
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        CompletableFuture<Integer> completableFuture =
                CompletableFuture.supplyAsync(() -> {
                    try { TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) { e.printStackTrace();}
                            System.out.println(Thread.currentThread().getName());
                            return 1;
                        }, threadPool)
                        .thenApply(x -> {
                            System.out.println(Thread.currentThread().getName());
                            return x + 1;
                        })
                        .thenApply(x -> {
                            System.out.println(Thread.currentThread().getName());
                            return x + 2;
                        })
                        .whenComplete((x, e) -> {
                            if (e == null) {
                                System.out.println(Thread.currentThread().getName());
                                System.out.println(x);
                            }
                        })
                        .exceptionally(e -> {
                            System.out.println(e.getMessage());
                            return -1;
                        });
        threadPool.shutdown();
    }
}

几点说明:

  1. thenApply 若出现异常,后续不在执行,直接走 exceptionally,类似 try...finally
  2. thenApply 线程执行问题,若 supplyAsync 执行的很快,则 thenApply 会被 main 线程执行,否则被线程池里的线程执行(自带的ForkJoin或自定义)

handle:类似 thenApply,出现异常不会终止,会带着异常继续执行

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.*;

public class CompletableFutureMethod {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        // 实际生产用自定义线程池,不要用java提供的
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        CompletableFuture<Integer> completableFuture =
                CompletableFuture.supplyAsync(() -> {
                    try { TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) { e.printStackTrace();}
                            System.out.println(Thread.currentThread().getName());
                            return 1;
                        }, threadPool)
                        .handle((x,e)->{
                            System.out.println(Thread.currentThread().getName());
                            int i = 10/0;
                            return x+1;
                        })
                        .handle((x,e)->{
                            System.out.println(Thread.currentThread().getName());
                            return x+2;
                        })
                        .whenComplete((x, e) -> {
                            if (e == null) {
                                System.out.println(Thread.currentThread().getName());
                                System.out.println(x);
                            }
                        })
                        .exceptionally(e -> {
                            System.out.println(e.getMessage());
                            return -1;
                        });
        threadPool.shutdown();
    }
}

2.4.3 对计算结果进行消费

thenAccept:依赖上一步结果,与 thenApply 区别在于没有返回值

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.*;

public class CompletableFutureMethod {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        // 实际生产用自定义线程池,不要用java提供的
        ExecutorService threadPool = Executors.newFixedThreadPool(10);


        CompletableFuture.supplyAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName());
                    return 1;
                }, threadPool)
                .thenAccept(System.out::println);

        threadPool.shutdown();
    }
}

thenRun:不依赖上一步结果,上一步执行结束后执行当前线程

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.*;

public class CompletableFutureMethod {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        // 实际生产用自定义线程池,不要用java提供的
        ExecutorService threadPool = Executors.newFixedThreadPool(10);


        CompletableFuture.supplyAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName());
                    return 1;
                }, threadPool)
                // 什么都不输出
                .thenRun(System.out::println);

        threadPool.shutdown();
    }
}

2.4.4 对计算速度进行选优

applyToEither:比较两个 CompletableFuture,谁快选谁

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.*;

public class CompletableFutureMethod {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        // 实际生产用自定义线程池,不要用java提供的
        ExecutorService threadPool = Executors.newFixedThreadPool(10);


        System.out.println(CompletableFuture.supplyAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //System.out.println(Thread.currentThread().getName());
                    return 1;
                }, threadPool)
                .applyToEither(CompletableFuture.supplyAsync(() -> {
                    //System.out.println(Thread.currentThread().getName());
                    return 2;
                }), x -> {
                    // 返回 2,第二个 CompletableFuture 更快
                    return x;
                }).join());

        threadPool.shutdown();
    }
}

2.4.5 对计算结果进行合并

thenCombine:将两个 CompletableFuture 的结果进行合并,先算完的需要等待;当且仅当两个 CompletableFuture 都算法才会进行合并操作

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.*;

public class CompletableFutureMethod {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        // 实际生产用自定义线程池,不要用java提供的
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        CompletableFuture<Integer> thenCombine = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return 20;
        }), (x, y) -> {
            System.out.println(Thread.currentThread().getName());
            return x + y;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return 30;
        }), (a, b) -> {
            System.out.println(Thread.currentThread().getName());
            return a + b;
        });

        System.out.println("main结束");
        System.out.println(thenCombine.join());

        threadPool.shutdown();
    }
}

注:两两合并可无限套娃

2.4.6 补充

在练习上述的 API 时发现所有的方法都有一个 xxxAsync 的方法,那个 xxx 与 xxxAsync 有什么区别?就以 thenApply 为例

package tech.kpretty.advanced.completablefuture;

import java.util.concurrent.*;

public class CompletableFutureMethod {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        // 实际生产用自定义线程池,不要用java提供的
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        CompletableFuture<Integer> completableFuture =
                CompletableFuture.supplyAsync(() -> {
                            try {
                                TimeUnit.SECONDS.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println(Thread.currentThread().getName());
                            return 1;
                        }, threadPool)
                        .thenApply(x -> {
                            System.out.println(Thread.currentThread().getName());
                            return x + 1;
                        })
                        .thenApplyAsync(x -> {
                            System.out.println(Thread.currentThread().getName());
                            return x + 2;
                        })
                        .whenComplete((x, e) -> {
                            if (e == null) {
                                System.out.println(Thread.currentThread().getName());
                                System.out.println(x);
                            }
                        })
                        .exceptionally(e -> {
                            System.out.println(e.getMessage());
                            return -1;
                        });
      threadPool.shutdown();
    }
}

区别:

  1. xxx 方法:执行当前任务的线程继续执行 xxx 的任务
  2. xxxAsync 方法:把 xxxAsync 的任务交给线程池来执行

三、线程中断

面试题:聊一聊线程中断!如何停止、终端一个运行中的线程

3.1 什么是中断

java 的设计哲学中,一个线程不应该由其他线程来强制中断或者停止,而是应该由线程自己来停止,即:我命由我不由天。因此 Thread 类中一些 stop、resume 皆已弃用。

在 java 中没有办法立刻停止一个线程,然而停止线程却显得尤为重要,如何取消一个耗时的操作呢?为此 java 提供了一种线程中断协商机制,由其他线程发起中断请求,至于该线程是否中断、如何中断,java 没有给出任何语法,中断的过程完全需要程序员自己实现。

每一个线程都有一个标识,用于标识线程是否被中断,该标识为 true 表示中断,false 表示未中断,通过调用线程对象的 interrupt 方法,将该线程对象的标识为设置为 true,但这个方法不会中断线程,仅仅修改了中断标识仅此而已,若要实现中断需求则需要在代码中不断的检测当前线程的标识位,若检测到标识位为 true 此时究竟做什么需要靠自己的代码实现(可以不管这个标识,我不中断,属于协商,可以不采纳)

3.2 中断 API

java 为中断提供了三个 API 如下

APIDesc
public void interrupt()实例方法,仅仅设置线程的中断标记
public static boolean interrupted()静态方法,判断线程是否被中断,并清楚当前中断状态,即:返回当前中断标记,随后将中断标记设置为 false
public boolean isInterrupted()实例方法,返回线程的中断标记

3.3 实现线程中断

3.3.1 方式一:volatile

package tech.kpretty.advanced.interrupt;

import java.util.concurrent.TimeUnit;

public class M1Volatile {
    private static volatile boolean isStop = false;

    public static void main(String[] args) {
        new Thread(() -> {
            while (true) {
                if (isStop) {
                    System.out.println(Thread.currentThread().getName() + ":线程退出");
                    break;
                }
                System.out.println("--- hello ---");
            }
        }).start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        isStop = !isStop;
    }
}

可以实现线程中断

3.3.2 方式二:AtomicBoolean

package tech.kpretty.advanced.interrupt;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class M2AtomicBoolean {
    private static final AtomicBoolean isStop = new AtomicBoolean(false);

    public static void main(String[] args) {
        new Thread(() -> {
            while (true) {
                if (isStop.get()) {
                    System.out.println(Thread.currentThread().getName() + ":线程退出");
                    break;
                }
                System.out.println("--- hello ---");
            }
        }).start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        isStop.set(true);
    }
}

可以实现线程中断

3.3.3 方式三:API

package tech.kpretty.advanced.interrupt;

import java.util.concurrent.TimeUnit;

public class M3API {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            while (true) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println(Thread.currentThread().getName() + ":线程退出");
                    break;
                }
                System.out.println("--- hello ---");
            }
        });
        thread.start();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 修改中断标记
        thread.interrupt();
    }
}

interrupt,isInterrupt 最终都是调用的本地方法,由操作系统提供支持。

3.4 中断隐藏坑

调用 interrupt 会立刻中断线程吗?

答案:不会!interrupt 仅仅修改了线程的中断标记,甚至线程没有实现中断的功能,该标记就是一个摆设,对线程不会有任何影响;但在一些特殊情况 interrupt 会对线程产生非常严重的影响

见下面代码

package tech.kpretty.advanced.interrupt;

import java.util.concurrent.TimeUnit;

public class InterruptDemo {
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            for (int i = 1; i <= 300; i++) {
                System.out.println("---- i: " + i + " ----");
            }
            System.out.println("2.线程是否中断: " + Thread.currentThread().isInterrupted());
        });

        t1.start();

        System.out.println("1.线程是否中断: " + t1.isInterrupted());
        // 让线程先打印一小会
        try { TimeUnit.MILLISECONDS.sleep(1);} catch (InterruptedException e) { e.printStackTrace();}
        // 中断线程
        t1.interrupt();
        System.out.println("3.线程是否中断: "+t1.isInterrupted());
        // 等待一段时间,待线程打印完
        try { TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) { e.printStackTrace();}
        System.out.println("4.线程是否中断: "+t1.isInterrupted());
    }
}

注意打印结果,线程在打印到 218 时中断标记已经是 true,但丝毫不影响线程继续执行,因此 interrupt 不会立刻打断线程,甚至不会打断(具体看定义的逻辑)

interrupt 大坑复现

见下面代码

package tech.kpretty.advanced.interrupt;

import java.util.concurrent.TimeUnit;

public class InterruptDemo {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println(Thread.currentThread().getName() + ":线程退出");
                    break;
                }
                // 1s 打印一次
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("--- hello ---");
            }
        });

        thread.start();

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 5s 后尝试打断线程
        thread.interrupt();
    }
}

最终结果如下:

--- hello ---
--- hello ---
--- hello ---
--- hello ---
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at tech.kpretty.advanced.interrupt.InterruptDemo.lambda$main$0(InterruptDemo.java:15)
	at java.lang.Thread.run(Thread.java:748)
--- hello ---
--- hello ---
--- hello ---
...

程序抛出异常后,并没有打断线程,那可是一个灾难级的bug,该停不停,那原因是什么呢?

看一下 interrupt 源码的注释:

If this thread is blocked in an invocation of the wait(), wait(long), or wait(long, int) methods of the Object class, or of the join(), join(long), join(long, int), sleep(long), or sleep(long, int), methods of this class, then its interrupt status will be cleared and it will receive an InterruptedException.

意思就是若线程处于等待、join、睡眠状态时,此时 interrupt 会抛出 InterruptedException 并清空中断标记,即重置中断标记位 false,同时又因为 sleep 方法抛出这个异常,我们在异常捕获时仅打印堆栈信息,中断标记依然是 false 就无限循环了。解决方式很简单:在异常捕获中再次调用 interrupt 即可(很多框架源码都是这么做的)

正确的代码如下:

package tech.kpretty.advanced.interrupt;

import java.util.concurrent.TimeUnit;

public class InterruptDemo {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println(Thread.currentThread().getName() + ":线程退出");
                    break;
                }
                // 1s 打印一次
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // 再次中断
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                System.out.println("--- hello ---");
            }
        });

        thread.start();

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 5s 后尝试打断线程
        thread.interrupt();
    }
}

3.5 补充

静态方法 interrupted :返回线程的中断标记,随后将中断标记重置为 false

猜一猜下面代码输出什么:

package tech.kpretty.advanced.interrupt;


public class InterruptDemo {
    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName() + "---" + Thread.interrupted());
        System.out.println(Thread.currentThread().getName() + "---" + Thread.interrupted());
        System.out.println("111111");
        Thread.currentThread().interrupt();
        System.out.println("222222");
        System.out.println(Thread.currentThread().getName() + "---" + Thread.interrupted());
        System.out.println(Thread.currentThread().getName() + "---" + Thread.interrupted());
    }
}
0

评论区