Java线程池原理分析
2023-01-29
本文主要介绍ThreadPoolExecutor
构造函数的参数
corePoolSize
核心线程个数
maximumPoolSize
最大线程个数
- 当线程个数达到corePoolSize且任务队列满了,会继续创建线程直到线程池内总线程数达到maximumPoolSize
- 对于无限队列,线程数不会多于corePoolSize
keepAliveTime,unit
非核心线程个数超时时间及单位
workQueue
任务队列
threadFactory
线程工厂。主要用于自定义线程name
handler
拒绝策略。当线程数已达最大且任务队列满了触发策略。内置4种拒绝策略,也可以自定义
AbortPolicy:直接抛异常DiscardPolicy:什么也不做,丢弃任务DiscardOldestPolicy:移除最早的任务,然后尝试再次执行execute。会校验线程池状态CallerRunsPolicy:直接在本线程内执行。会校验线程池状态- 自定义:实现接口
RejectedExecutionHandler
内部状态ctl字段
使用32位int保存线程池状态(3位)和线程数(29位)
线程池状态有5个:
-1:RUNNING,创建后的状态0:SHUTDOWN:调用shutdown,不再接受新的任务1:STOP:调用shutdownNow,丢弃所有任务,中断正在执行的任务2:TIDYING:工作线程为0,队列为空。即将执行terminated()3:TERMINATED:已终止。在执行terminated() 之后
execute方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
|
getTask方法
从队列里获取下一个任务
- 是一个阻塞方法,利用阻塞队列实现阻塞
- 非核心线程的阻塞时常等于keepAliveTime+unit,超时后返回
null - 核心线程一直阻塞(有开关
allowCoreThreadTimeOut)
runWorker方法
- 属于线程池方法
- while循环调用
getTask方法获取任务并执行,如果任务为null,worker退出 - 任务抛出异常则worker退出
- 退出时会调用
tryTerminate - worker退出时,任务发生异常或线程数小于核心线程数,则会尝试新建一个非核心线程
submit方法
execute的参数是Runnable,Runnable并没有返回值,无法直接获取执行的结果,而submit的参数Callable是有返回值的,submit返回Future,包含Callable的返回值
1 2 3 4 5 6
| public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
|
比较简单,将Callable<T>包装成Runnable传给execute,而newTaskFor就是新建一个FutureTask
FutureTask
- 在
run函数内调用call函数,使用outcome字段保存结果或者异常 - 调用
get时,校验状态,抛出异常或者返回结果
阻塞队列和AQS
以下以LinkedBlockingQueue为例
offer方法,阻塞添加
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty();
return true; }
|
poll方法,阻塞获取
流程和上述差不多
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
|
await实现原理
以LinkedBlockingQueue为例,其内部使用ReentrantLock
AQS(AbstractQueuedSynchronizer,抽象队列同步器)
一个AQS包含一个同步队列和多个条件队列。在条件队列等待的线程需等待对应条件的signal信号,然后加入到同步队列
如果当前队列如下
1 2 3
| Sync Queue: 1 --> 2 --> 3 Condition A Queue: 4 --> 5 --> 6 Condition B Queue: 7 --> 8 --> 9
|
触发一次A.signal(),结果如下
1 2 3
| Sync Queue: 1 --> 2 --> 3 --> 4 Condition A Queue: 5 --> 6 Condition B Queue: 7 --> 8 --> 9
|
源码解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public abstract class AbstractQueuedSynchronizer { static final class Node { volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; } Node head; Node tail; public class ConditionObject { private transient Node firstWaiter; private transient Node lastWaiter; public final void signal() { enq(firstWaiter) } public final void await() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } acquireQueued(node, savedState) } } private Node enq(final Node node) { } }
|
Unsafe.park(absolute, time)
挂起当前线程
- absolute:是否是绝对时间。若为true,则time是具体毫秒数时间戳;若为false,则time是挂起的纳秒数
Unsafe.unpark(thread)
恢复挂起线程
参考
- https://blog.csdn.net/a7980718/article/details/83661613
CompletionService
2023-03-14
背景
当并行从有多个数据源同时取数据时,我们可能会写出以下代码,先提交任务再循环get,使用异常进行逻辑判断,很不优雅
传统写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| final long TIMEOUT = 500; final int TASK_NUM = 5;
List<Future<String>> futures = new ArrayList<>(); for (int i = 0; i < TASK_NUM; i++) { Future<String> future = executor.submit(callable); futures.add(future); }
long start = System.currentTimeMillis(); for (Future<String> future : futures) { long time = TIMEOUT - (System.currentTimeMillis() - start); if (time > 0 || future.isDone()) { try { String s = future.get(time, TimeUnit.MILLISECONDS); res.add(s); } catch (Exception e) { e.printStackTrace(); } } else { future.cancel(true); } }
|
如果使用CompletionService
1 2 3 4 5 6 7 8 9 10 11 12 13
| CompletionService<String> completionService = new ExecutorCompletionService<>(executor); for (int i = 0; i < TASK_NUM; i++) { completionService.submit(callable); }
long start2 = System.currentTimeMillis(); for (int i = 0; i < 3; i++) { long time = TIMEOUT - (System.currentTimeMillis() - start2); if (time <= 0) break; Future<String> future = completionService.poll(time, TimeUnit.MILLISECONDS); if (future == null) break; res.add(f.get()); }
|
CompletionService原理
1 2 3 4 5 6 7 8
| private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
|
其他使用场景
取出第一个完成的结果
1 2 3 4 5
| for (int i = 0; i < TASK_NUM; i++) { completionService.submit(callable); } String result = completionService.take().get();
|
限时内取出第一个非null的结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| List<Future<String>> futures4 = new ArrayList<>(TASK_NUM); for (int i = 0; i < TASK_NUM; i++) { futures4.add(completionService.submit(callable)); } long start4 = System.currentTimeMillis();
String result4 = null; for (int i = 0; i < TASK_NUM; i++) { long time = TIMEOUT - (System.currentTimeMillis() - start4); if (time <= 0) break; Future<String> future = completionService.poll(time, TimeUnit.MILLISECONDS); if (future == null) break; String data = future.get(); if (data != null) { result4 = data; break; } } for (Future<String> future : futures4) { future.cancel(true); }
if (result4 != null) { }
|
ConcurrentHashMap原理
2023-04-19
基于Java 8
初始化Table
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
|
Get方法
流程:
- 如果直接在table上,直接返回
- 如果eh < 0,执行该节点的
find方法eh=-1:MOVED:在扩容中eh=-2:TREEBIN:作为红黑树的根节点eh=-3:RESERVED:占位Node
- 除此之外就是链表,直接遍历链表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
|
find方法
Node:链表遍历查询ForwardingNode:迁移过程中,查询新节点TreeBin:红黑树根节点查询,包括TreeNodeReservationNode:用于占位的Node,始终返回null,用于compute和computeIfAbsent
Put方法
流程:
- 如果table没有初始化,则先初始化
- 如果新节点对应的桶为空,则通过cas设置,成功则返回
- 如果对应的桶的hash是
MOVED,说明在扩容中,则先帮助扩容 - 将桶作为同步对象
fh >= 0说明是链表,直接在链表尾部添加- 如果是红黑树,则插入
- 如果链表的操作数大于等于8,则转为红黑树
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| public V put(K key, V value) { return putVal(key, value, false); }
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
|
transfer扩容
桶的个数n是2次幂的,桶定位方式是i = (n - 1) & h),当扩容为2倍时,如果n & h == 0,则桶号保持不变,如果n & h == n,则桶号为i + n
- 初始化
nextTab,大小为原来的两倍 - 根据CPU个数选出每次循环处理的批次,最少16,从右往左处理依次处理
- 如果
tab[i] == null,则通过cas设置旧table的tab[i]为fwd - 如果是链表或者红黑树,则拆成2条链或树。如果红黑树个数很少,会退化为链表。通过cas设置旧table的
tab[i]为fwd,新table的两个桶为新的Node或TreeBin节点
参考
CompletableFuture
2023-04-06
Java 8引入了函数式编程,大大提高代码的可读性,写起来也更优雅。异步编程自然也不例外,新增的CompletableFuture就是为了使用函数式编程来更优雅地编写异步代码
构造CompletableFuture
一般直接使用工厂方法创建
completedFuture(value):直接构造一个已经完成的CompletableFuturesupplyAsync(Supplier<U>,Executor):如果第二个参数不传,则使用默认的ForkJoinPool,不建议如此做runAsync(Runnable,Executor):和supplyAsync类似,无返回值版本
链式调用(异步顺序执行)
大多数链式调用都有异步版本(Async结尾),会多一个Executor参数,如果不传会使用ForkJoinPool
thenApply(Function<T,U>):在上一段代码完成后执行,接受上一段代码的结果T,并生成新的结果U,返回CompletableFuture<U>- 有异步版本
AND合并版本:thenCombine(other,BiFunction<T,U,V>)OR合并版本:applyToEither(other,Function<T,V>)
thenAccept(Consumer<T>):在上一段代码完成后执行,接受上一段代码的结果T,返回CompletableFuture<Void>- 有异步版本
AND合并版本:thenAcceptBoth(other,BiConsumer<T,U>)OR合并版本:acceptEither(other,Consumer<T>)
thenRun(Runnable):在上一段代码完成后执行,返回CompletableFuture<Void>- 有异步版本
AND合并版本:runAfterBoth(other,Runnable)OR合并版本:runAfterEither(other,Runnable)
thenCompose(Function<T,CompletionStage<U>>):和thenApply不同,直接返回CompletableFuture<U>handle(BiFunction):同时处理结果和异常whenComplete(BiConsumer):handle的无返回值版本exceptionally(Function):链式(不仅是前一个)发生异常时执行,一次异常仅触发一次
工具方法
CompletableFuture<Void> allOf(CompletableFuture<?>... cfs):等待cfs全部结束CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs):等待cfs其中一个结束
注意事项
同步操作的线程问题
- 如果前一个异步代码段有阻塞操作,则后续同步操作的线程和前面的异步线程相同,否则会在当前线程
没有阻塞操作时:
1 2 3 4 5 6
| Executor executor = Executors.newCachedThreadPool(); CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return "data"; }, executor) .thenRun(() -> System.out.println(Thread.currentThread().getName()));
|
输出:(同步操作在当前线程执行)
添加阻塞操作:
1 2 3 4 5 6 7 8 9 10
| Executor executor = Executors.newCachedThreadPool(); CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { Thread.sleep(1); } catch (Exception e) { } return "data"; }, executor) .thenRun(() -> System.out.println(Thread.currentThread().getName()));
|
输出:(同步操作和异步线程线程一样)
1 2
| pool-1-thread-1 pool-1-thread-1
|
- 提示
System.out.format第一次执行时存在阻塞操作,而System.out.println没有
参考
synchronized
2023-04-20
synchronized特性
- 乐观锁,如果频繁,则转为悲观锁
- 普通互斥锁,非公平锁、可重入锁
- 轻量级锁(自旋锁),如果锁被持有的时间较长,则转换为重量级锁(挂起等待)
synchronized优化
- 1.6之前是重量级锁
- 锁升级(膨胀)机制:无锁升级到偏向锁,再到轻量级锁,最后到重量级锁的过程
- 锁消除,根据逃逸分析,对锁进行消除
- 锁粗化,减少加锁解锁次数
- 自适应自旋锁,线程自旋的次数不再是固定的值,而是一个动态改变的值,这个值会根据前一次自旋获取锁的状态来决定此次自旋的次数
锁介绍
偏向锁
在运行过程中,对象的锁偏向某个线程。即在开启偏向锁机制的情况下,某个线程获得锁,当该线程下次再想要获得锁时,不需要再获得锁(即忽略synchronized关键词),直接就可以执行同步代码,比较适合竞争较少的情况
- 线程不会主动去释放偏向锁
- 偏向锁的撤销需要等待全局安全点(即没有字节码正在执行),它会暂停拥有偏向锁的线程,撤销后偏向锁恢复到未锁定状态或轻量级锁状态
- 如果有竞争,则升级为轻量级锁
- since 1.6
- JDK 15开始默认关闭
轻量级锁
对于没有多线程竞争的情况,如果存在多线程竞争,则膨胀为重量级锁
重量级锁
即当有其他线程占用锁时,当前线程会进入阻塞状态
自旋锁
在自旋状态下,当一个线程A尝试进入同步代码块,但是当前的锁已经被线程B占有时,线程A不进入阻塞状态,而是不停的空转,等待线程B释放锁。如果锁的线程能在很短时间内释放资源,那么等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞状态,只需自旋,等持有锁的线程释放后即可立即获取锁,避免了用户线程和内核的切换消耗。
自旋等待最大时间:线程自旋会消耗cpu,若自旋太久,则会让cpu做太多无用功,因此要设置自旋等待最大时间。
优点:开启自旋锁后能减少线程的阻塞,在对于锁的竞争不激烈且占用锁时间很短的代码块来说,能提升很大的性能,在这种情况下自旋的消耗小于线程阻塞挂起的消耗
缺点:在线程竞争锁激烈,或持有锁的线程需要长时间执行同步代码块的情况下,使用自旋会使得cpu做的无用功太多
参考