✨你好啊,我是“ 罗师傅”,是一名程序猿哦。
🌍主页链接:楚门的世界 - 一个热爱学习和运动的程序猿
☀️博文主更方向为:分享自己的快乐 briup-jp3-ing
❤️一个“不想让我曾没有做好的也成为你的遗憾”的博主。
💪很高兴与你相遇,一起加油!

前言

目标:JVM、JDK新特性、JDK源码、高并发、MySql优化

JUC

JUC(java.util.concurrent包),主要包括线程池工厂类Executors,线程池实现类ThreadPoolExecutor 等。

线程池

线程回顾

  1. 创建线程的方式
    • 继承Thread
    • 实现Runnable
    • 实现Callable
  2. 线程状态
  • NEW:刚刚创建,没做任何操作
  • RUNNABLE:调用run,可以执行,但不代一定在执行(RUNNING、READY)
  • BLOCKED:抢不到锁
  • WAITING
  • TIMED_WAITING
  • TERMINATED
  1. 线程池

线程池就是创建一个换成池存放线程,执行结束以后,该线程并不会死亡,而是再次返回线程池种成为空闲状态。

使用线程池的优势:

  • 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗
  • 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行
  • 方便线程并发数的管控,因为线程若是无限制地创建,可能会导致内存占用过多而产生OOM
  • 节省CPU切换线程的时间成本(需要保持当前执行线程的现场,并恢复要执行线程的现场)
  • 提供更强大的功能,延时定时线程池(Timer vs ScheduledThreadPoolExecutor)
  1. 线程池体系

说明:

  • 最常用的是ThreadPoolExecutor
  • 调度用ScheduledThreadExecutor
  • 任务拆分合并用ForkJoinPool
  • Executors是工具类,协助你创建线程池的

核心参数

Java 提供的线程池相关的工具类中,最核心的是 ThreadPoolExecutor,我们首先来看它的类体系及构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ThreadPoolExecutor extends AbstractExecutorService {
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
//核心的构造函数,其他构造函数都是调用该构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}

线程池核心参数介绍:

  1. corePoolSize:核心线程数量

    • 线程刚创建时,线程数量为0,当每次执行execute添加新的任务时会在线程池创建一个新的线程,直到线程数量达到corePoolSize为止。
    • 核心线程会一直存活,即使没有任务需要执行,当线程数小于核心线程数时,即使有线程空闲,线程也会优先创建新线程处理
    • 设置allowCoreThreadTimeout=true(默认false) 时,核心线程超时会关闭
  2. workQueue:阻塞队列

    • 当线程池正在运行的线程数量以及达到corePoolSize,那么通过execute添加新的任务则会被加入workQueue队列中,在队列中排队等待执行,而不会立即执行。
    • 阻塞队列有以下几种选择:
      • ArrayBlockingQueue
      • LinkedBlockingQueue
      • SynchronousQueue
  3. maxinumPoolSize: 最大线程数

    • 当池中的线程数>=corePoolSize,且任务队列已满时,线程池会创建新线程来处理任务
    • 当池中的线程数=maximumPoolSize, 且任务队列已满时,线程池会拒绝处理任务而抛出异常
  4. KeepAliveTime:线程空闲时间

    • 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
    • 如果allowCoreThreadTimeout=true,则会直到线程数量=0
  5. threadFactory:线程工厂,主要用来创建线程

  6. rejectedExecutionHandler:任务拒绝处理器,两种情况会拒绝处理任务

    • 当线程数已经达到maxPoolSize,且队列已满,会拒绝新任务
    • 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务的
    • 当拒绝处理任务时线程池会调用refectedExecutionHandler来处理这个任务。如果没有设置默认时AbortPolicy,另外在ThreadPoolExecutor类有几个内部实现类来处理这类情况:
      • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出 RejectedExecutionException异常
      • ThreaPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
      • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
      • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

源码剖析

ThradPoolExecutor的最基本使用方式就是通过execute方法提交一个Runnable任务

一定要通过idea一步一步去看源码,不然会很晕的~~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();// 看下图
//判断工作数,如果小于coreSize,addWork,注意第二个参数core=true
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//线程池是RUNNING状态并且task入阻塞队列成功
if (isRunning(c) && workQueue.offer(command)) {
// 在检查一下状态
int recheck = ctl.get();
// 如果线程池已经终止,直接移除任务,不再响应
if (! isRunning(recheck) && remove(command))
reject(command);
// 否则,如果没有可用的线程的话(比如coreSize=0),创建一个空worker
// 该worker创建时不会给指派任务(为null),但是会被放入workers集合,进而从workerQueue队列获取任务区执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 队列也满了,继续调addWorker,但是注意,core=false,开启到maxSize的大门
// 超出max的话,addWorker会返回false,进入reject
else if (!addWorker(command, false))
reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
private boolean addWorker(Runnable firstTask, boolean core) {
//第一步,计数判断,不符合条件打回false
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
// 判断线程数,说明线程池的线程数是不能设置任意大的
// 最大29位(CAPACITY=29位二进制)
// 超出规定范围,返回false,表示不允许再开启新工作线程,创建worker失败!
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

//第二步,创建新worker放入线程集合workers(一个HashSet)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//符合条件,创建新的worker并包装task
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 可重入锁
final ReentrantLock mainLock = this.mainLock;
// 加锁,workers是一个hashset,这里要保障线程安全性
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();

workers.add(w);

int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;

workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//注意,只要是成功add了新的worker,那么将该新worker立即启动,任务得到执行
t.start(); // 会自动调用ThreadPoolExecutor中Worker内部类中的 run()
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** Delegates main run loop to outer runWorker  */
public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
//...
//任务获取与执行
//在worker执行runWorker()的时候,不停循环,先查看自己有没有携带Task,如果有,执行
while (task != null || (task = getTask()) != null) { // 主要是这个getTask()方法
//...
}
//...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 如果没有,会调用getTask,从队列获取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling? - 很形象,要不要乖乖的被“捕杀”?
// 判断是不是要超时处理,重点! 决定了当前线程要不要被释放
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 线程数超出max,并且上次循环中poll等待超时了,那么说明该线程已终止
// 将线程队列数量原子性减一
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 计数器做原子递减,递减成功后,返回null,for被中止
if (compareAndDecrementWorkerCount(c))
return null;
// 递减失败,继续下一轮循环,直到成功
continue;
}

try {
// 如果线程可被释放,那就poll,释放的时间为:keepAliceTime
// 否则,线程是不会被释放的,take一直被阻塞在这里,直到来了新任务继续工作
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

Executors(不建议用这个)

以上构造函数参数比较多,为了方便使用,JUC提供了一个Executors工具类,内部提供静态方法

  • newCachedThreadPool():弹性线程数
  • newFixedThreadPool(int nThreads):固定线程数
  • newSingleThreadExecutor():单一线程数
  • newScheduledThreadPool(int corePoolSize):可调度,常用于定时

经典面试

  1. 线程池是如果保证线程不被销毁的呢?

如果队列中没有任务时,核心线程会一直阻塞在获取任务的方法,直到返回任务。而任务执行完后,又会进入下一轮worker.runWork()中循环

验证:秘密就藏在核心源码里 ThreadPoolExecutor.getTask()

1
2
3
4
5
6
7
8
//worker.runWork():
while (task != null || (task = getTask()) != null)

//work.getTask():
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
  1. 核心线程与非核心线程有区别吗?

没有,被销毁的线程和创建的先后无关,即便是第一个被创建的核心线程,仍然有可能被销毁。

验证:看源码,每个worker在runWork()的时候去getTask(),在getTask内部,并没有针对性地区分当前 worker是否是核心线程的操作,只是判断workers数量超出core,就会调用poll(),否则take()。

1
2
3
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();

Fork/Join

概念

ForkJoinPool是由JDK1.7后提供多线程并行执行任务的框架,可以理解为一种特殊的线程池(有点像MapReduce)

  • 任务分割:fork(分叉),先把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
  • 合并结果:join,分割后的子任务被多个线程执行后,再合并结果,得到最终的完整输出

组成

  • ForkJoinTask:主要提供fork和join两个方法用于任务拆分与合并,一般用子类 RecursiveAction(无返回值的任务)RecursiveTask(需要返回值)来实现compute方法。
  • ForkJoinPool:调度ForkJoinTask的线程池
  • ForkJoinWorkerThread:Thread的子类,存放于线程池种的工作线程(Worker)
  • WorkQueue:任务队列,用于保存任务

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class SumTask {

private static final Integer MAX = 100;

static class SubTask extends RecursiveTask<Integer> {
// 子任务开始计算的值
private Integer start;

// 子任务结束计算的值
private Integer end;

public SubTask(Integer start , Integer end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
if(end - start < MAX) {
//小于边界,开始计算
System.out.println("start = " + start + ";end = " + end);
Integer totalValue = 0;
for(int index = this.start ; index <= this.end ; index++) {
totalValue += index;
}
return totalValue;
}else {
//否则,中间劈开继续拆分
SubTask subTask1 = new SubTask(start, (start + end) / 2);
subTask1.fork(); // 将任务拆分
SubTask subTask2 = new SubTask((start + end) / 2 + 1 , end);
subTask2.fork(); // 将任务拆分
return subTask1.join() + subTask2.join();
}
}
}

public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
Future<Integer> taskFuture = pool.submit(new SubTask(1,1000));
try {
Integer result = taskFuture.get();
System.out.println("result = " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace(System.out);
}
}
}

设计思想

普通线程池内部有两个重要集合:工作线程集合(普通线程)和任务队列。

ForkJoinPool也类似,线程集合里放的是特殊线程ForkJoinWorkerThread, 任务队列里放的是特殊任务ForkJoinTask

不同之处在于,普通线程池只有一个队列,而ForkJoinPool的工作线程ForkJoinWorkerThread每个线程内都绑定一个双端队列。

在fork的时候,也就是任务拆分,拆分的task会被当前线程放到自己的队列中。如果有任务,那么线程优先从自己的队列里去任务,以FILO先进后出方式从队尾获取任务。当自己队列中任务执行完后,工作线程会跑到其他队列以work-stealing窃取任务,窃取方式为FIFO先进先出,减少竞争。

注意点

使用ForkJoin将相同的计算任务通过多线程执行,但是在使用中需要注意:

  • 任务切分的粒度,也就是fork的界限,并非越小越好
  • 判断要不要使用ForkJoin,任务量不是太大的话,串行可能优于并行,因为多线程会涉及到上下文的切换

原子操作

概念

原子(atom)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为“不可被终端的一个或一系列操作”

原子性指的是汇编指令层面,比如java中的 i++,其实涉及到了好几条的汇编指令

CAS

CAS(Compare-and-Swap/Exchange),即比较并替换,是一种乐观锁的实现,用于解决并发问题

CAS核心算法:执行函数CAS(V,E,N)

  • V表示准备要被更新的变量(内存的值)
  • E表示我们提供的期望的值(期望的原值)
  • N表示新值,准备更新V的值(新值)

JUC中提供了Atomic开头的类,基于CAS实现原子性操作,最基本的应用就是计数器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class AtomicCounter {
// AtomicInteger保证了 i变量的操作是具有原子性的
private static AtomicInteger i = new AtomicInteger(0);

public int get() {
return i.get();
}

public void inc() {
i.incrementAndGet();
}

public static void main(String[] args) throws InterruptedException {
final AtomicCounter counter = new AtomicCounter();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
counter.inc();
}
}).start();
}
Thread.sleep(3000);
//可以正确输出10
System.out.println(i.get());
}
}

CAS虽然很高效地解决了原子操作问题,但是CAS仍然存在三大问题:

  • 自旋(循环)时间长开销很大,如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销,注意这里的自旋是在用户态/SDK层面实现的
  • 只能保证一个共享变量的原子操作,对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁来保证原子性
  • ABA问题,在使用CAS前要考虑清楚“ABA”问题是否会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比CAS更高效。或者在E对象里加个操作次数变量,每次判断时对比两个,E和操作次数就ok了,因为ABA问题中就算E相同操作次数也绝不相同。

atomic

  • 基本类型
    • AtomicBoolean:以原子更新的方式更新Boolean
    • AtomicInteger:以原子更新的方式更新Integer
    • AtomicLong:以原子更新的方式更新Long
  • 引用类型
    • AtomicReference:原子更新引用类型
    • AtomicReferenceFieldUpdater:原子更新引用类型的字段
    • AtomicMarkableReference:原子更新带有标志位的引用类型
  • 数组
    • AtomicIntegerArray:原子更新整型数组里的元素
    • AtomicLongArray:原子更新长整型数组里的元素
    • AtomicReferenceArray:原子更新引用类型数组里的元素
  • 字段
    • AtomicIntegerFieldUpdater:原子更新整型的字段的更新器
    • AtomicLongFieldUpdater:原子更新长整型字段的更新器
    • AtomicStampedReference:原子更新带有版本号的引用类型

注意

使用atomic要注意原子性的边界,把握不好会起不到应有的效果,原子性被破坏。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class BadAtomic {
AtomicInteger i = new AtomicInteger(0);
static int j = 0;

public void badInc() { // + synchronized可以解决同步问题
int k = i.incrementAndGet();
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
j = k; // 再次赋值导致原子性被破坏了
}
public static void main(String[] args) throws InterruptedException {
BadAtomic atomic = new BadAtomic();
for (int i = 0; i < 10; i++) {
new Thread(()->{
atomic.badInc();
}).start();
}
Thread.sleep(3000);
System.out.println(atomic.j);
}

}
// 导致每次结果不一样

AQS

前言

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
ublic class MyLock {
// 定义一个状态变量status:为1表示锁被持有,为0表示锁未被持有
private volatile int status;

private static final Unsafe unsafe = reflectGetUnsafe();
private static final long valueOffset;
private static final Queue<Thread> QUEUE = new LinkedBlockingQueue<>();

static {
try {
valueOffset = unsafe.objectFieldOffset
(MyLock.class.getDeclaredField("status"));
} catch (Exception ex) {
throw new Error(ex);
}
}

private static Unsafe reflectGetUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

/**
* 阻塞式获取锁
* @return
*/
public boolean lock() {
// CAS+自旋
while (!compareAndSet(0,1)) {
}
return true;
}
// cas 设置 status
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* 释放锁
*/
public void unlock() {
status = 0;
}

}

存在问题:获取不到锁自旋时,是空转,浪费CPU

解决方法:

  • 使用 yield 让出CPU执行权,等待调度
1
2
3
4
5
6
public boolean lock() {
while (!compareAndSet(0,1)) {
Thread.yield();//yield+自旋,尽可能的防止CPU空转,让出CPU资源
}
return true;
}

或者可以采用线程休眠的方式,但是休眠时间不太好确定,太长太短都不好。

  • 采用等待唤醒机制,但是这里由于没有使用synchronized关键字,所以也无法使用wait/notify,但是我们可以使用park/unpark,获取不到锁的线程park并且去队列排队,释放锁时从队列拿出一个线程unpark
1
2
3
4
5
6
7
8
9
10
11
12
private static final Queue<Thread> QUEUE = new LinkedBlockingQueue<>();
public boolean lock() {
while (!compareAndSet(0,1)) {
QUEUE.offer(Thread.currentThread());
LockSupport.park();//线程休眠
}
return true;
}
public void unlock() {
status = 0;
LockSupport.unpark(QUEUE.poll()); // 从队列拿出一个队列的头节点
}

AQS概述

  1. AQS底层使用了模板方法模式,给我们提供了许多模板方法,直接使用即可。

基本使用

直接继承AbstractQueuedSynchronizer重写其中的tryXxxx方法即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class MyLock implements Lock {

//同步器
private Syn syn = new Syn();

@Override
public void lock() {
//调用模板方法
syn.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {

}

@Override
public boolean tryLock() {
return false;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public void unlock() {
//调用模板方法
syn.release(0);
}

@Override
public Condition newCondition() {
return null;
}

// 实现一个独占同步器
class Syn extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, arg)) {
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
setState(arg);
return true;
}
}
}

原理解析

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  • 线程一来首先调用tryAcquire, 在tryAcquire中直接CAS获取锁,如果获取不成功通过addWaiter加入等待队列,然后走acquireQueued让队列中的某个等待线程去获取锁
  • 不公平就体现在这里,线程来了也不先看一下等待队列中是否有线程在等待,如果没有线程等待,那就直接获取锁没什么问题,如果有线程等待就直接去获取锁不就相当于插队了吗?
  1. 查看 AbstractQueuedSynchronizer 的类定义,虽然它里面代码很多,但重要的属性就那么几个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
//其他不重要的略
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
public class ConditionObject implements Condition, java.io.Serializable {...}
}
  1. AQS加锁最核心的代码就是如下,我们要来探究它的实现原理
1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  1. 原理搞懂了,那如何让自定义的锁是公平的呢?
  1. 现在已经有公平锁了,但是成年人的世界不是做选择题,而是都想要,自己编写的锁既能支持公平 锁,也支持非公平锁,让使用者可以自由选择,怎么办?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 实现一个独占同步器
class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryRelease(int arg) {
setState(arg);
return true;
}
}

class FairSync extends Sync {
@Override
protected boolean tryAcquire(int arg) {
//先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
return true;
}
return false;
}
}

class NoFairSync extends Sync {
@Override
protected boolean tryAcquire(int arg) {
//直接去获取锁
if (compareAndSetState(0,arg)) {
return true;
}
return false;
}
}
  1. 现在锁的公平性问题解决了,但是老板又出了新的需求,要求我们的锁支持可重入,因为它写了如 下一段代码,发现一直获取不到锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static Lock lock = new MyLock();
static void test3() {
lock.lock();
try {
System.out.println("test3 get lock,then do something ");
test4();
} finally {
lock.unlock();
}
}
static void test4() {
lock.lock();
try {
System.out.println("test4 get lock,then do something ");
} finally {
lock.unlock();
}
}

那如何让锁支持可重入呢?也就是说如果一个线程持有锁之后,还能继续获取锁(同一线程可重入),也就是说让锁只对不同线程互斥

查看 AbstractQueuedSynchronizer 的定义我们发现,它还继承自另一个类: AbstractOwnableSynchronizer

1
2
3
4
5
6
7
8
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {...}
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {...}
protected final Thread getExclusiveOwnerThread(){...}
}

原来 AQS 中有个变量是可以保存当前持有独占锁的线程的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// 实现一个独占同步器
class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryRelease(int arg) {
// 判断当前线程是不是 该独占锁的拥有者
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
boolean realRelease = false;
int nextState = getState() - arg;
if (nextState == 0) {
realRelease = true;
setExclusiveOwnerThread(null);
}
setState(nextState);
return realRelease;
}
}
class FairSync extends Sync {
@Override
protected boolean tryAcquire(int arg) {
final Thread currentThread = Thread.currentThread();
int currentState = getState();
if (currentState == 0 ) { // 可以获取锁
//先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
//把当前线程设置在这个锁上,表示抢占成功,在重入锁的时候需要
setExclusiveOwnerThread(currentThread);
return true;
}
//比较当前线程和占用锁的线程是不是一个线程
}else if (currentThread == getExclusiveOwnerThread()) {
//重入逻辑 增加 state值
int nextState = currentState + arg;
if (nextState < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextState);
return true;
}
return false;
}
}
class NoFairSync extends Sync {
@Override
protected boolean tryAcquire(int arg) {
final Thread currentThread = Thread.currentThread();
int currentState = getState();
if (currentState ==0 ) { // 可以获取锁
//直接去获取锁
if (compareAndSetState(0,arg)) {
setExclusiveOwnerThread(currentThread);
return true;
}
}else if (currentThread == getExclusiveOwnerThread()) {
//重入逻辑 增加 state值
int nextState = currentState + arg;
if (nextState < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextState);
return true;
}
return false;
}
}

并发容器(了解即可)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private transient volatile Object[] array; // volatile
public boolean add(E e) {
final ReentrantLock lock = this.lock; // lock加锁
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // 数组复制
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
  • CopyOnWriteArraySet
    • 对应:HashSet
    • 目标:代替synchronizedSet
    • 原理:与CopyOnWriteArrayList实现原理类似。

并发深入

基本协同

Object

  • wait:让出锁,阻塞等待
  • notify/notifyAll:唤醒wait的进程

扩展:JDK1.5+的lock中支持条件变量, Condition.await(),signal/signalAll 与 wait/notify效果一样,可 以做到更精细化控制。

Thread

  • sleep:休眠一下,让出CPU资源,但是不会释放锁
  • yield:不释放锁,运行转为就绪,让出CPU给大家竞争,当然有可能自己又抢回来
  • join:父线程等待子线程执行完成后再执行,将异步转为同步(注意挂起的是子线程,阻断的是父线程)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*  面试题:在synchronized代码块内调用join,是否会释放锁?
.wait()是Object下的方法,而.join()是Thread的方法,这就出现了一个问题,
join通过wait实现,释放锁,那么释放的是哪个锁,是Thread还是Object呢?
当join前面的对象与synchronized一致时,释放锁,否则不释放。
*/
public class JoinTest {
public static void main(String[] args) throws Exception{
System.out.println("main.start");
MyThread myThread = new MyThread();
Thread sub = new Thread(myThread);

// synchronized (myThread.lock){
synchronized (sub){
System.out.println("main:before sub");
//1.为什么不会死锁?
sub.start();
System.out.println("main:after sub");
//2.打开join试试:如果线程卡死,说明main不释放锁,如果可以顺利执行,说明sub拿到了锁!
sub.join();
System.out.println("main after join");
}

// sub.join();

System.out.println("main.end");
}

static class MyThread implements Runnable{
byte[] lock = new byte[0];

@Override
public void run() {
synchronized (lock){
System.out.println("I am sub");
}
}
}
}

线程的三大特性

  • CPU添加高速缓存,来平衡与内存的速度差异
  • 操作系统支持多进程、多线程,以分时复用CPU,进而均衡CPU与I/O设备的速度差异
  • 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用

CPU缓存导致可见性问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/*
程序运行5秒后,并未停止
由于共享变量的可见性问题,主线程对变量的修改,其它线程并未看到
问题解决:共享变量用volatile修饰
*/
public class VisibilityTest {
// 多线程共享变量
private static boolean running = true;
// private static volatile boolean running = true;

public static void main(String[] args) throws Exception {
t1();
}

private static void t1() throws InterruptedException {
new Thread(()->{
while (running) {

}
System.out.println("thread exit");
}).start();

TimeUnit.SECONDS.sleep(5);
running = false;
}
}

线程切换导致原子性问题

这种切换属于一种重量级的切换,现代的操作系统都基于更轻量的线程来调度,而一个进程创建的所有 线程,都是共享一个内存空间的,所以线程切换的成本就很低了,并且线程切换的时机大都是在时间片结束的时候。如下:

虽然操作系统能保证每条指令执行的时是具备原子性的,但是操作系统进行线程切换,可以发生在任意一条CPU指令执行完成之后(注意是CPU指令级别,也就是汇编指令),那这对高级编程语言来说多线程并发时就会造成原子性问题,如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
CPU切换,导致原子性问题
问题解决:高级语言操作所对应的多条CPU指令不能被打断,如加锁 或者 使用AtomicXxx包装的数据类型
*/
public class AtomicityTest {
static class AtomicRunnable implements Runnable {
// int i = 0;
AtomicInteger i = new AtomicInteger(0);

@Override
public void run() { // synchronized
// i += 1;
// i.incrementAndGet();
System.out.println("---" + i.incrementAndGet());
}
}

public static void main(String[] args) {
AtomicRunnable runnable = new AtomicRunnable();
for (int i = 0; i < 10000; i++) {
new Thread(runnable).start();
}
}
}

性能优化导致有序性问题

整个过程分为三步:

  • 分配一块内存空间(堆区)
  • 在内存空间上初始化对象
  • 将内存空间的地址赋值给引用变量(栈区)

要注意的是,在分配完内存还未初始化时,对象的实例变量是有一个初始默认值的,比如 int 就是0, 初始化完成之后实例变量才会赋真正的值。

双重锁校验创建单例对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Singleton {
private Singleton(){}
private static Singleton instance;
public static Singleton getInstance(){
// 第一个判断是为了防止已经创建了instance实例后,不在需要拿锁,进而提高了效率
if (instance == null) {
synchronized(Singleton.class) {
// 第二个判断是为了避免重复创建instance实例对象
if (instance == null)
instance = new Singleton();
}
}
return instance;
}
}

补充知识点:线程切换是不会释放锁的。

JMM(Java Memory Model)

主内存与工作内存

JMM解决什么问题

JMM内存交互

Happens-Before

volatile

synchronized

锁概述

锁和资源的关系

Linux内核同步机制

synchronized字节码层面

synchronized锁的原理

锁膨胀

锁消除,锁粗化

锁使用经验

ThreadLocal

概念

使用

应用场景

实现原理

使用注意