最新下载
热门教程
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
ThreadForge 源码解读二:一个 Task 从 submit 到完成 内部运行机制深度剖析
时间:2026-06-14 08:53:06 编辑:袖梨 来源:一聚教程网
上一篇( juejin.cn/post/763703…) 讲的是边界:ThreadScope 负责把一批并发任务收进同一个生命周期。

这一篇往里再进一步,直接来看执行链路,我们带着问题来看:
1. 为什么要有 Task<T>
CompletableFuture 本身已经非常强大了,但业务代码处理并发阶段时,往往还需要这些东西:
- 明确的任务名称
- 一眼就能看明白的状态机
- 实际执行线程的记录
- 稳定的取消入口
- 生命周期绑定
- 更贴近业务的等待和异常语义
所以 ThreadForge 额外包装了一层 Task<T> 做对外暴露。
Task 的核心字段在 src/main/java/io/threadforge/Task.java:
private final long id;
private final String name;
private final CompletableFuture<T> future;
private final AtomicReference<State> state;
private final AtomicReference<Thread> runnerThread;
状态定义如下:
public enum State {
PENDING,
RUNNING,
SUCCESS,
FAILED,
CANCELLED
}
可以先这样来理解 :
2. Task 状态流转:从 PENDING 到终态
5 个状态,5 个状态切换方法,路径很少,清晰明了:
状态切换的入口都在 Task 上,包级可见,外部只能读:
// 包级 — 由 ThreadScope 在正确时机调用
boolean markRunning(Thread runner)
void markSuccess()
void markFailed()
void markCancelled()
void interruptRunner()
先看 markRunning(...):
boolean marked = state.compareAndSet(State.PENDING, State.RUNNING);
if (marked) {
runnerThread.set(runner);
}
主要做了两件事:
- CAS 从
PENDING切到RUNNING——只有一个线程能成功 - 成功后把执行线程记下来——后面取消时知道该中断谁
如果 CAS 失败(比如任务在排队的间隙被 cancel 了),方法返回 false,调用方跳过执行。这就是 cancel 和 markRunning 之间的竞态保护:cancel 先改状态为 CANCELLED,markRunning 的 CAS 就会失败,任务不会开始跑。
其余三个终态方法结构对称——清掉 runner 引用,设置状态:
void markSuccess() { runnerThread.set(null); state.set(State.SUCCESS); }
void markFailed() { runnerThread.set(null); state.set(State.FAILED); }
void markCancelled() { runnerThread.set(null); state.set(State.CANCELLED); }
终态后 runnerThread 清理成 null,避免外界持有一个已终止线程的句柄。
3. cancel:三步走,协作式终止
Task.cancel() 实现很简洁:
public boolean cancel() {
state.set(State.CANCELLED);
Thread runner = runnerThread.get();
if (runner != null) {
runner.interrupt();
}
return future.cancel(true);
}
其实就三步,保持顺序:
- 先设状态为
CANCELLED——这一步配合上一节讲的markRunningCAS,抢占式阻止任务启动 - 再中断 runner——如果任务已经在跑,发中断信号;如果还没拿到 runner,跳过
- 最后取消底层 future——
future.cancel(true)会尝试中断 future 内部的执行线程
注意为什么是「协作式」终止:Thread.interrupt() 只是发信号,不是强行杀线程。任务代码、阻塞点、中断处理逻辑要一起配合才能停下来。ThreadForge 靠三层协作:
Thread.interrupt()—— 唤醒阻塞中的线程CancellationToken.throwIfCancelled()—— 任务代码主动检查取消点- 任务业务逻辑对中断的响应 —— 捕获
InterruptedException后做清理并退出
单独哪一层都不够,三层叠加才算是可靠。
4. submit 的完整时序:从参数校验到进入调度器
ThreadScope 有十几个 submit(...) 重载,全部收敛到一个私有方法。去掉参数校验和并发许可申请后,核心构造逻辑如下:
final CompletableFuture<T> future = new CompletableFuture<T>();
final Task<T> task = new Task<T>(id, name, future);
final TaskInfo info = new TaskInfo(scopeId, id, name, Instant.now(), scheduler.name());
final ExecutionContextCarrier executionContext = ExecutionContextCarrier.capture();
final ScheduledTask timeoutTask = scheduleTaskTimeout(task, info, taskTimeout);
tasks.add(task);
future.whenComplete((value, throwable) -> {
tasks.remove(task);
if (timeoutTask != null) {
timeoutTask.cancel();
}
});
然后交给调度器:
scheduler.executor().execute(Scheduler.prioritized(
executionContext.wrapRunnable(() -> runTask(task, info, callable, taskRetryPolicy,
permitAcquired ? semaphore : null)),
taskPriority,
id
));
如果调度器拒绝执行(RejectedExecutionException),框架会释放并发许可、标记任务失败并触发 hook。
逐行看:
- 建
CompletableFuture— 任务结果的载体 - 建
Task— 带状态机和元信息的句柄 - 建
TaskInfo— 观测用的快照数据 - 捕获上下文 —
Context和 OpenTelemetry 双份 - 注册任务级 timeout — 可选的独立时间预算
- 登记到
tasks— scope 后续可以捞出所有任务 - 注册
whenComplete回调 — 任务无论成功、失败还是取消,结束时自动从tasks队列移除,同时取消关联的 timeout 任务 - 包成带优先级的 runnable,交给调度器
上半段提交,下半段执行,最后按成功、失败、取消三条路径收尾。
5. runTask(...):真正执行任务的内核
runTask(...) 是整条链路的执行引擎。完整方法约 40 行,精简后主结构如下:
long started = System.nanoTime();
CompletableFuture<T> future = task.toCompletableFuture();try {
// ① 启动前双重取消检查
if (task.isCancelled() || token.isCancelled()) {
completeTaskCancelled(task, future, new CancelledException("..."), info, started);
return;
} // ② 状态机: PENDING → RUNNING
if (!task.markRunning(Thread.currentThread())) {
return;
} // ③ 触发观测
safeHookStart(info);
token.throwIfCancelled(); // ④ 真正执行(含重试)
T value = RetryExecutor.execute(callable, retryPolicy, token);
if (future.complete(value)) {
task.markSuccess();
safeHookSuccess(info, elapsedNanos(started));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
completeTaskCancelled(task, future, new CancelledException("..."), info, started);
} catch (CancelledException e) {
completeTaskCancelled(task, future, e, info, started);
} catch (Throwable t) {
completeTaskFailure(task, future, t, info, started);
} finally {
if (acquiredSemaphore != null) {
acquiredSemaphore.release(); // 释放并发许可
}
}
异常走了三条分叉:
InterruptedException→ 重置线程中断标志后走取消路径CancelledException→ 直接走取消路径- 其他
Throwable→ 走失败路径(completeTaskFailure内部通过future.completeExceptionally(...)把异常写入 future,再调用task.markFailed()和 hook)
失败、取消、重试、并发许可——全收在这一个方法里。
6. Scheduler:执行策略和所有权分离
private final ExecutorService executor;
private final boolean ownsExecutor;
private final String name;
private final boolean virtualThreadMode;
四个字段,四个职责:
executor— 任务交给谁执行ownsExecutor— scope 关闭时要不要连带关掉执行器name— 日志和观测里的标识virtualThreadMode— 是否跑在虚拟线程上
6.1 Scheduler.detect()
public static Scheduler detect() {
if (isVirtualThreadSupported()) {
return virtualThreads();
}
return commonPool();
}
isVirtualThreadSupported() 通过反射探测 Executors.newVirtualThreadPerTaskExecutor(),virtualThreads() 内部 DCL 延迟创建。反射找不到方法或调用失败,自动回退到 commonPool()。调用方一行不用改。
6.2 Scheduler.fixed(int size)
int queueCapacity = Math.max(256, size * 100);
new ThreadPoolExecutor(
size, size, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new NamedThreadFactory("threadforge-fixed"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
executor.allowCoreThreadTimeOut(true);
- core = max,固定线程数;线程打满后任务入有界队列
queueCapacity至少 256,随线程数线性放大CallerRunsPolicy:队列满时提交线程自己执行,形成自然反压allowCoreThreadTimeOut(true):空闲超过 60 秒的核心线程也回收,避免高峰过后空转
6.3 Scheduler.priority(int size)
同样固定线程池,但队列换成无界的 PriorityBlockingQueue:
new ThreadPoolExecutor(
size, size, 60L, TimeUnit.SECONDS,
new PriorityBlockingQueue<Runnable>(),
new NamedThreadFactory("threadforge-priority"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
队列无界,CallerRunsPolicy 一般不会被触发。优先级靠 PrioritizedRunnable.compareTo(...) 实现:
int byPriority = Integer.compare(this.taskPriority.rank(), other.taskPriority.rank());
if (byPriority != 0) return byPriority;
return Long.compare(this.sequence, other.sequence);
先按 TaskPriority 排,同优先级按提交顺序(FIFO)。
无论哪种方式创建,最后都从 executor.execute(...) 出去。
7. ExecutionContextCarrier:线程切换时,上下文怎么跟着走
线程切换后,提交线程里的 Context 和 OpenTelemetry 上下文就断了——两者都基于线程局部变量。ExecutionContextCarrier 专门解决这个问题:
// 提交时捕获
static ExecutionContextCarrier capture() {
return new ExecutionContextCarrier(
Context.capture(),
OpenTelemetryBridge.currentContext()
);
}// 执行时安装,结束后恢复
Context.Snapshot previous = Context.install(contextSnapshot);
Object scope = OpenTelemetryBridge.makeCurrent(otelParentContext);
try {
return callable.call();
} finally {
OpenTelemetryBridge.closeScope(scope);
Context.restore(previous);
}
传播了两类上下文:
Context——ThreadForge自己的线程局部变量- OpenTelemetry —— 外部 Tracing 系统的当前 span
8. RetryExecutor:重试为什么必须感知取消
RetryExecutor 的核心循环是这样的:
while (true) {
token.throwIfCancelled();
try {
return callable.call();
} catch (InterruptedException e) {
throw e; // 中断直接透传,不重试
} catch (CancelledException e) {
throw e; // 取消直接透传,不重试
} catch (Throwable failure) {
if (!retryPolicy.allowsRetry(attempt, failure)) {
// 不再重试,把之前所有失败附加到 suppressed
if (previousFailures != null) {
for (Throwable prev : previousFailures) {
if (prev != failure) failure.addSuppressed(prev);
}
}
throw failure;
}
previousFailures.add(failure);
sleepBeforeRetry(retryPolicy.nextDelay(attempt, failure), token);
attempt++;
}
}
- 中断和取消直接透传——不是业务失败,不能重试,框架必须立即响应
- 每次循环先检查 token——上次执行成功后,下一轮也要看 scope 是否已取消
- sleep 分片——每 100ms 醒来检查一次取消
sleepBeforeRetry(...) 实现:
long remainingMillis = delay.toMillis();
while (remainingMillis > 0L) {
token.throwIfCancelled();
long chunk = Math.min(remainingMillis, 100L);
Thread.sleep(chunk);
remainingMillis -= chunk;
}
如果最终还是失败,前面每次失败的异常会累加到最终异常的 suppressed 里——排查时能看到完整的失败历史,不是只有最后一次的错误信息。
分片 sleep 是关键:假设重试要等 5 秒,整段睡过去,中途收到取消信号就会反应慢半拍。分成 100ms 的小段,每段醒来检查一次取消,scope 一喊停就能更快响应。
9. 两层时间预算:scope deadline + task timeout
9.1 scope 级 deadline
withDeadline(Duration)
配置整个 ThreadScope 的截止时间。到达后:
deadlineTriggered = true,token.cancel()- 作用域内所有任务收到取消信号
- 后续等待抛出
ScopeTimeoutException
9.2 task 级 timeout
scope.submit("rpc-a", callable, Duration.ofMillis(200));
给单个任务设置独立预算。内部注册一个超时任务:
if (task.toCompletableFuture().completeExceptionally(timeoutException)) {
task.markFailed();
task.interruptRunner();
safeHookFailure(info, timeoutException, timeout.toNanos());
}
只影响当前任务:标记失败、中断执行线程、触发 hook。不影响 scope 内其他任务。
| 类型 | 作用范围 | 触发后影响 | 适合场景 |
|---|---|---|---|
| scope deadline | 整个 ThreadScope | 取消整个作用域 | 一个请求、一个批处理的总预算 |
| task timeout | 单个 Task | 当前任务失败并中断 runner | 某个 RPC、IO 的独立预算 |
一个管全局,一个管局部。
10. Hook 和 Metrics:观测内嵌在执行链路里
ThreadHook 是 interface,4 个方法全是 default——只覆写需要的:
default void onStart(TaskInfo info) {}
default void onSuccess(TaskInfo info, Duration duration) {}
default void onFailure(TaskInfo info, Throwable error, Duration duration) {}
default void onCancel(TaskInfo info, Duration duration) {}
通过 andThen(...) 组合多个 hook,每个回调内部独立 catch 异常——前一个抛异常不阻止后续执行:
ThreadHook combined = hook1.andThen(hook2).andThen(hook3);
元信息来自 TaskInfo:
new TaskInfo(scopeId, id, name, Instant.now(), scheduler.name())
内置指标由 ScopeMetrics 维护,LongAdder + AtomicLong,竞争开销低:
started / succeeded / failed / cancelled
totalDurationNanos / maxDurationNanos
观测和执行事件发生在同一时刻:
safeHookStart(info); metrics.recordStart();
// ... 执行 ...
safeHookSuccess(...) metrics.recordTerminal(Task.State.SUCCESS, ...)
任务走到哪个生命周期节点,hook 和 metrics 就跟到哪里。
写在最后
最近换工作,刚进了一个新团队。上家的节奏比较正常,现在几乎每天加班赶进度。
说不清忙和闲哪个更好。
职位又往上走了一步,压力也跟着翻了一倍。
向上对齐、向下兜底,还是技术最纯粹。没有汇报,没有上下级,简单而美好。
这一篇是 ThreadForge 源码解读的执行链路部分,下一篇继续,解读高阶编排怎么在一个 scope 里协同工作。 也算是给整个系列收官。
项目地址:github.com/wuuJiawei/T…,欢迎提 issue,也欢迎 star。
相关文章
- DeepSeek企业版账号权限怎么设置?团队协作3种角色配置 06-14
- 燕云十六声束罪阁镇守首领怎样 06-14
- 物华弥新二周年活动奖励都有哪些 06-14
- 梦幻西游属性点切换该找谁 06-14
- 王者荣耀猴子怎样出装能快速打出四棍 06-14
- 宝可梦pokopia全部宝可梦的特性喜好一览表 06-14