多线程A. 并发的发展历史- 真空管/穿孔打卡
- 最初的计算机是为了解决计算问题的,如:正弦余弦计算
- 把程序写到一张纸上,穿孔打成一张卡片,然后传给计算机
- 晶体管/批处理操作系统
- 解决计算一在某个时刻只能运行一个程序
- 但是带来了 IO 问题
- 如果有 IO 阻塞会导致后面的程序都阻塞(串行模型)
- 集成电路/多道程序设计
- 进程的产生(一块内存可以加载多个程序)
- CPU 时间片在各个进程之间切换
- 并没有真正的并行,执行在 CPU 时间片上来回切换
B. 线程- 线程就是轻量级的进程
- CPU 从单核发展到了多核,真正意义上实现了并行计算
C. Java 中如何应用线程- Runable 接口
- Thread类(本质是对 Runnable 接口的实现)
- Callable/Future 带返回值的线程
- ThreadPool
[Java] 纯文本查看 复制代码 public class Thread implements Runnable {
@Override
public void run() {
if (target != null) {
target.run();
}
}
}
[Java] 纯文本查看 复制代码 public class CallableDemo {
public static void main(String[] args) throws Exception {
ExecutorService threadPools = Executors.newFixedThreadPool(10);
Future<String> future = threadPools.submit(new MyCallable());
System.out.println(future.get());
threadPools.shutdown();
}
static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(2);
return "HelloWorld!";
}
}
}
- 线程可以核里的利用多核 CPU 资源,提高程序的吞吐量
D. 实际应用- 线程池
- 文件跑批:收益文件,对账文件...
- BIO 模型优化
- socket.appept 连接阻塞
- socket.getInputStream 读阻塞
- socket.getOutputStream 写阻塞
- 然多线程处理读写阻塞
- 责任链代码
[Java] 纯文本查看 复制代码 @AllArgsConstructor
@Data
public class Request {
private String name;
}
[Java] 纯文本查看 复制代码 public interface RequestProcessor {
void processRequest(Request request);
}
@AllArgsConstructor
@SuppressWarnings("all")
public class PrevProcessor extends Thread implements RequestProcessor {
private final LinkedBlockingDeque<Request> requests = new LinkedBlockingDeque<>();
private final RequestProcessor nextProcessor;
@Override
public void processRequest(Request request) {
requests.add(request);
}
@Override
public void run() {
while (true) {
try {
Request request = requests.take();
System.out.println("PreDataProcess: " + request.getName());
if (nextProcessor != null) nextProcessor.processRequest(request);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@AllArgsConstructor
@SuppressWarnings("all")
public class PrintProcessor extends Thread implements RequestProcessor {
private final LinkedBlockingDeque<Request> requests = new LinkedBlockingDeque<>();
private final RequestProcessor nextProcessor;
@Override
public void processRequest(Request request) {
requests.add(request);
}
@Override
public void run() {
while (true) {
try {
Request request = requests.take();
System.out.println("PrintData: " + request.getName());
if (nextProcessor != null) nextProcessor.processRequest(request);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@AllArgsConstructor
@SuppressWarnings("all")
public class SaveProcessor extends Thread implements RequestProcessor {
private final LinkedBlockingDeque<Request> requests = new LinkedBlockingDeque<>();
private final RequestProcessor nextProcessor;
@Override
public void processRequest(Request request) {
requests.add(request);
}
@Override
public void run() {
while (true) {
try {
Request request = requests.take();
System.out.println("SaveData: " + request.getName());
if (nextProcessor != null) nextProcessor.processRequest(request);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
[Java] 纯文本查看 复制代码 public class MainTest {
private RequestProcessor requestProcessor;
"创建责任链"
public MainTest() {
SaveProcessor saveProcessor = new SaveProcessor(null);
saveProcessor.start();
PrintProcessor printProcessor = new PrintProcessor(saveProcessor);
printProcessor.start();
requestProcessor = new PrevProcessor(printProcessor);
((PrevProcessor) requestProcessor).start();
}
public static void main(String[] args) {
MainTest mainTest = new MainTest();
Request request = new Request("客户端数据包");
mainTest.requestProcessor.processRequest(request);
System.out.println("success!");
}
}
并发基础A. 生命周期- 线程一共有 6 种状态
- new(创建一个线程)
- runnable(运行状态)
- 就绪状态(ready)
- ready --> running 是 CPU 调度
- 运行中(running)
- running --> ready 可调用 yield 礼让出去
- terminated
- runnable --> terminated 是当 run 方法执行结束时切换
- BLOCKED 只有出现锁等待的时候才会出现 BLOCKED 状态
- 访问带有 synchronize 修饰的方法时
- 等待锁的时候返回 BLOCKED 状态
- WAITING/TIME_WAITING
- 这两个区别不大,只是 TIME_WAITING 带有超时时间
- 调用 wait/join/LockSupport.park
- 调用 notify/notifyAll/LockSupport.unpart
[Java] 纯文本查看 复制代码 public class ThreadStatusDemo {
public static void main(String[] args) {
"TIME_WAITING"
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Time_Waiting").start();
"WAITING,线程在 ThreadStatus 类锁上通过 wait 进行等待"
new Thread(() -> {
synchronized (ThreadStatusDemo.class) {
try {
ThreadStatusDemo.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "Waiting").start();
"线程在 ThreadStatus 加锁后,不会释放锁"
new Thread(new BlockThread(), "BlockDemo-01").start();
new Thread(new BlockThread(), "BlockDemo-01").start();
System.out.println(Thread.currentThread().getName());
}
static class BlockThread implements Runnable {
@Override
public void run() {
synchronized (BlockThread.class) {
try {
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
- 使用 jps 查看当前 ThreadStatusDemo 的进程号
- 再使用 jstack.exe 进程号
B. 线程的生命周期- 线程的启动为什么是 start
- 调用 native 方法调用操作系统的资源
- 然后回调 run 方法
- 线程的终止
- 不建议使用 stop 方法(不安全操作)
- stop 方法在结束一个线程时并不会保证线程的资源正常释放
- 因此回导致程序可能出现一些不确定的状态
- 要优雅的去终端一个线程,在线程中提供了一个 interrupt 方法
- 当其他线程(如主线程)调用当前线程的 interrupt 方法
- 表示向当前线程打招呼
- 告诉它可以终端线程的执行,至于什么时候中断,取决于当前线程自己
- 线程通过检查自身是否被中断来进行响应,可以通过 isInterrupted 方法来判断是否被终端
- thread.interrupt()
- 跟在 while 循环里通过 flag 结束循环思想是一致的
- InterruptDemo.java
- 线程的中断和线程的复位
[Java] 纯文本查看 复制代码 public class InterruptDemo {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread subThread = new Thread(() -> {
"默认情况下 isInterrupted 返回 false, 通过 thread.interrupt 变成了 true"
while (!Thread.currentThread().isInterrupted()) count++;
System.out.println("count: " + count);
}, "Sub_Thread");
subThread.start();
TimeUnit.SECONDS.sleep(2);
"阻断线程运行"
subThread.interrupt();
}
}
- 这种通过标识位或者中断操作的方式能够使线程在终止时有机会去清理资源
- 而不是武断地将线程停止
- 因此这种终止线程的做法显得更加安全和优雅
- 上面的案例代码,通过 interrupt,设置了一个表示告诉线程可以终止了
- 线程中还提供了静态方法 Thread.interrupted() 对设置终端表示的线程复位
- 比如在上面的案例中,外面的线程调用 thread.interrupt 来设置中断标识
- 而线程里面又可以通过 Thread.interrupted 把线程的标识又进行了复位
[Java] 纯文本查看 复制代码 public class InterruptDemo2 {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread subThread = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("before: " + Thread.currentThread().isInterrupted());
"对线程进行复位,由 true 变成 false"
Thread.interrupted();
System.out.println("after: " + Thread.currentThread().isInterrupted());
}
}
}, "Sub_Thread");
subThread.start();
TimeUnit.MILLISECONDS.sleep(1);
"阻断线程运行"
subThread.interrupt();
}
}
其他的线程复位 - 除了通过 Thread.interrupted 方法对线程中断标识进行复位以外,还有一种被动复位的场景
- 就是对抛出 InterruptedException 异常的方法
- 在 InterruptedException 抛出之前, JVM 会先把线程的中断标识位清除
- 然后才抛出 InterruptedException
- 这个时候如果调用 isInterrupted 方法,将会返回 false
[Java] 纯文本查看 复制代码 public class InterruptDemo3 {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread subThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) count++;
System.out.println("count: " + count);
}, "Interrupt_Thread");
subThread.start();
TimeUnit.MILLISECONDS.sleep(1);
subThread.interrupt();
System.out.println(subThread.isInterrupted());
// true
}
}
@SuppressWarnings("all")
public class InterruptDemo4 {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread subThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count);
}
}, "Interrupt_Thread");
subThread.start();
TimeUnit.MILLISECONDS.sleep(1);
subThread.interrupt();
System.out.println(subThread.isInterrupted());
// false
}
}
- Thread.interrupted() 是属于当前线程的,是当前现场对外界中断信号的一个响应,表示自己已经得到了中断信号
- 但是不会立即中断自己
- 具体什么时候中断由自己决定,让外界知道再自身中断前,他的状态仍然是 false,这就是复位的原因
- 在 C++ 代码中,通过调用线程 thread.interrupt() 设置 interrupted 状态标识为 true
- 并通过 ParkEvent 的 unpark 方法环形线程
- 对 synchronized 阻塞的线程,被唤醒以后会继续尝试获取锁,如果失败仍然可能被 park
- 在调用 ParkEvent 的 park 方法之前,会先判断线程的中断状态,如果为 true,则清楚当前线程的中断标识
- Object.wait, Thread.sleep, Thread.join 会抛出 InterruptedException
C. 补充- 为什么 Object.wait, Thread.sleep 和 Thread.join 都会抛出 InterruptedException?
- 这些方法都是阻塞的方法
- 而阻塞方法的释放会取决于一些外部的事件,但是阻塞方法可能因为等不到外部的触发事件而导致无法终止
- 当一个方法抛出 InterruptedException 时,它时在告诉调用者如果执行该方法的线程被中断
- 它会长室停止正在做的事情
- 并且通过抛出 InterruptedException 标识提前返回
- 所以这个异常的意思时标识一个阻塞被其他线程中断了
- 然后由于现场调用了 interrupt() 中断方法
- 那么 Object.wait, Thread.sleep 扽被阻塞的线程被环形以后会通过 is_interrupted 方法判断中断标识的状态变化
- 如果发现中断标识为 true,则先清楚中断标志,然后抛出 InterruptedException
- 需要注意的是,InterruptedException 异常的抛出并不意味着线程必须终止,而是提醒当前线程有中断的操作发生
- 至于接下来怎么处理取决于线程本身,比如
- 直接捕获异常不做任何处理
- 将一场抛出
- 停止当前线程,打印异常信息
|