JUC

JUC

线程

线程创建的四种方式:

  • 继承Thread,重写run方法。
  • 实现Runnable接口,重写run方法
  • 实现Callable接口,重写call方法
    • 与Runnable方法基本相同,区别是有一个返回值
    • 返回值是Future对象,可以异步获取执行结果
    • 必须搭配线程池使用
  • 线程池获取

线程创建的设计模式

静态代理模式:

  • Thread自己实现了Runnable接口,相当于代理类
  • 自己的类实现了Runnable接口,调用时需要new Thread(),参数传入我们自己的类,算是给代理类传真实对象
  • 然后代理类Thread帮我们执行相关代码

线程的状态

Java线程有六个状态

  • NEW:线程被new就进入此状态
  • RUNNABLE:调用start()方法进入此状态
  • WAITING:调用了wait(), 调用了join(),调用了LockSupport.park方法
  • TIMED_WAITING:
    • wait(long)
    • sleep(long)
    • join(long)
    • LockSupport.parkNanos(long)
    • LockSupport.parkUntil(long)
  • BLOCKED:获取synchorized锁对象时,拿不到锁对象
  • TERMINATED:
    • 执行完成
    • 调用了stop()方法,不推荐使用

从WAITING或TIMED_WAITING唤醒

  • notify()
  • notifyAll()
  • LockSupport.unpark()

RUNNABLE状态

  • 在OS的进程,有就绪态和运行态,在Java中把这两个状态合二为一了,因为JVM设计者认为,关于线程的调度,属于CPU管理的,与Java无关
  • 处于RUNABLE状态的线程调用yield()方法可以进入Ready状态

线程池

线程池的作用

  • 方便对线程进行管理
  • 减小线程切换的开销
  • 加快响应速度
  • 控制并发量
  • 方便复用线程

ThreadPoolExecutor

构造方法

参数:

  • 核心线程数corePoolSize
  • 最大线程数 maximumPoolSize(非核心线程数就是两者差maximumPoolSize -corePoolSize)
  • 线程保活时间keepAliveTime
  • 时间单位unit
  • 阻塞队列workQueue
    • LinkedBlockingQueue FIFO执行,没有界限大小
    • ArrayBlockingQueue 有界限大小
    • DelayWorkQueue
    • SynchronousQueue
  • 线程工厂threadFactory
  • 拒绝策略Handler
    • AbortPolicy默认的拒绝策略[直接抛出异常]
    • CallerRunsPolicy将后续的任务交给其调用者执行[并没有拒绝]
    • DiscardPolicy将后来的任务默默丢弃
    • DiscardOldPolicy将最老的任务丢弃
  • 线程池工作原理
    1. 创建线程池(此时池内没有线程)
    2. 接到任务,立刻创建线程
    3. 继续接收任务,当线程池数量达到了核心线程数量,就放入阻塞队列
    4. 任务继续增加,阻塞队列满了,创建线程
    5. 如果任务继续增加,达到了最大线程数量,此时根据不同的拒绝策略进行拒绝
    6. 线程闲下来时,如果此时的线程数量大于核心线程数,那么多于核心线程数的线程会被销毁
如何执行任务
  • execute(Runnable) 执行run方法,没有返回值
  • submit 有三种参数,都会返回一个Future接口对象
    • Runnable接口
    • Callable接口
    • Runnable接口,T result 将结果放在result内
内部核心类Worker
  • 线程池内部的Worker类,继承了AQS,实现了Runnable
  • 线程执行Worker,Worker不断从阻塞队列里获取任务来执行
==终止线程的四种方式==
  • 正常结束
  • 使用标志符号volatile boolean exit 循环判断exit状态
  • interrupt
    • 线程未阻塞状态 isInterrupted()判断,如果为true就中断
    • 线程阻塞状态捕获Interrupt异常,break退出
  • stop() 不推荐
非核心线程延迟死亡是如何实现的?
  • 通过阻塞队列poll(),让线程等待一段时间,如果没有取到任务,则线程死亡
核心线程为什么不死?
  • 通过阻塞队列take(),让线程一直等待,直到获取到任务
  • 可以使用allowCoreThreadTimeOut(true)让线程死亡

Java已经实现的四种线程池

通过ExecutorService调用(不推荐使用)

newSingleThreadExceutor
  • 核心线程数和最大线程数都为1
  • 阻塞队列为LinkedBlockingQueue
newScheduledThreadPool
  • 核心线程数自己指定
  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列为一个DelayWorkQueue
  • 返回一个ExcutorService的子类ScheduledExcutorService 调用scheduleAtFixedRate(),可以定时执行传入三个参数:初始延迟时间,执行周期,时间单位
newCachedThreadPool
  • 核心线程数为0
  • 最大线程数为Integer.MAX_VALUE
  • 保活时间为60s
  • 阻塞队列是SynchronousQueue
newFixThreadPool
  • 核心线程数和最大线程数一样,是一个值
  • LinkedBlockingQueue

为什么不推荐使用(且阿里手册不推荐使用Executors)?

  • FixedThreadPool和SingleThreadExecutor中的阻塞队列是无界的,如果没有适当的管理任务提交的速率,可能会导致内存溢出
  • CachedThreadPool可以创建的线程数量是Integer.MAX_VALUE,会导致创建太多线程,导致系统资源耗尽,如操作系统能打开的文件描述符数量限制
  • ScheduledThreadPool通常用于延迟执行或定期执行任务,如果不正确关闭,可能会导致内存泄漏(内存泄漏的可能原因之一)
  • Executors没有提供良好的关闭机制,直接使用shutdown()或shutdownNow()可能会导致正在执行的任务被中断或者丢失
  • 参数都是定的,不灵活

线程池的状态

成员变量AtomicInteger ctl 用来存储当前的线程数量及线程池状态

高3位—线程池五种状态之一
  • RUNNING -1 [创建后处于的状态]
  • SHUTDOWN 0 [线程池调用SHUTDOWN进入的状态不会接受新任务,但是会将旧任务继续执行完成]
  • STOP 1 [调用shutDownNow进入STOP状态,线程池不能接收新的任务,阻塞队列中的任务也会被丢弃]
  • TIDYING 2 [所有任务终止, ctl记录任务数量为0就会变为这个状态]
  • TERMINATED 3 [执行了terminated方法,进入此状态]
底29位

存储当前线程池数量


锁的分类

乐观锁与悲观锁

乐观锁
  • 认为读多写少,不会上锁
  • CAS就是乐观锁
悲观锁
  • 认为写多,不管是读还是写都会上锁,阻止其他线程
  • synchronized就是悲观锁

公平锁与非公平锁

公平锁

不允许插队,只能老老实实排队等待执行

非公平锁

可以插队,如果线程进入的同时,发现当前任务刚好处于切换状态,那么就插队,优先执行

重入锁

一个线程得到锁后,执行线程自己本身的方法,不需要再去获得锁

共享锁与独占锁

独占锁
  • 写锁
  • 悲观策略,无论读还是写都会上锁
  • 只能有一个线程占有
共享锁
  • 读锁
  • 可以有很多个线程获取共享锁
  • 共享锁期间,不期望有写操作,如果真的有写操作,需要升级为独占锁

核心锁

CAS

(属于乐观锁、自旋锁)

比较并设置
  • 三个参数:期望值、旧值、新值
  • 如果期望值等于旧值,就把新值赋值
存在的问题
  • ABA问题:CAS比较的空档期,有一个线程更改了值,但是最后又改了回来,导致CAS并没有发现其他线程变更过此值
    • 解决:设置一个版本号或者时间戳,比较这个版本号或时间戳
    • Java的解决方式:用AtomicStampedReference 版本号
  • 循环时间长,持续占有CPU资源(因为自旋)
  • 只能保证一个变量的原子性操作问题

synchorized

(属于悲观锁、可重入锁、非公平锁)

修饰方法
  • 修饰静态方法:锁的是class对象,相当于全局锁
  • 修饰动态方法:锁的是实例对象,即this
修饰代码块
  • 锁住传入的对象
锁升级的过程

在JDK1.6之前全为重量级锁,在JDK1.6引入了锁升级过程

四种锁状态
  • 无锁
  • 偏向锁
  • 轻量级锁
  • 重量级锁
锁状态在对象头中存放
  • 无锁与偏向锁都为01,偏向锁有专门的一位标识
  • 轻量级锁00
  • 重量级锁10
对象的结构
  • 对象头12字节
    • markword 8字节
      • hashCode
      • GC信息 对象分代信息
      • 锁信息
    • classpoint 4 字节
  • 实例数据
  • 对齐填充
锁升级过程
  • 一开始为无锁状态
  • 当线程要去获取锁,将锁给他,并将锁的状态切换为偏向锁,并且将锁的状态切换为偏向锁,并且将该线程的ID记录在对象头中
  • 当线程又要锁时,判断是否为当前线程ID,是就允许进入,不是就进行一次CAS判断,替换当前线程ID操作成功就允许进入,不成功就升级为重量级锁
  • 轻量级锁能够进行多次CAS操作和自旋判断,如果还是不能满足当前的竞争状况就会升级为重量级锁
  • 重量级锁的实现是由OS的MuteX实现的

Lock

Lock的提出是为了解决synchorized锁还是太重的问题,但是一个悲观锁,锁住的时候,其他线程甚至不能进行读操作

可实现公平、非公平、可重入、排它锁(默认)

API
  • lock() 若处于空闲状态获取到锁
  • unLock 释放锁
  • tryLock() 如果获取不到锁,不会一直等待,会继续向下执行代码
  • newCondition() 创建一个Condition对象
  • lockInterruptibly() 如果线程为了取锁而进入了等待状态,此时可以使用Interrupt中断其等待状态
ReentrantLock

是Lock接口的一个实现

  • 可重入锁、默认为非安全锁(可重入锁的非安全版本)
  • 默认是一个写锁
  • 构造是可以定是否为安全锁(即是否公平)
  • ReentrantLock相对于synchorized
    • 可以创建为公平锁
    • 可以创建Condition对象,绑定多个条件
    • 实现了等待可中断

ReadWriteLock

ReentrantLockReadWriteLock

将读操作和写操作分离

遵从四个原则
  • 允许多个线程读
  • 只允许一个线程写
  • 读的时候不许写
  • 写的时候不许读
API
  • readLock() 获取读锁
  • writeLock() 获取写锁
  • lock()
  • unlock()
  • newCondition() 只有写锁可以生成条件
锁的升级降级
  • 不支持升级 指从读锁变为写锁
  • 支持降级 从写锁变为读锁
源码分析

ReentrantLock的实现依赖于内部的Sync类,继承自AQS

两种锁策略

  1. 公平锁
    按照线程请求锁的顺序来分配锁
  2. 非公平锁(默认)
    使线程抢占到锁

加锁和解锁操作是通过调用AQS的acquire和release方法实现的
注意

  • 在使用 ReentrantLock 时,需要注意正确管理锁的获取和释放,以避免死锁和性能问题。
    • 应该总是在 finally 块中调用 unlock() 方法,以确保锁能够在所有情况下都被正确释放。此外,ReentrantLock 还提供了 tryLock()tryLock(long timeout, TimeUnit unit)lockInterruptibly() 等方法,以支持可中断的锁获取操作和带超时的锁获取操作
比较ReentrantLock和Synchronized
比较方面 Synchronized ReentrantLock(实现了lock接口)
原始构成 它是java语言的关键字,是原生语法层
面的互斥,需要ivm实现
它是JDK 1.5之后提供的API层面的互斥锁类
实现 通过JVM加锁解锁 api层面的加锁解锁,需要手动释放锁
代码编写 采用synchronized不需要用户去手动释
放锁,当synchronized方法或者
synchronized代码块执行完之后,系统
会自动让线程释放对锁的占用,更安
而ReentrantLock则必须要用户去手动释放锁,如果没有主动释放锁,就有
可能导致出现死锁现象。需要lock()和unlock0方法配合tny/finally语句块来完
灵活性 锁的范围是整个方法或synchronized块部分 Lock因为是方法调用,可以跨方法,灵活性更大

volatile

修饰变量的

可见性

  • 即一个线程对变量的修改,其他线程是可以看到变化的
实现原理
  • 每次强制每一个线程对共享变量的读写写回主存
  • 缓存一致性协议,对被修饰变量的修改会发出信号通知其他线程去主存中读

有序性

  • 可以保证对该变量的操作保持原来的顺序,不会被重排序
重排序
  • 编译器重排序
  • 内存重排序
  • CPU流水线重排序
实现原理 happens before 原则 + cpu内存屏障指令

只能保证原子操作的变化

AtomicInteger

是 Java 中的一个类,属于 java.util.concurrent.atomic 包。它提供了对基本整数类型 int 的原子操作。这意味着你可以在多线程环境中安全地执行对 int 类型的变量的增加、减少、设置和更新等操作,而不需要担心线程间的冲突

  • 使用一个volatile记录值
  • 更改值时使用while进行cas判断

活跃性问题

死锁

死锁四个必要条件

  • 互斥性:一次只能有一个线程使用资源
  • 请求和保持条件:即一个线程获取到资源,还需要其他资源
  • 不可剥夺条件:线程的资源不可以被抢夺
  • 环路等待条件:形成环路,会造成死锁

如果解决死锁

  • 互斥性是共享资源的本质,不可破坏
  • 破坏请求和保持:线程一次就获取任务所需的所有资源,一同获取,一同释放
  • 破坏不可剥夺:synchorized做不到,Lock可以做到
  • 破坏环路等待条件:给资源一个字段便于排序,两个相同的操作获取锁的顺序需要一致(但是限制了资源的增长)

活锁

线程之间相互谦让,导致谁也没有执行

解决活锁:可以设置一个随机的等待时间

饥饿

线程无法获取资源,无法运行

三个根源问题

  • 资源设置为无限 不可能
  • 避免持有锁的对象长时间执行 很难实现
  • 公平分配资源 公平锁

管程模型

也叫Moniter,用来处理并发问题的一种模型

OS使用信号量这种管程模型处理并发问题

Java中封装了synchronized来处理并发问题

Synchronized-wait-notify 模型

API

  • wati() 移入等待队列
  • notify() 唤醒一个等待队列中的数据
  • notifyAll() 唤醒所有线程

若非有特殊需要,使用notifyAll()。 因为notify()是随机唤醒一个线程,很有可能唤醒的这个线程不需要这个资源,白白唤醒


工具类

AQS

AQS同步抽象队列 重写 tryLock、tryRelease方法

结构特点

  • 维护了一个volatile修饰的state,关于state有三个方法

    • getState()
    • setState()
    • compareAndSetState()

    只能有一个线程抢到state资源进行执行,其他线程会在双向链表中等待

  • ReentrantLock的本质就是有一个Sync类,这个类继承了AQS

  • 维护了一个双端链表

实现的逻辑

  • 继承AbstractQueuedSynchorizer抽象类
  • 实现tryAcquire() 方法 尝试获取锁,与lock不同,该方法不会使当前线程阻塞,锁不可用立即返回false
  • 实现tryRelease() 方法 尝试释放锁 与unlock不同,同上
  • 实现 isHeldExdclusively() 方法 可以知道当前执行的是否为自己的线程

实现类

  • ReentrantLock
  • 线程池的Worker
  • JDK1.7的ConcurrenthashMap的segment数组分段锁

源码分析

继承自AbstractOwnableSysnchronizer抽象类,并且实现了Serialable接口,可以进行序列化

两个内部类类
  1. Node类(分为共享模式和独占模式)
    其中每个被阻塞的线程都会被封装成一个Node节点,放入队列。每个节点包含了一个Thread类型的引用,每个节点都存在一个状态
    • CANCELLED,值为1,表示当前的线程被取消。
    • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作
    • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中
    • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。
    • 值为0,表示当前节点在sync queue中,等待着获取锁。
  2. ConditionObject类
类的属性

包含了头节点head,尾结点tail,状态state、自旋时间spinForTimeoutThreshold,还有AbstractQueuedSynchronizer抽象的属性在内存中的偏移地址,通过该偏移地址,可以获取和设置该属性的值,同时还包括一个静态初始化块,用于加载内存偏移地址

核心方法 - acquire
1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

以独占模式获取资源,忽略中断

image-20240812230642510

  • 首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,
  • 如果允许,则获取它。在AbstractQueuedSynchronizer源码中默认会抛出一个异常,即需要子类去重写此方法完成自己的逻辑。之后会进行分析。
  • 若tryAcquire失败,则调用addWaiter方法,addWaiter方法完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue。
  • 调用acquireQueued方法,此方法完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。

addWaiter方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 添加等待者
private Node addWaiter(Node mode) {
// 新生成一个结点,默认为独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 保存尾结点
Node pred = tail;
if (pred != null) { // 尾结点不为空,即已经被初始化
// 将node结点的prev域连接到尾结点
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 比较pred是否为尾结点,是则将尾结点设置为node
// 设置尾结点的next域为node
pred.next = node;
return node; // 返回新生成的结点
}
}
enq(node); // 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列
return node;
}

addWaiter方法使用快速添加的方式往sync queue尾部添加结点,如果sync queue队列还没有初始化,则会使用enq插入队列中,enq方法源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Node enq(final Node node) {
for (;;) { // 无限循环,确保结点能够成功入队列
// 保存尾结点
Node t = tail;
if (t == null) { // 尾结点为空,即还没被初始化
if (compareAndSetHead(new Node())) // 头节点为空,并设置头节点为新生成的结点
tail = head; // 头节点与尾结点都指向同一个新生结点
} else { // 尾结点不为空,即已经被初始化过
// 将node结点的prev域连接到尾结点
node.prev = t;
if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
// 设置尾结点的next域为node
t.next = node;
return t; // 返回尾结点
}
}
}
}

acquireQueued方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// sync队列中的结点在独占且忽略中断的模式下获取(资源)
final boolean acquireQueued(final Node node, int arg) {
// 标志
boolean failed = true;
try {
// 中断标志
boolean interrupted = false;
for (;;) { // 无限循环
// 获取node节点的前驱结点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 前驱为头节点并且成功获得锁
setHead(node); // 设置头节点
p.next = null; // help GC
failed = false; // 设置标志
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

  • 判断前驱节点是否为head,并且是否成功获取资源
  • 1满足,返回interrupt,表示没有被中断过,设置当前节点为head
  • 不满足,则判断是否需要park当前线程,如果前驱节点的状态为SIGNAL,park当前节点,否则不进行park操作

补充:
houldParkAfterFailedAcquire和parkAndCheckInterrupt方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 当获取(资源)失败后,检查并且更新结点状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱结点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 可以进行park操作
return true;
if (ws > 0) { // 表示状态为CANCELLED,为1
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点
// 赋值pred结点的next域
pred.next = node;
} else { // 为PROPAGATE -3 或者是0 表示无状态,(为CONDITION -2时,表示此节点在condition queue中)
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 比较并设置前驱结点的状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 不能进行park操作
return false;
}

// 进行park操作并且返回该线程是否被中断
private final boolean parkAndCheckInterrupt() {
// 在许可可用之前禁用当前线程,并且设置了blocker
LockSupport.park(this);
return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
}

CountDownLatch

让一个线程等待其他线程执行完再执行

当前值不为0就一直阻塞

为0就允许所有线程通过

不用contDownLatch也可以用父子线程调用join实现

核心API

  • new CountDownLatch(int)
  • await() 进入等待状态
  • countDown() 将值减一

Exchanger

两个线程之间交换资源

支持泛型,参数可以传流

核心API

  • new Exchanger<>()

  • exchange()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public class ExchangerExample {
    public static void main(String[] args) {
    Exchanger<String> exchanger = new Exchanger<>();

    //第一个线程
    new Thread(() -> {
    try {
    String data1 = "Thread1 Data";
    System.out.println("Thread1 has data to exchange: " + data1);
    Thread.sleep(2000); //模拟一些计算需要2秒
    String data2 = exchanger.exchange(data1);
    System.out.println("Thread1 received:" + data2);
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    }
    }).start();

    //第二个线程
    new Thread(() -> {
    try {
    String data2 = "Thread2 Data";
    System.out.println("Thread2 has data to exchange: " + data2);
    Thread.sleep(1000); // 模拟一些计算
    String data1 = exchanger.exchange(data2);
    System.out.println("Thread2 received: " + data1);
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    }
    }).start();
    }
    }

CyclicBarrier

解决CountDownLatch不可复用的问题
每当值变为0,自动重置

核心API

  • new CyclicBarrier(int, Runnable) 一个等待的值, 一个值变为0后要执行的方法

  • await() 计数器减一

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class CyclicBarrierExamples {
    public static void main(String[] args) {
    final int numThreads = 3;
    CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {
    System.out.println("All threads have reached the barrier. Let's continue.");
    });

    for (int i = 0; i < numThreads; i++) {
    Thread thread = new Thread(new Worker(barrier));
    thread.start();
    }
    }

    static class Worker implements Runnable {
    private final CyclicBarrier barrier;

    Worker(CyclicBarrier barrier) {
    this.barrier = barrier;
    }

    @Override
    public void run() {
    try {
    System.out.println("Thread is doing some work.");
    Thread.sleep((long) (Math.random() * 3000));
    System.out.println("Thread has reached the barrier.");
    barrier.await(); // 等待其他线程到达栅栏
    System.out.println("Thread continues to do its work after the barrier.");
    } catch (InterruptedException | BrokenBarrierException e) {
    Thread.currentThread().interrupt();
    }
    }
    }
    }

Semaphore

Java 中的一个同步类,位于 java.util.concurrent 包中。它是一个计数信号量,用于控制同时访问特定资源的线程数量。Semaphore 通过维护一组许可证(permits)来实现同步,线程在访问资源之前必须先获得许可证,使用完毕后释放许可证。

核心API

  • new Semaphore(int) 可共享资源的数量

  • acquire() 资源数-1 如果为0 进入等待状态

  • release() 资源数+ 1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    public class SemaphoreExample {
    public static void main(String[] args) {
    Semaphore semaphore = new Semaphore(2); // 初始化一个许可证数量为2的Semaphore

    // 创建并启动5个线程
    for (int i = 0; i < 5; i++) {
    Thread thread = new Thread(new Task(semaphore, i));
    thread.start();
    }
    }

    static class Task implements Runnable {
    private final Semaphore semaphore;
    private final int taskId;

    Task(Semaphore semaphore, int taskId) {
    this.semaphore = semaphore;
    this.taskId = taskId;
    }

    @Override
    public void run() {
    try {
    System.out.println("Task " + taskId + " is trying to acquire a permit.");
    semaphore.acquire(); // 获取一个许可证,如果没有可用的许可证,线程将被阻塞
    System.out.println("Task " + taskId + " has acquired a permit.");
    // 模拟任务执行
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    } finally {
    System.out.println("Task " + taskId + " is releasing the permit.");
    semaphore.release(); // 释放许可证
    }
    }
    }
    }

StampedLock

JDK1.8引入

三种锁模式 写锁 悲观度读 乐观读

核心API

  • WriteLock()
  • readLock()
  • tryOptimisticRead() 获取乐观度
  • validate(long) 传入邮戳,查看是否被写占用
  • tryConvertToWriteLock() 尝试锁升级,直接从读锁转换为写锁

与ReadWriteLock区别

提升
  • 支持乐观读
  • 支持锁升级
不足
  • 不支持Condition
  • 不是可重入锁

Future接口

Future 接口是 Java 并发 API 中的一个关键组件,位于 java.util.concurrent 包中。它代表了异步计算的结果,通常与 ExecutorService 一起使用,以便在另一个线程中执行任务并返回结果。

核心API

  • get()获取计算结果 调用时如果线程没有结束,阻塞调用get()的线程
  • get(timeout, unit) 超时则不再阻塞
  • cancel() 取消任务
  • isCanceled() 判断当前任务是否取消
  • isDown() 判断当前任务是否已经完成

FutureTask工具类

  • 构造方法 可传两种参数
    • Runnable接口, T result
    • Callable 接口
  • 核心API
    • 使用Future接口的API
  • 使用
    • new Therad(FutureTask)
    • 调用get() 方法等待值返回

并发容器

List

copyOnWriteArrayList

CWO的思想

写时复制一份,既满足了写的需求,又可以读,不需要加锁,增加了效率

特点

  • 迭代器只支持读,不支持删改
  • 读操作进行的同时,如果还有写操作,会将此数组复制一份,旧的用来读,新的用来写。所以此时读的是一个快照,读不到更新后的数据

Map

ConcurrentHashMap

特点
  • key是无序的
  • 底层实现与HashMap一样
  • 与HashMap不一样的是,不允许key value 为null 为什么?
    • 防止出现语义误解返回值为null,不知道是不存在这个值还是存的null值
JDK1.7结构
  • 维护一个segment数组,每一个segment元素对应着一个HashEntry数组(与HashMap结构一致)
    • segment与hashEntry都是ConcurrentHashMap的内部类
    • segment继承了ReentrantLock,一个segment就是一把锁
  • 维护了负载因子,默认0.75
  • 维护了并发度,默认16
    • 并发度:因为采用分段锁,并发度决定了分段的数量,默认是16,就是16个线程可以在16个不同的分段上进行操作,提高并发性能
  • segment数组的最小长度为2
  • segment数组的最大长度为2^16
  • 扩容时与HashMap没什么区别,就是加了锁再扩容
JDK1.8结构
取消了分段锁 为什么?

当有热点数据时,往往访问的是同一个分段锁下的,这样并发程度高

使用CAS + synchronized来保证线程安全
put操作
  • 如果放在数组上(即还没有拉链)使用CAS操作判断
  • 如果放在链表或者树上,加synchronized
get操作

CAS判断一次,就可以无锁的进行获取

扩容时使用并发机制
  • 使用sizeCtl控制
    • -1代表正在初始化
    • -N代表正在扩容
    • 整数表示要扩容的阈值
  • 最少每一个线程完成16个桶的迁移工作
  • 如果不满16个就由一个线程来完成迁移工作
  • 更快的index计算方法,只需要运算1bit
为什么不用HashTable保证线程安全 ?

效率太低,直接加的synchronized锁

ConcurrentSkipListMap

  • Key有序
  • 底层是跳表

Set

copyOnWriteArraySet

concurrentSkipListMap

Queue

Deque

ConcurrentLinkedQueue

ConcurrentLinkedDeque

BlockingQueue

核心API
  • 抛出异常
    • add()
    • remove()
    • element
  • 返回特殊值
    • offer(long timeout, TimeUnit unit)
      • 插入成功返回true,失败返回false
      • 也可以设置超时时间
    • poll(long timeout, TimeUnit unit)
      • 设置超时时间
      • 用这个时间爱进行非核心线程的销毁
    • peek()
  • 一直阻塞
    • put() 满则阻塞
    • take() 没有元素则阻塞
实现类
  • ArrayBlockingQueue
    • 实现有界的等待队列
    • 设置是否为公平锁
  • LinkedBlockingQueue 无限的阻塞队列
  • DelayQueue设定延迟时间,延迟时间到了才能从DelayQueue中获取
  • PorityBlockingQueue 优先阻塞队列维护优先级
  • SynchronousQueue
    • 没有容器存储不存储任何元素
    • 一个put必须等待一个take否则阻塞
    • 不是AQS的实现,是CAS的实现
    • 用在了newCachedThreadPool适用于短期的任务