0%

concurrency

Thread

Basic

  • 线程(Thread)是执行任务的基本单位
  • 进程(Process)是分配资源的基本单位

Create

  • 继承Thread,并实现run方法,然后调用start启动
  • 实现Runnable,并作为target传递给线程进行启动
  • 实现Callable,并用FutureTask包装后作为target传递给线程进行启动
  • 使用线程池的execute方法执行Runnable
  • 使用线程池的submit方法提交Runnable或者Callable
  • 使用CompletableFuture类

Config

Name

Priority

Daemon

State

NEW、READY、RUNNING、WAITING、TIMED_WAITING、BLOCKED、TERMINATED

  • NEW(新建):创建线程后进入此状态
  • READY(就绪):创建后调用Thread.start或者运行时调用Thread.yield进入此状态
  • RUNNING(运行):获取资源(cpu和lock)成功后进入此状态
  • WAITING(等待):调用Thread.join、Object.wait、LockSupport.park的非超时方法后进入此状态
  • TIMED_WAITING(超时等待):调用Thread.sleep、Thread.join、Object.wait、LockSupport.park的超时方法后进入此状态
  • BLOCKED(阻塞):获取lock失败后进入此状态
  • TERMINATED(死亡):结束线程后进入此状态

Resource

  • cpu:中央处理器资源
  • lock:锁资源

Control

  • Thread.yield:让出cpu资源并进入就绪状态不释放锁资源
  • Thread.sleep:让出cpu资源并进入等待状态不释放锁资源
  • Thread.join:让出cpu资源并进入等待状态会释放锁资源

ps:join的作用是阻塞当前线程c(调用t.join的那个线程)并等待线程t结束后继续运行
ps:join内部是基于wait(当前线程c调用线程t的wait方法后在t上wait)实现的,JVM会在线程t结束后自动notify在t上wait的线程

Block

  • Thread:sleep + 轮询
  • Object:wait + notify
  • Condition:await + signal
  • LockSupport:park + unpark
  • BlockingQueue:take + put

Interrupt

ps: interrupt 可以中断 sleepjoinwait 等那些判断了 isInterrupted 的操作

Terminate

如果线程在循环体中阻塞了(比如阻塞队列),是无法继续执行到volatile修饰的标记位置的,但是阻塞的线程是可以响应interrupt信号并抛出InterruptedException的

Deprecated

suspend/resume、stop、destroy

  • suspend:不会释放锁,会导致死锁问题
  • stop:会立即终止线程,导致操作被中断,会导致数据不一致和资源未释放等问题
  • destroy:从来都没有被实现过,且已经被废弃了(相当于suspend且没有后续的resume)

Question

线程和进程的区别

  • 线程是任务调度和指令执行的最小单元,进程是资源分配的最小单元
  • 线程开销小,进程开销大,线程又被称作轻量级进程
  • 进程可以包含多个线程,至少包含一个线程
  • 同一个进程的线程共享资源,不同进程之间的资源相互独立
  • 线程异常如果不处理会导致线程所属的进程挂掉,进程挂掉则不会影响其他进程

Runnable和Callable的区别

  • Runnable不能获取返回值
  • Callable和Future、FutureTask配合可以获取返回值

Thread的start和run的区别

  • start是本地方法,会启动线程
  • run是普通方法,不会启动线程

start调用了start0,start0是本地方法

Thread的start能调用多次吗

不能,再次调用时会检测到状态变化后抛出 IllegalThreadStateException 异常

Thread的sleep和yield的区别

  • 方法:都是Thread的静态方法
  • 锁:都不释放锁
  • 状态:sleep进入到等待状态,yield进入到就绪状态

Thread的sleep和join的区别

  • 方法:sleep是Thread的静态方法,join是Thread的实例方法
  • 锁:sleep不释放锁,join释放锁
  • 状态:都是进入到等待状态

Thread的join和wait的区别

  • 方法:join是Thread的实例方法,wait是Object的实例方法
  • 锁:都会释放锁
  • 状态:都是进入到等待状态

ps:join是通过wait来实现的,所以和wait没区别

ThreadLocal

Basic

Scene

Leak

Inherit

Question

ThreadLocal为什么会内存泄漏

  1. ThreadLocal是基于ThreadLocalMap实现的
  2. Thread引用了ThreadLocalMap
  3. ThreadLocalMap引用了key和value
  4. key弱引用了ThreadLocal
  5. 当ThreadLocal不再使用时,即没有强引用时
  6. 由于key是弱引用,所以ThreadLocal会被回收
  7. 此时无法通过ThreadLocal访问value,value应该被回收
  8. 由于Thread通过ThreadLocalMap间接强引用了value,所以要先回收Thread
  9. 但是如果Thread是线程池时,Thread不能被回收,所以value不能回收
  10. 此时value既不能访问又不能回收,就造成了内存泄漏

ps:ThreadLocal在get和set时会自动检测哪些key指向null的entry并清除,可以一定程度减轻内存泄漏的影响

ThreadLocalMap的key为什么是弱引用

  • 如果ThreadLocalMap的key为强引用,此时Thread就会通过ThreadLocalMap间接强引用了key
  • 如果Thread不回收,比如是线程池时,即使ThreadLocal不再使用了
  • 由于ThreadLocal还被Thread引用着,所以ThreadLocal会无法回收而导致内存泄漏

ThreadLocalMap的value为什么是强引用

如果value是弱引用,垃圾回收后value指向了null,此时ThreadLocal还活着却获取不到value对象就不符合逻辑

ThreadLocalMap为什么不用Thread做key

  • 如果用Thread做key(且ThreadLocalMap是ThreadLocal的实例),就会有多个线程访问map,就需要保证线程安全,复杂性会提高,并且并发性也会降低
  • 如果用ThreadLocal做key(且ThreadLocalMap是Thread的实例),那么访问map的线程都是持有map的那一个线程,就不需要保证线程安全,复杂性会降低,并且并发性也会提高
  • 如果是用二级map,那就会和ThreadLocalMap做key一样,就会有多个线程访问map,就需要保证线程安全,还需要两次寻址,复杂性会更高,并且并发性也会更低

ThreadLocal为什么要定义成静态变量

定义成实例变量,使用时会频繁创建threadLocal,导致垃圾回收频繁
定义成实例变量,使用时会重复创建value,导致内存浪费
定义成实例变量,threadLocal回收后,导致value内存泄漏

ThreadLocal和局部变量的区别

ThreadLocal是线程(隔离)变量,局部变量是方法(隔离)变量

ps:局部变量不逃逸时本质上还是线程(隔离)变量,因为局部变量属于方法私有,方法属于线程私有

Safe

Basic

  • 不可变:不可变对象一定是安全的
  • 绝对安全:对象的单个操作和复合操作都是安全的
  • 相对安全:对象的单个操作是安全的,复合操作需要额外的同步措施来保证安全
  • 兼容安全:对象的所有操作都不安全,但可以通过额外的同步措施来保证安全
  • 安全对立:即使采取了额外的同步措施也无法保证安全

线程安全之基本原则

  • 原子性:一系列操作不可被中断(synchronized、lock、atomic)
  • 可见性:对数据的修改可以及时地被其他线程看到(synchronized、lock、volatile)
  • 有序性:指令没有被重新排列(synchronized、lock、volatile)

线程安全之实现方案

  • 共享
    • 不可变
      • 不可变对象(Immutable):通过对象不可变机制实现
    • 访问控制
      • 悲观锁(LOCK):通过Mutex机制实现
      • 乐观锁(CAS):通过CAS机制实现
    • 读写分离
      • 写时复制(COW):通过COW机制实现
  • 私有
    • 局部变量(LocalVariable):通过方法私有机制实现
    • 线程变量(ThreadLocal):通过线程私有机制实现

ps:局部变量不逃逸时本质上还是线程(隔离)变量,因为局部变量属于方法私有,方法属于线程私有

线程安全之变量安全

  • 可共享变量:静态变量、实例变量
  • 只私有变量:局部变量、线程变量

ps:局部变量不逃逸时本质上还是线程(隔离)变量,因为局部变量属于方法私有,方法属于线程私有

线程安全之对象安全

  • 逸出:发布了未完成初始化的对象(创建对象时的指令重排会导致这种问题)
  • 封闭:ThreadLocal
  • 逃逸:局部变量暴露到方法之外了

线程安全之内存安全

JMM:Java Memory Model(java内存模型)

Question

什么是线程安全

当多个线程访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步,或者在调用方进行任何其他的协调操作,调用这个对象的行为都可以获得正确的结果,那这个对象就是线程安全的

如何保证线程安全

  • 共享
    • 不可变
      • 不可变对象(Immutable):通过对象不可变机制实现
    • 访问控制
      • 悲观锁(LOCK):通过Mutex机制实现
      • 乐观锁(CAS):通过CAS机制实现
    • 读写分离
      • 写时复制(COW):通过COW机制实现
  • 私有
    • 方法局部对象(LocalVariable):通过方法私有机制实现
    • 线程隔离对象(ThreadLocal):通过线程私有机制实现

volatile能保证线程安全性吗

  • volatile只能保证可见性和有序性,不能保证原子性
  • volatile只能保证原子操作(比如读和写操作)的线程安全
  • volatile不能保证非原子操作(比如自增和自减操作)的线程安全

ps:volatile不能保证原子性,只能用来修饰哪些已经保证了原子性的操作,比如 flag读写cas操作

ps:也可以使用 atomic类 来保证操作的安全 (atomic依赖volatile并使用了cas保证原子性)

volatile的底层实现原理

  • 保证可见性:内存屏障
  • 保证有序性:内存屏障

单例模式的双检查实现中volatile的作用

  • 保证可见性:volatile会保证读最新值,避免双重判断时没有读到最新状态而重复创建对象
  • 保证有序性:volatile会禁止指令重排,避免还没初始化完成的对象被提前暴露引用并使用

Lock

分布式锁实现方案

  • mysql(不常用,性能比较差)
  • zookeeper(不常用,性能比较差)
  • redis(较常用,性能比较好,但存在一致性问题)
  • etcd(较常用,性能较差点,但不存在一致性问题)

Basic

锁的作用

通过互斥机制和队列机制将并发操作变为串行操作从而保证并发操作的线程安全

并发操作的安全保障

  1. 悲观锁(LOCK):适合 低并发常规读写强一致性 的场景
  2. 乐观锁(CAS ):适合 高并发读多写少弱一致性 的场景
  3. 写时复制(COW):适合 高并发读多写少弱一致性 的场景

ps:CAS如果写多的话,竞争激烈时大量的失败导致cpu做了很多无用功从而占用和浪费cpu资源
ps:COW如果写多的话,频繁分配内存时来不及回收会造成内存占用过高
ps:CAS比COW的效率更高,但CAS支持设置操作却不支持插入和删除的操作

并发操作的读写控制

  1. 悲观锁(LOCK):写写互斥,读写互斥,读读互斥
  2. 乐观锁(CAS ):写写互斥,读写不互斥,读读不互斥
  3. 写时复制(COW):写写互斥,读写不互斥,读读不互斥

ps:读写不互斥时会存在弱一致性的问题

并发操作的常见问题

  • 读问题
    • 并发读:不存在问题
    • 写后读:不一致性(脏读、不可重复读、幻读)
  • 写问题
    • 并发写:插入冲突、更新丢失
    • 读后写:写入偏差
  • 死锁

锁的分类

锁的类型分类

  • 是否锁住资源:悲观锁(锁住资源)、乐观锁(不锁住资源)
  • 是否独占资源:排他锁(独占资源)、共享锁(共享资源)
  • 是否阻塞线程:同步锁(阻塞)、自旋锁(不阻塞)

ps:读写锁一般是写独占和读共享的,即写写互斥,读写互斥,读读不互斥

锁的实现分类

  • 是否锁住资源:悲观锁(synchronized、ReentrantLock)、乐观锁(ReentrantReadWriteLock、StampedLock)
  • 是否共享资源:排他锁(synchronized、ReentrantLock)、共享锁(Semaphore)、读写锁(ReentrantReadWriteLock、StampedLock)
  • 是否阻塞线程:同步锁(synchronized、ReentrantLock、ReentrantReadWriteLock、StampedLock)、自旋锁(SpinLock)

ps:ReentrantReadWriteLock是 读写互斥 的,而StampedLock是 读写不互斥

锁的范围分类

  • 线程锁
  • 进程锁
  • 分布式锁

ps:java锁、redis锁、数据库锁

锁的原理

  • 悲观锁:Mutex + 阻塞 + 唤醒
  • 乐观锁:CAS + 自旋 + 重试

锁的特性

  • 是否支持重入:同一个线程是否可以多次获取锁
  • 是否支持中断:线程是否可以响应中断请求
  • 是否支持公平:线程是否能够公平的处理请求

ps:公平(排队且先进先出)、非公平(先插队,如果失败后再排队)

锁的问题

DeadLock(死锁)

死锁的形成条件

  • 互斥使用:即当资源被一个线程占用时,别的线程不能使用
  • 不可抢占:资源请求者不能强制从资源占有者手中抢夺资源,资源只能由占有者主动释放
  • 资源保持:当资源请求者在请求其他资源的同时保持对现有资源的占有
  • 循环等待:多个线程存在环路的锁依赖关系而永远等待下去(例如T1占有T2需要的资源,T2占有T3需要的资源,T3占有T1需要的资源,这种情况可能会形成一个等待环路)

死锁的解决办法

  • 预防死锁
    • 破坏不可抢占条件:当请求不到其他资源超时时释放自己持有的资源
    • 破坏资源保持条件:申请资源时一次性申请全部所需的资源
    • 破坏循环等待条件:给资源分配编号并按照编号顺序进行申请
  • 避免死锁
    • 银行家算法
  • 检测和解除死锁

Starvation(饥饿)

饥饿的解决办法

  • 分配资源时使用公平的算法

ps:公平(排队且先进先出)、非公平(先插队,如果失败后再排队)

Usage

synchronized

  • 修饰类和静态方法,锁是当前类的Class对象
  • 修饰实例方法,锁是当前实例对象
  • 修饰代码块,锁是括号里面的对象

synchronized锁原理

  • 锁方法:通过方法的ACC_SYNCHRONIZED标识实现
  • 锁代码块:通过对象的monitor锁和系统的monitorentermonitorexit指令实现

synchronized锁优化

  • 锁膨胀
  • 锁消除
  • 锁粗化
  • 自适应自旋锁

synchronized锁膨胀

  • 无锁:没有线程获取锁
  • 偏向锁:只有一个线程获取锁时进入此状态(通过CAS对象的标记头获取锁)
  • 轻量级锁:多个线程获取锁时进入此状态(通过CAS对象的标记头获取锁,获取锁失败的线程需进行自旋)
  • 重量级锁:多个线程获取锁并且有线程自旋失败(10次且可配置)时进入此状态(通过操作系统的Mutex机制获取锁)

synchronized锁消除

synchronized锁粗化

synchronized锁自旋

ReentrantLock

ReentrantLock是基于AQS框架实现的,ArrayBlockingQueue则是基于ReentrantLock和Condition实现的

lock.lock() 写在 try 代码块内部行吗?

不能,如果写在try里面,当lock异常时,finally会执行unlock,unlock的时候检测到线程没有先持有锁会抛出 IllegalMonitorStateException 异常

如何安全的unlock

  • 用try-catch包住异常,并且不处理任何异常,不打印日志
  • 可以用ReentrantLock对象的isHeldByCurrentThread方法进行判断

ps:并不是所有的Lock实现类都有isHeldByCurrentThread方法,所以可以统一使用try-catch包住异常

ReentrantReadWriteLock

  • ReentrantReadWriteLock(悲观读写锁):写写互斥,读写互斥,读读不互斥
  • StampedLock(乐观读写锁):写写互斥,读写不互斥,读读不互斥

悲观读写读写互斥乐观读写读写不互斥

Atomic

  • 基本数据类型
    • AtomicInteger
    • AtomicLong
    • AtomicBoolean
  • 对象引用类型
    • AtomicReference
    • AtomicStampedReference
    • AtomicMarkableReference
  • 数组容器类型
    • AtomicIntegerArray
    • AtomicLongArray
    • AtomicReferenceArray
  • 计数累加器
    • LongAdder:java8新增,比AtomicInteger的性能更好
    • LongAccumulator:java8新增,通用版LongAdder
    • DoubleAdder
    • DoubleAccumulator
  • 字段更新器
    • AtomicIntegerFieldUpdater
    • AtomicLongFieldUpdater
    • AtomicReferenceFieldUpdater

ps:atomic依赖volatile并使用了cas保证原子性

CAS

CAS存在的问题

  • ABA问题:可以加版本号或者时间戳解决
  • 只能保证单个变量操作的原子性:可以合并多个变量为单个对象进行操作
  • 竞争激烈时大量的失败导致cpu做了很多无用功从而占用和浪费cpu资源:可以限制重试次数

ps:cas需要配合volatile来实现线程安全,atomic类就是这样实现的

OptimisticLock(乐观锁)

乐观锁的优点和缺点

  • 优点:避免冲突时等待造成的耗时
  • 缺点:失败后需要重新处理并重试

SpinLock(自旋锁)

自旋锁的优点和缺点

  • 优点:避免上下文切换带来的耗时
  • 缺点:自旋循环时间长的话会占用和浪费cpu资源

AQS

抽象同步器负责通用的逻辑(阻塞和唤醒、入队和出队),具体同步器负责自定义逻辑(加锁和解锁)

AQS的核心对象

  • state:资源
  • CLH队列:双向链表实现的等待队列(链表结点中包含线程对象)

ps:CLH队列中的某个结点会自旋CAS检查前驱结点的locked状态,自旋失败后则进行阻塞并等待前驱结点唤醒

AQS的重写方法

  • isHeldExclusively():该线程是否正在独占资源(只有用到Condition时才需要去实现它)
  • tryAcquire(int):独占方式,成功则返回true,失败则返回false
  • tryRelease(int):独占方式,成功则返回true,失败则返回false
  • tryAcquireShared(int):共享方式,负数表示失败,0表示成功但没有剩余资源,正数表示成功且有剩余资源
  • tryReleaseShared(int):共享方式,如果释放后允许唤醒后续等待对象则返回true,否则返回false

LockSupport

LockSupport.park可以实现 阻塞 功能但不能实现 互斥 功能

Question

synchronized和volatile的区别

  • synchronized可以修饰类、字段、方法,volatile只能修饰字段
  • synchronized保证原子性、可见性、有序性,volatile只保证可见性、有序性
  • synchronized会阻塞线程,volatile不会阻塞线程

synchronized和Lock的区别

  • 位置:
    • synchronized是java关键字,Lock是java类
  • 实现:
    • synchronized基于操作系统mutex机制实现,Lock基于java的AQS机制实现
  • 操作:
    • synchronized会自动释放锁,Lock需要手动释放
  • 超时:
    • synchronized不能设置等待超时时间,Lock可以设置等待超时时间
  • 状态:
    • synchronized无法判断是否获取了锁,Lock可以判断是否获取了锁
  • 特性:
    • synchronized支持重入,Lock也支持重入
    • synchronized不支持中断,Lock可支持中断(也支持不可中断)
    • synchronized不支持公平锁,Lock可支持公平锁(也支持非公平)

synchronized和ReentrantLock为什么默认是非公平的

因为非公平锁在释放后可以省去唤醒某个线程的开销直接让另一个线程获得锁,从而提高整体的效率

ps:但是非公平锁可能会导致饥饿问题

ReentrantLock是如何实现公平和非公平性的

  • 公平:线程在竞争锁资源的时候先判断AQS同步队列里面有没有在等待的线程,如果有的话就加入到队列的尾部等待,没有的话就直接获取锁
  • 非公平:线程在竞争锁资源的时候先尝试获取锁,失败后再加入到队列的尾部等待

synchronized是如何保证线程安全的

synchronized通过锁的互斥机制保证了原子性,使得同一时间只有一个线程能够操作资源来保证了线程安全,同时通过内存屏障来保证了线程安全中的可见性和有序性

synchronized的底层原理

  • 锁方法:通过方法的ACC_SYNCHRONIZED标识实现
  • 锁代码块:通过对象的monitor锁和系统的monitorentermonitorexit指令实现

wait和notify为什么要位于synchronized代码块中

  • wait和notify是用来实现线程间通信的,是基于共享变量实现的
  • 为了保证共享变量的线程安全,需要用synchronized来对共享变量加锁

Task

Basic

任务的运行方式

  • 串行:一个工作者按序执行多个任务(serial)
  • 并发:一个工作者分时执行多个任务(concurrent)
  • 并行:多个工作者同时执行多个任务(parallel)

ps:工作者(worker)通常是指线程(本质是指cpu)
ps:并发度指的就是 分时 执行任务的任务个数
ps:并行度指的就是 同时 执行任务的任务个数

串行的实现方式

  • 加锁
  • 队列

ps:加锁的本质还是使用队列进行排队

并发的实现方式

  • 多线程
  • 多进程
  • 协程

并发和并行的区别

  • 并发(Concurrency):一个工作者分时执行多个任务
  • 并行(Parallelism):多个工作者同时执行多个任务

集群和分布式的区别

  • 集群(Cluster):多个工作者轮流处理主任务
  • 分布式(Distributed):多个工作者分工处理子任务

同步和异步的区别

  • 同步(Synchronization):主动获取结果,不一定阻塞的(轮询获取结果就不是阻塞的)
  • 异步(Asynchronization):被动获取结果,一般是非阻塞

任务的调度方式

普通调度(normal)

可以使用ScheduledThreadPoolExecutor的 schedule 方法实现

延时调度(delay)

可以使用ScheduledThreadPoolExecutor的 schedule 方法中的 delay 参数实现

定时调度(timing)

可以使用ScheduledThreadPoolExecutor的如下方法实现

  • scheduleAtFixedRate:固定频率的触发任务的执行
  • scheduleWithFixedDelay:固定延迟后触发任务的执行

Usage

任务的分类

  • 常规任务(NormalTask)
    • 同步任务(SyncTask)
    • 异步任务(AsyncTask)
  • 调度任务
    • 延时任务(DelayedTask)
    • 定时任务(TimedTask)

任务的核心

  • Future:普通任务的接口,核心实现类为FutureTask
  • RunnableFuture:对Runnable进行包装的Future
  • FutureTask:Future的实现类
  • ScheduledFuture:调度任务的接口,核心实现类为ScheduledFutureTask
  • RunnableScheduledFuture:对Runnable进行包装的ScheduledFuture
  • ScheduledFutureTask:ScheduledFuture的实现类
  • Delayed:延迟任务的接口,继承了Comparable
  • ExecutorService:普通任务执行器,核心实现类为ThreadPoolExecutor
  • ScheduledExecutorService:调度任务执行器,核心实现类为ScheduledThreadPoolExecutor
  • CompletionStage:异步任务调度和计算的接口
  • CompletableFuture:异步任务调度和计算的实现类

NormalTask

SyncTask

java中没有SyncTask类,对应的类为FutureTask

Future
FutureTask
Basic
State
  • NEW(已创建):创建FutureTask之后
  • COMPLETING(完成中):设置结果开始时,是一个中间过渡态
  • NORMAL(已正常完成):设置结果(正常完成)结束后
  • EXCEPTIONAL(已异常完成):设置结果(异常完成)结束后
  • CANCELLED(已取消):取消任务后
  • INTERRUPTING(中断中):中断开始时,是一个中间过渡态
  • INTERRUPTED(已断中):中断结束后
Exception

FutureTask.get异常如下

  • CancellationException:任务提交后取消抛出的异常
  • InterruptedException:任务执行时中断抛出的异常
  • ExecutionException:任务执行时出错抛出的异常
  • TimeoutException:任务执行时超时抛出的异常

ps:cancel时如果任务还在执行中,就会抛出CancellationException(RuntimeException),否则就可能抛出InterruptedException

Cancel

ps:cancel时如果任务已经执行,会调用 Thread.interrupt当前 线程进行中断

Timeout

ps:get的阻塞效果是通过LockSupport.park或者LockSupport.parkNanos实现的

AsyncTask

java中没有AsyncTask类,对应的类为CompletableFuture

异步和线程的关系

  • 异步既可以用单线程(事件循环)来实现
  • 异步也可以用多线程(子线程)来实现

异步和并发的区别

  • 异步不等同于并发
  • 异步是实现并发的方式之一
  • 多线程也是实现并发的方式之一

异步编程之实现方式

  • 单线程
  • 多线程
  • 协程

异步编程之实现风格

  • 回调(Callback):会导致地狱回调(callback hell)
  • 期望(Future/Promise):支持链式调用从而避免了地狱回调的问题
  • 协程(Coroutine):支持像调用同步代码一样调用异步代码(需要编程语言支持)

异步编程之获取结果

  • 轮询
  • 回调
CompletionStage
CompletableFuture
Basic
  • 包含async方法和非async方法
  • 包含async的有两种方法,一种使用默认的forkjoin线程池,一种是使用自定义线程池
  • 包含apply的都使用Function回调,有输入,有返回值
  • 包含accept的都使用Consumer回调,有输入,没有返回
  • 包含supply的都使用Supplier回调,没有输入,有返回
  • 包含run的都使用Runnable回调,没有输入,没有返回

####### 创建任务(createTask)

######## supplyAsync

1
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

######## runAsync

1
public static CompletableFuture<Void> runAsync(Runnable runnable)

######## completedFuture

1
public static <U> CompletableFuture<U> completedFuture(U value)

####### 处理结果(handleResult)

result = value or error

######## thenApply(thenApplyValue)

1
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

######## thenAccept(thenAcceptValue)

1
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

######## thenRun(thenRun)

1
public CompletableFuture<Void> thenRun(Runnable action)

######## exceptionally(thenApplyError)

1
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

######## handle(thenApplyResult)

1
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)

######## whenComplete(thenAcceptResult)

1
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
  • thenRun不关心值,也不关心异常(??? 会吃掉异常,支持从异常中恢复并继续后面的操作)
  • thenApply和thenAccept只关心值,不关心异常(??? 会吃掉异常,支持从异常中恢复并继续后面的操作)
  • exceptionally不关心值,只关心异常(会吃掉异常,支持从异常中恢复并继续后面的操作)
  • handle既关心值,也关心异常(会吃掉异常,支持从异常中恢复并继续后面的操作)
  • whenComplete既关心值,也关心异常(不会吃掉异常,不支持从异常中恢复并中断后面的操作)

####### 设置结果(setResult)

######## 设置结果(setValue)

1
public boolean complete(T value)
1
public void obtrudeValue(T value)

######## 设置结果(setError)

1
public boolean completeExceptionally(Throwable ex)
1
public void obtrudeException(Throwable ex)

####### 获取结果(getResult)

1
public T get() throws InterruptedException, ExecutionException
1
public T join()
1
public T getNow(T valueIfAbsent)

####### 检查状态(checkState)

1
public boolean isDone()
1
public boolean isCompletedExceptionally()

####### 编排任务(Choreography)

######## 链式调用(chain)

######### thenCompose(thenChain)

将当前异步任务的结果交给另一个异步任务处理,并返回另一个异步任务,即串联执行两个任务

1
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

######## 组合调用(group)

组合处理两个或多个任务

######### 组合调用(and)

解决任务之间的 编排(choreography)关系,等待两个任务同时完成,并处理两个任务的结果

########## thenCombine(thenApplyBoth)

1
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

########## thenAcceptBoth(thenAcceptBoth)

1
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)

########## runAfterBoth(thenRunBoth)

1
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)

######### 组合调用(or)

解决任务之间的 编排(choreography)关系,等待任意一个任务完成,并处理快的任务的结果

########## applyToEither(thenApplyEither)

1
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)

########## acceptEither(thenAcceptEither)

1
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)

########## runAfterEither(thenRunEither)

1
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)

######### 组合调用(all)

解决任务之间的 编排(choreography)关系,等待所有任务同时完成,并忽略所有任务的结果

1
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

######### 组合调用(any)

解决任务之间的 编排(choreography)关系,等待任意一个任务完成,并返回快的任务的结果

1
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
State
Exception
Cancel
Timeout
More
AsyncTask(python)

####### async和await(python)

####### coroutine(python)

####### asyncio(python)

####### aiohttp(python)

####### Tornado(python)

####### Celery(python)

celery backend:结果存储
celery broker:通信代理

AsyncTask(javascript)

####### async和await(javascript)

####### Promise(javascript)

ScheduledTask

DelayedTask

延时任务的实现方案

  • 线程方案
    • Thread.sleep:会导致线程暂停,效率低,不推荐
  • 线程池方案
    • ScheduledExecutorService.schedule(Runnable command, long delay, TimeUnit unit)
  • 队列方案
    • PriorityBlockingQueue + 轮询:效率低,不推荐
    • DelayQueue
    • MQ延时消息

ps:推荐使用MQ延时消息,因为MQ延时消息支持任务持久化,其他方案不支持

TimedTask

ScheduledFuture
ScheduledFutureTask

Question

FutureTask为什么支持传递给线程

因为FutureTask实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable接口,所以FutureTask支持传递给线程

Executor

Basic

线程池的创建方式

  • 使用Executors快速创建
  • 使用ExecutorService手动创建

线程池的核心参数

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • keepAliveTime:线程保活时间大小
  • unit:线程保活时间单位
  • workQueue:工作队列
  • threadFactory:线程工厂
  • handler:任务饱和策略处理器

线程池的提交策略

  • poolSize < corePoolSize时增加线程
  • poolSize = corePoolSize时放入队列
  • 队列满了之后再增加线程
  • 线程达到maximumPoolSize后执行RejectedExecutionHandler
  • 根据RejectedExecutionHandler指定的拒绝策略来处理新的任务

线程池的线程数量

  • 如果是CPU密集型应用,则线程池大小设置为CPU核心数+1
  • 如果是IO密集型应用,则线程池大小设置为2*CPU核心数+1

线程池的工作队列

  • BlockingQueue
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • SynchronousQueue
    • LinkedTransferQueue
    • PriorityBlockingQueue
    • DelayQueue
    • DelayedWorkQueue
  • BlockingDeque
    • LinkedBlockingDeque

线程池的饱和策略

  • AbortPolicy:丢弃任务并且抛出RejectedExecutionException异常(默认策略)
  • DiscardPolicy:丢弃任务但不抛出异常
  • DiscardOldestPolicy:丢弃最老的(队列头部)任务
  • CallerRunsPolicy:在提交的线程中直接执行任务

线程池的提交方式

  • 使用submit方法提交(普通任务有返回值时使用)
  • 使用execute方法提交(普通任务无返回值时使用)
  • 使用schedule方法提交(延时任务和定时任务时使用)

submit和execute的区别

  • execute只支持Runnable,submit可以支持Runnable和Callable
  • execute没有返回值,submit有返回值
  • execute有异常会直接打印,submit在Future.get的时候才会打印异常

线程池的异常捕获

  • execute方式提交时
    • 在方法内部捕获异常并处理
    • 在ThreadFactory里使用Thread.setUncaughtExceptionHandler拦截处理
    • 在ThreadPoolExecutor.afterExecute里面处理
  • submit方式提交时
    • 在方法内部捕获异常并处理
    • 调用Future.get的时候捕获异常并处理

ps:线程池里面的某个线程异常了,线程池会移除这个线程并创建一个新的线程

线程池的关闭方式

  • shutdown:线程池的状态变为SHUTDOWN,不再接受新任务了,不会终止当前正在运行的任务,还会继续处理队列里剩余的任务
  • shutdownNow:线程池的状态变为STOP,不再接受新任务了,会终止当前正在运行的任务,而且不会处理队列里剩余的任务并返回还未处理完成的任务列表

shutdown和shutdownNow的区别

  • shutdown和shutdownNow之后都不会接收新任务了
  • shutdown不会终止当前正在运行的任务,shutdownNow会终止当前正在运行的任务
  • shutdown还会继续处理队列里剩余的任务,shutdownNow不会处理队列里剩余的任务

Usage

Executors

快速创建线程池的方法

快速创建线程池的本质是ThreadPoolExecutor和ScheduledThreadPoolExecutor的快速构造

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
Executors.newSingleThreadExecutor
1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

线程数固定(一个),队列容量无限大,适合需要串行执行的任务

Executors.newFixedThreadPool
1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

线程数固定(多个),队列容量无限大,线程数量固定且不回收线程,任务需要排队,适合耗时长的少量任务

Executors.newCachedThreadPool
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

线程数无限大,队列容量固定(0个),线程数量充足且有保活时间,任务不用排队,适合耗时短的大量任务

Executors.newScheduledThreadPool
1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

线程数无限大,队列容量无限大,支持延时和定时任务

Executors.newSingleThreadScheduledExecutor
1
2
3
4
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

线程数无限大,队列容量无限大,支持延时和定时任务

Executors.newWorkStealingPool
1
2
3
4
5
6
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
1
2
3
4
5
6
7
8
9
10
11
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
1
2
3
4
5
6
7
8
9
10
11
12
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

快速创建线程池的问题

  • Executors.newSingleThreadExecutor:队列容量无限大,容易导致内存耗尽引发OOM
  • Executors.newFixedThreadPool:队列容量无限大,容易导致内存耗尽引发OOM
  • Executors.newCachedThreadPool:线程数量无限大,容易导致内存耗尽引发OOM
  • Executors.newScheduledThreadPool:线程数量无限大,容易导致内存耗尽引发OOM
  • Executors.newSingleThreadScheduledExecutor:线程数量无限大,容易导致内存耗尽引发OOM
  • Executors.newWorkStealingPool:队列容量无限大,容易导致内存耗尽引发OOM

ExecutorService

ThreadPoolExecutor

State

RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED

  • RUNNING(运行):创建线程池后的初始状态
  • SHUTDOWN(关闭):执行shutdown()后,不接收新任务,但会继续处理当前任务和队列里剩余的任务
  • STOP(停止):执行shutdownNow()后,不接收新任务,且会中断当前任务和丢弃并返回队列里剩余的任务
  • TIDYING(清扫):线程池中的任务队列为空后
  • TERMINATED(终结):执行terminated()后

ScheduledExecutorService

ScheduledThreadPoolExecutor

CompletionService

批量提交任务并获取结果的实现方案

  • 保持顺序:将提交任务返回的Future添加到列表中,并等任务全部提交后,再循环列表获取Future的结果
  • 速度优先:CompletionService

ExecutorCompletionService

ForkJoinPool

Timer

定时器实现底层机制

  • 硬件定时器
  • 软件定时器

定时器实现数据结构

  • 最小堆(参考java的DelayQueue)
  • 红黑树(参考java的SortedSet)
  • 跳跃表(参考redis的SortedSet)
  • 时间轮(参考kafka的TimingWheel)

Question

线程池是如何保证核心线程不被销毁的

线程池里面的线程内部是无限循环,任务执行完后不会结束,而是继续去队列里面获取任务,如果没获取到任务,就会被队列阻塞直到有新的任务可以获取

如何知道线程池里的的任务已经完成了

  • 线程池内部
    • Runnable的run方法执行完毕时,这个任务就完成了
    • Callable的call方法执行完毕时,这个任务就完成了
  • 线程池外部
    • 通过线程池的isTerminated方法可以用来判断所有的任务是否已经完成了
    • 通过线程池返回的Future对象的isDone方法可以用来判断某个任务是否已经完成了
    • 可以通过CountDownLatch计数器来等待相关的任务完成

Cooperation

Communication

线程间通信(Communication)是线程间协作(Collaboration)的基础

  • volatile:volatile和轮询
  • Object:wait和notify、notifyAll
  • Condition:await和signal、signalAll
  • LockSupport:park和unpark
  • CountDownLatch:await和countDown

线程通信机制详解

  • Object:wait和notify、notifyAll
  • Condition:await和signal、signalAll
  • LockSupport:park和unpark

ps:Thread.join是在Thread对象上手动wait和自动notify的线程通信快捷方式

Object的wait和notify

Condition的await和signal

LockSupport的park和unpark

线程通信机制比较

Object只有一个等待队列,Condition可以有多个等待队列

Collaboration

  • Mutex:解决互斥的问题
  • Lock:解决同步的问题
  • Semaphore:解决限流的问题(限流器)
  • CountDownLatch:解决计数的问题(计数器)
  • CyclicBarrier:解决同步点的问题(屏障器)

Mutex

Semaphore

Semaphore和Mutex的区别

Semaphore:同时可以有多个使用者访问资源(acquire和release)
Mutex:同时只能有一个使用者访问资源(lock和unlock)

Semaphore和Lock的区别

Semaphore:释放许可前不要求先获取许可,并且任何人都能释放许可
Lock:解锁前必须先持有这个锁,且只能自己释放锁

CountDownLatch

CountDownLatch和Semaphore的区别

  • CountDownLatch:计数器,只能递减数量(count),归零后通知等待的线程继续运行
  • Semaphore:信号量,可以释放许可(permit),归还许可给其他线程使用

CyclicBarrier

CyclicBarrier和CountDownLatch的区别

  • CyclicBarrier:屏障栏,可以重复使用,归零后通知等待的所有线程继续运行
  • CountDownLatch:计数器,只能使用一次,归零后通知等待的监听线程继续运行

Exchanger

Other

Utils

Java

Apache Commons

Google Guava

只想买包辣条