✨你好啊,我是“ 罗师傅”,是一名程序猿哦。 🌍主页链接:楚门的世界 - 一个热爱学习和运动的程序猿 ☀️博文主更方向为:分享自己的快乐 briup-jp3-ing ❤️一个“不想让我曾没有做好的也成为你的遗憾”的博主。 💪很高兴与你相遇,一起加油!
前言
目标:JVM、JDK新特性、JDK源码、高并发、MySql优化
JUC JUC(java.util.concurrent包),主要包括线程池工厂类Executors,线程池实现类ThreadPoolExecutor 等。
线程池 线程回顾
创建线程的方式
继承Thread
实现Runnable
实现Callable
线程状态
NEW:刚刚创建,没做任何操作
RUNNABLE:调用run,可以执行,但不代一定在执行(RUNNING、READY)
BLOCKED:抢不到锁
WAITING
TIMED_WAITING
TERMINATED
线程池
线程池就是创建一个换成池存放线程,执行结束以后,该线程并不会死亡,而是再次返回线程池种成为空闲状态。
使用线程池的优势:
降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗
提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行
方便线程并发数的管控,因为线程若是无限制地创建,可能会导致内存占用过多而产生OOM
节省CPU切换线程的时间成本(需要保持当前执行线程的现场,并恢复要执行线程的现场)
提供更强大的功能,延时定时线程池(Timer vs ScheduledThreadPoolExecutor)
线程池体系
说明:
最常用的是ThreadPoolExecutor
调度用ScheduledThreadExecutor
任务拆分合并用ForkJoinPool
Executors是工具类,协助你创建线程池的
核心参数 Java 提供的线程池相关的工具类中,最核心的是 ThreadPoolExecutor,我们首先来看它的类体系及构造
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class ThreadPoolExecutor extends AbstractExecutorService { private static final RejectedExecutionHandler defaultHandler = new AbortPolicy (); public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; } }
线程池核心参数介绍:
corePoolSize:核心线程数量
线程刚创建时,线程数量为0,当每次执行execute添加新的任务时会在线程池创建一个新的线程,直到线程数量达到corePoolSize为止。
核心线程会一直存活,即使没有任务需要执行,当线程数小于核心线程数时,即使有线程空闲,线程也会优先创建新线程处理
设置allowCoreThreadTimeout=true(默认false) 时,核心线程超时会关闭
workQueue:阻塞队列
当线程池正在运行的线程数量以及达到corePoolSize,那么通过execute添加新的任务则会被加入workQueue队列中,在队列中排队等待执行,而不会立即执行。
阻塞队列有以下几种选择:
ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue
maxinumPoolSize: 最大线程数
当池中的线程数>=corePoolSize,且任务队列已满时,线程池会创建新线程来处理任务
当池中的线程数=maximumPoolSize, 且任务队列已满时,线程池会拒绝处理任务而抛出异常
KeepAliveTime:线程空闲时间
当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
如果allowCoreThreadTimeout=true,则会直到线程数量=0
threadFactory:线程工厂,主要用来创建线程
rejectedExecutionHandler:任务拒绝处理器,两种情况会拒绝处理任务
当线程数已经达到maxPoolSize,且队列已满,会拒绝新任务
当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务的
当拒绝处理任务时线程池会调用refectedExecutionHandler来处理这个任务。如果没有设置默认时AbortPolicy,另外在ThreadPoolExecutor类有几个内部实现类来处理这类情况:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出 RejectedExecutionException异常
ThreaPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
源码剖析 ThradPoolExecutor的最基本使用方式就是通过execute方法提交一个Runnable任务
一定要通过idea一步一步去看源码,不然会很晕的~~
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void run () { runWorker(this ); } final void runWorker (Worker w) { while (task != null || (task = getTask()) != null ) { } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
Executors(不建议用这个) 以上构造函数参数比较多,为了方便使用,JUC提供了一个Executors工具类,内部提供静态方法
newCachedThreadPool():弹性线程数
newFixedThreadPool(int nThreads):固定线程数
newSingleThreadExecutor():单一线程数
newScheduledThreadPool(int corePoolSize):可调度,常用于定时
经典面试
线程池是如果保证线程不被销毁的呢?
如果队列中没有任务时,核心线程会一直阻塞在获取任务的方法,直到返回任务。而任务执行完后,又会进入下一轮worker.runWork()中循环
验证:秘密就藏在核心源码里 ThreadPoolExecutor.getTask()
1 2 3 4 5 6 7 8 while (task != null || (task = getTask()) != null ) boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
核心线程与非核心线程有区别吗?
没有,被销毁的线程和创建的先后无关,即便是第一个被创建的核心线程,仍然有可能被销毁。
验证:看源码,每个worker在runWork()的时候去getTask(),在getTask内部,并没有针对性地区分当前 worker是否是核心线程的操作,只是判断workers数量超出core,就会调用poll(),否则take()。
1 2 3 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
Fork/Join 概念 ForkJoinPool是由JDK1.7后提供多线程并行执行任务的框架,可以理解为一种特殊的线程池(有点像MapReduce)
任务分割:fork(分叉),先把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
合并结果:join,分割后的子任务被多个线程执行后,再合并结果,得到最终的完整输出
组成
ForkJoinTask:主要提供fork和join两个方法用于任务拆分与合并,一般用子类 RecursiveAction(无返回值的任务) 和RecursiveTask(需要返回值) 来实现compute方法。
ForkJoinPool:调度ForkJoinTask的线程池
ForkJoinWorkerThread:Thread的子类,存放于线程池种的工作线程(Worker)
WorkQueue:任务队列,用于保存任务
基本使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class SumTask { private static final Integer MAX = 100 ; static class SubTask extends RecursiveTask <Integer> { private Integer start; private Integer end; public SubTask (Integer start , Integer end) { this .start = start; this .end = end; } @Override protected Integer compute () { if (end - start < MAX) { System.out.println("start = " + start + ";end = " + end); Integer totalValue = 0 ; for (int index = this .start ; index <= this .end ; index++) { totalValue += index; } return totalValue; }else { SubTask subTask1 = new SubTask (start, (start + end) / 2 ); subTask1.fork(); SubTask subTask2 = new SubTask ((start + end) / 2 + 1 , end); subTask2.fork(); return subTask1.join() + subTask2.join(); } } } public static void main (String[] args) { ForkJoinPool pool = new ForkJoinPool (); Future<Integer> taskFuture = pool.submit(new SubTask (1 ,1000 )); try { Integer result = taskFuture.get(); System.out.println("result = " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(System.out); } } }
设计思想 普通线程池内部有两个重要集合:工作线程集合(普通线程)和任务队列。
ForkJoinPool也类似,线程集合里放的是特殊线程ForkJoinWorkerThread, 任务队列里放的是特殊任务ForkJoinTask
不同之处在于,普通线程池只有一个队列,而ForkJoinPool的工作线程ForkJoinWorkerThread每个线程内都绑定一个双端队列。
在fork的时候,也就是任务拆分,拆分的task会被当前线程放到自己的队列中。如果有任务,那么线程优先从自己的队列里去任务,以FILO 先进后出方式从队尾获取任务。当自己队列中任务执行完后,工作线程会跑到其他队列以work-stealing窃取任务,窃取方式为FIFO 先进先出,减少竞争。
注意点 使用ForkJoin将相同的计算任务通过多线程执行,但是在使用中需要注意:
任务切分的粒度,也就是fork的界限,并非越小越好
判断要不要使用ForkJoin,任务量不是太大的话 ,串行可能优于并行,因为多线程会涉及到上下文的切换
原子操作 概念 原子(atom)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为“不可被终端的一个或一系列操作”
原子性指的是汇编指令层面,比如java中的 i++,其实涉及到了好几条的汇编指令
CAS CAS(Compare-and-Swap/Exchange),即比较并替换,是一种乐观锁的实现,用于解决并发问题 。
CAS核心算法:执行函数CAS(V,E,N)
V表示准备要被更新的变量(内存的值)
E表示我们提供的期望的值(期望的原值)
N表示新值,准备更新V的值(新值)
JUC中提供了Atomic开头的类,基于CAS实现原子性操作,最基本的应用就是计数器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class AtomicCounter { private static AtomicInteger i = new AtomicInteger (0 ); public int get () { return i.get(); } public void inc () { i.incrementAndGet(); } public static void main (String[] args) throws InterruptedException { final AtomicCounter counter = new AtomicCounter (); for (int i = 0 ; i < 10 ; i++) { new Thread (new Runnable () { @Override public void run () { counter.inc(); } }).start(); } Thread.sleep(3000 ); System.out.println(i.get()); } }
CAS虽然很高效地解决了原子操作问题,但是CAS仍然存在三大问题:
自旋(循环)时间长开销很大,如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销,注意这里的自旋是在用户态/SDK层面实现的
只能保证一个共享变量 的原子操作,对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁来保证原子性
ABA问题,在使用CAS前要考虑清楚“ABA”问题是否会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比CAS更高效。或者在E对象里加个操作次数变量 ,每次判断时对比两个,E和操作次数就ok了,因为ABA问题中就算E相同操作次数也绝不相同。
atomic
基本类型
AtomicBoolean:以原子更新的方式更新Boolean
AtomicInteger:以原子更新的方式更新Integer
AtomicLong:以原子更新的方式更新Long
引用类型
AtomicReference:原子更新引用类型
AtomicReferenceFieldUpdater:原子更新引用类型的字段
AtomicMarkableReference:原子更新带有标志位的引用类型
数组
AtomicIntegerArray:原子更新整型数组里的元素
AtomicLongArray:原子更新长整型数组里的元素
AtomicReferenceArray:原子更新引用类型数组里的元素
字段
AtomicIntegerFieldUpdater:原子更新整型的字段的更新器
AtomicLongFieldUpdater:原子更新长整型字段的更新器
AtomicStampedReference:原子更新带有版本号的引用类型
注意 使用atomic要注意原子性的边界,把握不好会起不到应有的效果,原子性被破坏。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class BadAtomic { AtomicInteger i = new AtomicInteger (0 ); static int j = 0 ; public void badInc () { int k = i.incrementAndGet(); try { Thread.sleep(new Random ().nextInt(100 )); } catch (InterruptedException e) { e.printStackTrace(); } j = k; } public static void main (String[] args) throws InterruptedException { BadAtomic atomic = new BadAtomic (); for (int i = 0 ; i < 10 ; i++) { new Thread (()->{ atomic.badInc(); }).start(); } Thread.sleep(3000 ); System.out.println(atomic.j); } }
AQS 前言
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 ublic class MyLock { private volatile int status; private static final Unsafe unsafe = reflectGetUnsafe(); private static final long valueOffset; private static final Queue<Thread> QUEUE = new LinkedBlockingQueue <>(); static { try { valueOffset = unsafe.objectFieldOffset (MyLock.class.getDeclaredField("status" )); } catch (Exception ex) { throw new Error (ex); } } private static Unsafe reflectGetUnsafe () { try { Field field = Unsafe.class.getDeclaredField("theUnsafe" ); field.setAccessible(true ); return (Unsafe) field.get(null ); } catch (Exception e) { e.printStackTrace(); return null ; } } public boolean lock () { while (!compareAndSet(0 ,1 )) { } return true ; } public final boolean compareAndSet (int expect, int update) { return unsafe.compareAndSwapInt(this , valueOffset, expect, update); } public void unlock () { status = 0 ; } }
存在问题:获取不到锁自旋时,是空转,浪费CPU
解决方法:
1 2 3 4 5 6 public boolean lock () { while (!compareAndSet(0 ,1 )) { Thread.yield (); } return true ; }
或者可以采用线程休眠的方式,但是休眠时间不太好确定,太长太短都不好。
采用等待唤醒机制,但是这里由于没有使用synchronized
关键字,所以也无法使用wait/notify
,但是我们可以使用park/unpark
,获取不到锁的线程park
并且去队列排队,释放锁时从队列拿出一个线程unpark
1 2 3 4 5 6 7 8 9 10 11 12 private static final Queue<Thread> QUEUE = new LinkedBlockingQueue <>();public boolean lock () { while (!compareAndSet(0 ,1 )) { QUEUE.offer(Thread.currentThread()); LockSupport.park(); } return true ; } public void unlock () { status = 0 ; LockSupport.unpark(QUEUE.poll()); }
AQS概述
AQS底层使用了模板方法模式,给我们提供了许多模板方法,直接使用即可。
基本使用 直接继承AbstractQueuedSynchronizer重写其中的tryXxxx方法即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class MyLock implements Lock { private Syn syn = new Syn (); @Override public void lock () { syn.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { } @Override public boolean tryLock () { return false ; } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return false ; } @Override public void unlock () { syn.release(0 ); } @Override public Condition newCondition () { return null ; } class Syn extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { if (compareAndSetState(0 , arg)) { return true ; } return false ; } @Override protected boolean tryRelease (int arg) { setState(arg); return true ; } } }
原理解析
1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
线程一来首先调用tryAcquire
, 在tryAcquire
中直接CAS获取锁,如果获取不成功通过addWaite
r加入等待队列,然后走acquireQueued
让队列中的某个等待线程去获取锁
不公平就体现在这里,线程来了也不先看一下等待队列中是否有线程在等待,如果没有线程等待,那就直接获取锁没什么问题,如果有线程等待就直接去获取锁不就相当于插队了吗?
查看 AbstractQueuedSynchronizer 的类定义,虽然它里面代码很多,但重要的属性就那么几个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { private volatile int state; private transient volatile Node head; private transient volatile Node tail; static final class Node { volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; } public class ConditionObject implements Condition , java.io.Serializable {...} }
AQS加锁最核心的代码就是如下,我们要来探究它的实现原理
1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
原理搞懂了,那如何让自定义的锁是公平的呢?
现在已经有公平锁了,但是成年人的世界不是做选择题,而是都想要,自己编写的锁既能支持公平 锁,也支持非公平锁,让使用者可以自由选择,怎么办?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryRelease (int arg) { setState(arg); return true ; } } class FairSync extends Sync { @Override protected boolean tryAcquire (int arg) { if (!hasQueuedPredecessors() && compareAndSetState(0 ,arg)) { return true ; } return false ; } } class NoFairSync extends Sync { @Override protected boolean tryAcquire (int arg) { if (compareAndSetState(0 ,arg)) { return true ; } return false ; } }
现在锁的公平性问题解决了,但是老板又出了新的需求,要求我们的锁支持可重入,因为它写了如 下一段代码,发现一直获取不到锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static Lock lock = new MyLock ();static void test3 () { lock.lock(); try { System.out.println("test3 get lock,then do something " ); test4(); } finally { lock.unlock(); } } static void test4 () { lock.lock(); try { System.out.println("test4 get lock,then do something " ); } finally { lock.unlock(); } }
那如何让锁支持可重入呢?也就是说如果一个线程持有锁之后,还能继续获取锁(同一线程可重入 ),也就是说让锁只对不同线程互斥 。
查看 AbstractQueuedSynchronizer
的定义我们发现,它还继承自另一个类: AbstractOwnableSynchronizer
1 2 3 4 5 6 7 8 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable {...} public abstract class AbstractOwnableSynchronizer implements java .io.Serializable { private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread (Thread thread) {...} protected final Thread getExclusiveOwnerThread () {...} }
原来 AQS 中有个变量是可以保存当前持有独占锁的线程的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryRelease (int arg) { if (Thread.currentThread() != getExclusiveOwnerThread()) { throw new IllegalMonitorStateException (); } boolean realRelease = false ; int nextState = getState() - arg; if (nextState == 0 ) { realRelease = true ; setExclusiveOwnerThread(null ); } setState(nextState); return realRelease; } } class FairSync extends Sync { @Override protected boolean tryAcquire (int arg) { final Thread currentThread = Thread.currentThread(); int currentState = getState(); if (currentState == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 ,arg)) { setExclusiveOwnerThread(currentThread); return true ; } }else if (currentThread == getExclusiveOwnerThread()) { int nextState = currentState + arg; if (nextState < 0 ) { throw new Error ("Maximum lock count exceeded" ); } setState(nextState); return true ; } return false ; } } class NoFairSync extends Sync { @Override protected boolean tryAcquire (int arg) { final Thread currentThread = Thread.currentThread(); int currentState = getState(); if (currentState ==0 ) { if (compareAndSetState(0 ,arg)) { setExclusiveOwnerThread(currentThread); return true ; } }else if (currentThread == getExclusiveOwnerThread()) { int nextState = currentState + arg; if (nextState < 0 ) { throw new Error ("Maximum lock count exceeded" ); } setState(nextState); return true ; } return false ; } }
并发容器(了解即可)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private transient volatile Object[] array; public boolean add (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } finally { lock.unlock(); } }
CopyOnWriteArraySet
对应:HashSet
目标:代替synchronizedSet
原理:与CopyOnWriteArrayList实现原理类似。
并发深入 基本协同 Object
wait:让出锁,阻塞等待
notify/notifyAll:唤醒wait的进程
扩展:JDK1.5+的lock中支持条件变量, Condition.await(),signal/signalAll 与 wait/notify效果一样,可 以做到更精细化控制。
Thread
sleep:休眠一下,让出CPU资源,但是不会释放锁
yield:不释放锁,运行转为就绪,让出CPU给大家竞争,当然有可能自己又抢回来
join:父线程等待子线程执行完成后再执行,将异步转为同步(注意挂起的是子线程,阻断的是父线程)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class JoinTest { public static void main (String[] args) throws Exception{ System.out.println("main.start" ); MyThread myThread = new MyThread (); Thread sub = new Thread (myThread); synchronized (sub){ System.out.println("main:before sub" ); sub.start(); System.out.println("main:after sub" ); sub.join(); System.out.println("main after join" ); } System.out.println("main.end" ); } static class MyThread implements Runnable { byte [] lock = new byte [0 ]; @Override public void run () { synchronized (lock){ System.out.println("I am sub" ); } } } }
线程的三大特性
CPU添加高速缓存,来平衡与内存的速度差异
操作系统支持多进程、多线程,以分时复用CPU,进而均衡CPU与I/O设备的速度差异
编译程序优化指令执行次序,使得缓存能够得到更加合理地利用
CPU缓存导致可见性问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class VisibilityTest { private static boolean running = true ; public static void main (String[] args) throws Exception { t1(); } private static void t1 () throws InterruptedException { new Thread (()->{ while (running) { } System.out.println("thread exit" ); }).start(); TimeUnit.SECONDS.sleep(5 ); running = false ; } }
线程切换导致原子性问题
这种切换属于一种重量级的切换,现代的操作系统都基于更轻量的线程来调度,而一个进程创建的所有 线程,都是共享一个内存空间的,所以线程切换的成本就很低了,并且线程切换的时机大都是在时间片结束的时候。如下:
虽然操作系统能保证每条指令执行的时是具备原子性的,但是操作系统进行线程切换,可以发生在任意一条CPU指令执行完成之后(注意是CPU指令级别,也就是汇编指令 ),那这对高级编程语言来说多线程并发时就会造成原子性问题,如下图所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class AtomicityTest { static class AtomicRunnable implements Runnable { AtomicInteger i = new AtomicInteger (0 ); @Override public void run () { System.out.println("---" + i.incrementAndGet()); } } public static void main (String[] args) { AtomicRunnable runnable = new AtomicRunnable (); for (int i = 0 ; i < 10000 ; i++) { new Thread (runnable).start(); } } }
性能优化导致有序性问题
整个过程分为三步:
分配一块内存空间(堆区)
在内存空间上初始化对象
将内存空间的地址赋值给引用变量(栈区)
要注意的是,在分配完内存还未初始化时,对象的实例变量是有一个初始默认值的,比如 int 就是0, 初始化完成之后实例变量才会赋真正的值。
双重锁校验创建单例对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Singleton { private Singleton () {} private static Singleton instance; public static Singleton getInstance () { if (instance == null ) { synchronized (Singleton.class) { if (instance == null ) instance = new Singleton (); } } return instance; } }
补充知识点:线程切换是不会释放锁的。
JMM(Java Memory Model) 主内存与工作内存 JMM解决什么问题 JMM内存交互 Happens-Before volatile synchronized 锁概述 锁和资源的关系 Linux内核同步机制 synchronized字节码层面 synchronized锁的原理 锁膨胀 锁消除,锁粗化 锁使用经验 ThreadLocal 概念 使用 应用场景 实现原理 使用注意