从多线程开始,进入并发专题学习。
1. 详细讲解多线程的发展历史
-
真空管和穿孔打卡
操作员在机房里面来回调度资源,以及计算机同一个时刻只能运行一个程序,在程序输入的过程中,计算机计算机和处理空闲状态 。而当时的计算机是非常昂贵的,人们为了减少这种资源的浪费。就采用了 批处理系统来解决。
-
晶体管和批处理系统
批处理操作系统的运行方式:在输入室收集全部的作业,然后用一台比较便宜的计算机把它们读取到磁带上。然后把磁带输入到计算机,计算机通过读取磁带的指令来进行运算,最后把结果输出磁带上。批处理操作系统的好处在于,计算机会一直处于运算状态,合理的利用了计算机资源。
批处理操作系统虽然能够解决计算机的空闲问题,但是当某一个作业因为等待磁盘或者其他 I/O 操作而暂停时,那CPU 就只能阻塞直到该 I/O 完成,对于 CPU 操作密集型的程序,I/O 操作相对较少,因此浪费的时间也很少。但是对于 I/O 操作较多的场景来说,CPU 的资源是属于严重浪费的。
-
集成电路和多道程序设计
多道程序设计就是把内存分为几个部分,每一个部分放不同的程序。当一个程序需要等待I/O 操作完成时。那么 CPU 可以切换执行内存中的另外一个程序。如果内存中可以同时存放足够多的程序,那 CPU的利用率可以接近 100%。
在这个时候,引入了第一个概念- 进程, 进程的本质是一个正在执行的程序,程序运行时系统会创建一个进程,并且给每个进程分配独立的内存地址空间保证每个进程地址不会相互干扰。同时,在 CPU 对进程做时间片的切换时,保证进程切换过程中仍然要从进程切换之前运行的位置出开始执行。所以进程通常还会包括程序计数器、堆栈指针。
进程的出现可以让操作系统从宏观层面实现多应用并发。而并发的实现是通过 CPU 时间片不端切换执行的。对于单核 CPU 来说,在任意一个时刻只会有一个进程在被CPU 调度。
-
为何出现线程
- 在多核 CPU 中,利用多线程可以实现真正意义上的并行执行;
- 在一个应用进程中,会存在多个同时执行的任务,如果其中一个任务被阻塞,将会引起不依赖该任务的任务也被阻塞。通过对不同任务创建不同的线程去处理,可以提升程序处理的实时性;
- 线程可以认为是轻量级的进程,所以线程的创建、销毁比进程更快.
2. 线程的基本应用
在 Java 中,有多种方式来实现多线程。继承 Thread 类、实现 Runnable 接口、使用 ExecutorService、Callable、Future 实现带返回结果的多线程。
-
继承 Thread 类
public class MyExtendsThread extends Thread { @Override public void run() { System.out.println("This is a thread with extending..."); } public static void main(String[] args) { new MyExtendsThread().start(); } }
-
实现 Runnable 接口
public class MyRunnableThread implements Runnable { @Override public void run() { System.out.println("This is a thread with Runnable ... "); } public static void main(String[] args) { new Thread(new MyRunnableThread()).start(); } }
-
实现 Callable 接口
public class MyCallableThread implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(5000); return "CallableThread`s response ..."; } public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); MyCallableThread thread = new MyCallableThread(); Future<String> future = executorService.submit(thread); System.out.println(future.get()); executorService.shutdown(); } }
2.1如何在实际过程中应用多线程
在工作中应该很少有场景能够应用多线程了,因为基于业务开发来说,很多使用异步的场景我们都通过分布式消息队列来做了。但并不是说多线程就不会被用到,你们如果有看一些框架的源码,会发现线程的使用无处不在之前我应用得比较多的场景是在做文件跑批,每天会有一些比如收益文件、对账文件,我们会有一个定时任务去拿到数据然后通过线程去处理.
Zookeeper 源码中看到一个比较有意思的异步责任链模式:
-
Request
public class Request { private String name; public Request(String name) { this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "Request{" + "name='" + name + '\'' + '}'; } }
-
RequestProcessor
public interface RequestProcessor { /** * 处理请求 * @param req */ void processRequest(Request req); }
-
PrintProcessor
public class PrintProcessor extends Thread implements RequestProcessor{ private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue(); private RequestProcessor processor; public PrintProcessor(RequestProcessor processor) { this.processor = processor; } @Override public void processRequest(Request req) { queue.add(req); } @Override public void run() { while (true) { try { Request request = queue.take(); Thread.sleep(100); System.out.println("print data :" + request.getName()); processor.processRequest(request); } catch (InterruptedException e) { e.printStackTrace(); } } } }
-
SaveProcessor
public class SaveProcessor extends Thread implements RequestProcessor{ private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue(); @Override public void processRequest(Request req) { queue.add(req); } @Override public void run() { while (true) { try { Request request = queue.take(); Thread.sleep(100); System.out.println("save data :" + request.getName() + "over ... "); } catch (InterruptedException e) { e.printStackTrace(); } } } }
-
Main
public class Main { public static void main(String[] args) throws InterruptedException { SaveProcessor saveProcessor = new SaveProcessor(); PrintProcessor printProcessor = new PrintProcessor(saveProcessor); saveProcessor.start(); printProcessor.start(); for (int i = 0; i < 100; i++) { Request request = new Request("request" + i); printProcessor.processRequest(request); } Thread.sleep(10000); } }
3. 线程的基础知识
- 线程的生命周期
线程一共有 6 种状态(NEW、RUNNABLE、BLOCKED、WAITING、TIME_WAITING、TERMINATED):
-
NEW:初始状态,线程被构建,但是还没有调用 start 方法
-
RUNNABLED:运行状态,JAVA 线程把操作系统中的就绪和运行两种状态统一称为“运行中”
- BLOCKED:阻塞状态,表示线程进入等待状态,也就是线程因为某种原因放弃了 CPU 使用权,阻塞也分为几种情况:
- 等待阻塞:运行的线程执行 wait 方法,jvm 会把当前线程放入到等待队列;
- 同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被其他线程锁占用了,那么 jvm 会把当前的线程放入到锁池中;
- 其他阻塞:运行的线程执行 Thread.sleep 或者 t.join 方法,或者发出了 I/O 请求时,JVM 会把当前线程设置为阻塞状态,当 sleep 结束、join 线程终止、io 处理完毕则线程恢复;
-
TIME_WAITING:超时等待状态,超时以后自动返回;
- TERMINATED:终止状态,表示当前线程执行完毕.
-
线程状态代码展示
public class BlockDemo extends Thread { @Override public void run() { while (true) { synchronized (BlockDemo.class) { try { Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " 执行完毕..."); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
public class ThreadStateTest { public static void main(String[] args) { new Thread(() -> { while (true) { try { System.out.println(Thread.currentThread().getName() + " is running"); Thread.sleep(2000); // 尝试唤醒 阻塞的线程 } catch (InterruptedException e) { e.printStackTrace(); } } },"time_waiting-thread").start(); // /WAITING,线程在 ThreadStatus 类锁上通过 wait 进行等待,并释放锁 new Thread(() -> { while (true) { synchronized (ThreadStateTest.class) { System.out.println(Thread.currentThread().getName() + " is waiting"); try { ThreadStateTest.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" is running"); } } },"waiting-thread").start(); new Thread(() -> { while (true) { synchronized (ThreadStateTest.class) { System.out.println(Thread.currentThread().getName() +"is notifying"); ThreadStateTest.class.notifyAll(); System.out.println(Thread.currentThread().getName() +"is sleeping"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } },"notify-thread").start(); // sleep 不会释放锁 new Thread(new BlockDemo(),"BlockThread-01").start(); new Thread(new BlockDemo(),"BlockThread-02").start(); } }
4. 线程的启动
学习线程的时候会比较疑惑,启动一个线程为什么是调用 start 方法,而不是 run 方法,这里做一个简单的分析,先简单看一下 start 方法的定义:
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0(); // 调用了底层的 native 方法
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
private native void start0(); // 原生方法
这里看到调用了底层的 start0() 方法,关于底层源码暂且不做分析,可以猜测内部调用最终会走到平台层面的创建线程方法,而最终去调用了 thread 中的 run() 方法;
5. 线程的终止
线程的终止,并不是简单的调用 stop 命令去。虽然 api 仍然可以调用,但是和其他的线程控制方法如 suspend、resume 一样都是过期了的不建议使用,就拿 stop 来说,stop 方法在结束一个线程时并不会保证线程的资源正常释放,因此会导致程序可能出现一些不确定的状态。要优雅的去中断一个线程,可以使用线程中提供的一个 interrupt 方法。
-
interrupt 方法
当其他线程通过调用当前线程的 interrupt 方法,表示向当前线程打个招呼,告诉他可以中断线程的执行了,至于什么时候中断,取决于当前线程自己。线程通过检查资深是否被中断来进行相应,可以isInterrupted() 来判断是否被中断。举例如下:
public class InterruptDemo { private static int count; public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(() -> { // 默认为 true while (!Thread.currentThread().isInterrupted()) { count++; System.out.println(count); } }, "interrupt-thread"); thread.start(); Thread.sleep(1000); thread.interrupt(); } }
这种通过标识位或者中断操作的方式能够使线程在终止时有机会去清理资源,而不是武断地将线程停止,因此这种终止线程的做法显得更加安全和优雅。
上述示例中,通过 interrupt,设置了一个标识告诉线程可以终止了,Thread.interrupted() 对设置中断标识的线程复位。
比如在下面的案例中,外面的线程调用 thread.interrupt 来设置中断标识,而在线程里面,又通过 Thread.interrupted 把线程的标识又进行了复位。
public class InterruptDemo2 { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(() -> { while (true) { if (Thread.currentThread().isInterrupted()) { System.out.println("Before status :" + Thread.currentThread().isInterrupted()); Thread.interrupted(); // 对线程进行复位 System.out.println("After status :" + Thread.currentThread().isInterrupted()); } } }, "interrupt-thread2"); thread.start(); TimeUnit.SECONDS.sleep(1); thread.interrupt(); } }
除了通过 Thread.interrupted 方法对线程中断标识进行复位 以 外 , 还 有 一 种 被 动 复 位 的 场 景 , 就 是 对 抛 出 InterruptedException 异 常 的 方 法 , 在 InterruptedException 抛出之前,JVM 会先把线程的中断标识位清除,然后才会抛出 InterruptedException,这个时候如果调用 isInterrupted 方法,将会返回 false。示例如下:
public class InterruptDemo3 { private static int count; public static void main(String[] args) throws InterruptedException { Thread thread01 = new Thread(() -> { while (!Thread.currentThread().isInterrupted()) { count++; } System.out.println("num1: " + count); }, "thread-1"); Thread thread02 = new Thread(() -> { while (!Thread.currentThread().isInterrupted()) { try { System.out.println("thread02 is running"); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("num2: " + count); }, "thread-2"); thread01.start(); thread02.start(); TimeUnit.SECONDS.sleep(1); thread01.interrupt(); thread02.interrupt(); System.out.println("thread01 = " + thread01.isInterrupted()); System.out.println("thread02 = " + thread02.isInterrupted()); } }
Thread.interrupted()是属于当前线程的,是当前线程对外界中断信号的一个响应,表示自己已经得到了中断信号,但不会立刻中断自己,具体什么时候中断由自己决定,让外界知道在自身中断前,他的中断状态仍然是 false,这就是复位的原因。
微信扫描二维码
(转载本站文章请注明作者和出处 鲁先生-hanlinsir)