vLLM Framework Code Walkthrough 02 (Core Side)
This article walks through how a single inference iteration happens on the Core side of vLLM V1. It covers these key classes:
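- EngineCore/EngineCoreProc: the Core-side main loop and its process wrapper.
- Scheduler: the scheduler of request queues, KV blocks, and the token budget.
- Executor: the execution abstraction between Core and Worker.
- Worker: the device-side lifecycle and the pipeline-parallel bridge.
- GPUModelRunner: turns the scheduling result into GPU inputs, runs the model forward, and samples.
- CachedRequestState: the request state cached inside Worker/GPUModelRunner.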
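In this article, SchedulerOutput is treated only as the scheduling contract between Scheduler and ModelRunner; the full details of how its fields are built, consumed, and updated are not expanded.
0. Overview: one Core-side inference iteration
The shortest description of Core-side inference is this: the Scheduler decides how many tokens each request computes in this iteration and which KV blocks it occupies; Executor/Worker/GPUModelRunner run the model according to that plan; the sampled output then returns to the Scheduler, which updates the request state.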
@startuml
actor Frontend
participant "EngineCoreProc" as Proc
participant "EngineCore" as Core
participant "Scheduler" as Sched
participant "Executor" as Exec
participant "Worker" as Worker
participant "GPUModelRunner" as Runner
Frontend -> Proc: ADD / ABORT / UTILITY
Proc -> Core: preprocess_add_request()
Core -> Sched: add_request(Request)
loop engine busy loop
Core -> Sched: schedule()
Sched --> Core: SchedulerOutput
Core -> Exec: execute_model(SchedulerOutput)
Exec -> Worker: execute_model(SchedulerOutput)
Worker -> Runner: execute_model(SchedulerOutput, intermediate_tensors?)
Runner -> Runner: _update_states()
Runner -> Runner: _prepare_inputs()
Runner -> Runner: model forward
Runner --> Worker: None or ModelRunnerOutput
alt execute_model returned None
Core -> Sched: get_grammar_bitmask(SchedulerOutput)
Core -> Exec: sample_tokens(grammar_output)
Exec -> Worker: sample_tokens(grammar_output)
Worker -> Runner: sample_tokens(grammar_output)
Runner --> Core: ModelRunnerOutput
end
Core -> Sched: update_from_output(SchedulerOutput, ModelRunnerOutput)
Sched --> Proc: EngineCoreOutputs
Proc --> Frontend: output queue / ZMQ
end
@enduml
In code, EngineCore.step() is the most condensed version of this closed loop:
def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
    ...
    scheduler_output = self.scheduler.schedule()
    future = self.model_executor.execute_model(scheduler_output, non_block=True)
    grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)
    with (
        self.log_error_detail(scheduler_output),
        self.log_iteration_details(scheduler_output),
    ):
        model_output = future.result()
        if model_output is None:
            model_output = self.model_executor.sample_tokens(grammar_output)
    self._process_aborts_queue()
    engine_core_outputs = self.scheduler.update_from_output(
        scheduler_output, model_output
    )
    return engine_core_outputs, scheduler_output.total_num_scheduled_tokens > 0
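This code reflects three boundaries of the V1 Core:
- Scheduling boundary: Scheduler.schedule() only produces the plan for this iteration; it does not run the model itself.
- Execution boundary: Executor.execute_model() may trigger Worker-side execution synchronously or asynchronously.
- Feedback boundary: Scheduler.update_from_output() consumes the model output and updates requests, KV, stop conditions, and frontend outputs.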
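The three boundaries can be made concrete with a toy loop. This is a minimal sketch, not vLLM code: ToyScheduler, ToyExecutor, ToySchedulerOutput, and toy_step are hypothetical stand-ins, and the "model output" is simply the number of tokens processed per request.

```python
from dataclasses import dataclass, field


@dataclass
class ToySchedulerOutput:
    # Hypothetical plan: request id -> number of tokens to compute this step.
    num_scheduled_tokens: dict[str, int] = field(default_factory=dict)

    @property
    def total_num_scheduled_tokens(self) -> int:
        return sum(self.num_scheduled_tokens.values())


class ToyScheduler:
    """Hypothetical stand-in that tracks how many tokens each request still needs."""

    def __init__(self, remaining: dict[str, int], token_budget: int):
        self.remaining = dict(remaining)
        self.token_budget = token_budget

    def schedule(self) -> ToySchedulerOutput:
        # Scheduling boundary: only produce a plan, never run the model.
        budget = self.token_budget
        plan: dict[str, int] = {}
        for req_id, need in self.remaining.items():
            n = min(need, budget)
            if n > 0:
                plan[req_id] = n
                budget -= n
        return ToySchedulerOutput(plan)

    def update_from_output(self, out, model_output):
        # Feedback boundary: consume the output and update request state.
        for req_id, n in model_output.items():
            self.remaining[req_id] -= n
        return dict(model_output)


class ToyExecutor:
    def execute_model(self, out: ToySchedulerOutput) -> dict[str, int]:
        # Execution boundary: pretend the forward pass processed the plan.
        return dict(out.num_scheduled_tokens)


def toy_step(scheduler: ToyScheduler, executor: ToyExecutor):
    out = scheduler.schedule()
    model_output = executor.execute_model(out)
    outputs = scheduler.update_from_output(out, model_output)
    return outputs, out.total_num_scheduled_tokens > 0
```

With a budget of 4 and requests needing 5 and 2 tokens, the first call schedules only part of the first request, the second call finishes both, and the third call reports that nothing was executed.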
1. EngineCoreProc: putting the Core into a background process
EngineCore is the inner logic. EngineCoreProc wraps it into a background process and takes care of the surrounding concerns: ZMQ IO, input and output queues, shutdown, data-parallel coordination, and so on.
Its busy loop is deliberately small: poll the input queue until there is work to do, run one engine step, then put the EngineCoreOutputs on the output queue.
def run_busy_loop(self):
    """Core busy loop of the EngineCore."""
    while self._handle_shutdown():
        # 1) Poll the input queue until there is work to do.
        self._process_input_queue()
        # 2) Step the engine core and return the outputs.
        self._process_engine_step()
    raise SystemExit
def _process_engine_step(self) -> bool:
    """Called only when there are unfinished local requests."""
    outputs, model_executed = self.step_fn()
    for output in outputs.items() if outputs else ():
        self.output_queue.put_nowait(output)
    self.post_step(model_executed)
    if not model_executed and self.scheduler.has_unfinished_requests():
        time.sleep(0.001)
    return model_executed
The input thread first preprocesses each ADD request into the Core's internal Request and then puts it on input_queue. The conversion happens in EngineCore.preprocess_add_request(): multimodal features may go through the receiver cache, after which Request.from_engine_core_request() builds the scheduler-side object.
def preprocess_add_request(self, request: EngineCoreRequest) -> tuple[Request, int]:
    ...
    req = Request.from_engine_core_request(request, self.request_block_hasher)
    if req.use_structured_output:
        self.structured_output_manager.grammar_init(req)
    return req, request.current_wave
During initialization, the Core first creates the Executor, then initializes the KV cache according to the model and memory configuration, and finally creates the Scheduler:
def __init__(...):
    ...
    self.model_executor = executor_class(vllm_config)
    ...
    kv_cache_config = self._initialize_kv_caches(vllm_config)
    self.structured_output_manager = StructuredOutputManager(vllm_config)
    Scheduler = vllm_config.scheduler_config.get_scheduler_cls()
    ...
    self.scheduler: SchedulerInterface = Scheduler(
        vllm_config=vllm_config,
        kv_cache_config=kv_cache_config,
        structured_output_manager=self.structured_output_manager,
        include_finished_set=include_finished_set,
        log_stats=self.log_stats,
        block_size=scheduler_block_size,
    )
    ...
    self.step_fn = (
        self.step if self.batch_queue is None else self.step_with_batch_queue
    )
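On the regular path, step() advances one iteration at a time. With pipeline parallelism or async scheduling enabled, step_with_batch_queue() maintains a batch queue so that "scheduling the next batch" overlaps with "waiting for the previous batch's output", which reduces pipeline bubbles.
2. Executor: the execution abstraction between Core and Worker
Executor is the model-execution interface as seen from the Core side. It hides the differences between backends such as single-process, multiprocessing, and Ray. EngineCore only needs to call methods such as execute_model(), sample_tokens(), and take_draft_token_ids().
The default execute_model() in the abstract class calls the Worker method through collective_rpc and takes the first return value:
def execute_model(
    self, scheduler_output: SchedulerOutput, non_block: bool = False
) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]:
    output = self.collective_rpc(
        "execute_model", args=(scheduler_output,), non_block=non_block
    )
    return output[0]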
The single-process executor is more direct: one WorkerWrapperBase wraps the Worker that lives in the same process. In non-blocking mode, if the Worker returns an AsyncModelRunnerOutput, the executor can run get_output() on a background thread.
def execute_model(
    self, scheduler_output: SchedulerOutput, non_block: bool = False
) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]:
    output = self.collective_rpc(
        "execute_model",
        args=(scheduler_output,),
        non_block=non_block,
        single_value=True,
    )
    if non_block and output.done():
        output.result()
    return output
The multiproc and Ray backends follow the same semantics and differ only in where execution happens: multiproc sends the RPC to worker processes, while Ray goes through a compiled DAG. The Core side does not need to care about these differences.
3. Worker: device-side lifecycle and the PP bridge
The GPU Worker is responsible for CUDA device initialization, distributed environment initialization, model loading, KV cache initialization, warmup and cudagraph capture, and calling the model runner on every iteration.
When it initializes the device, the Worker picks the V1 or V2 model runner based on an environment variable. The repository currently defaults to VLLM_USE_V2_MODEL_RUNNER=False, and CachedRequestState, a focus of this article, belongs to the V1 path in vllm/v1/worker/gpu_model_runner.py.
def init_device(self):
    ...
    self.use_v2_model_runner = envs.VLLM_USE_V2_MODEL_RUNNER
    ...
    if self.use_v2_model_runner:
        from vllm.v1.worker.gpu.model_runner import (
            GPUModelRunner as GPUModelRunnerV2,
        )
        # HACK(woosuk): This is a temporary fix to avoid type errors.
        self.model_runner: GPUModelRunner = GPUModelRunnerV2(  # type: ignore
            self.vllm_config, self.device
        )
    else:
        from vllm.v1.worker.gpu_model_runner import (
            GPUModelRunner as GPUModelRunnerV1,
        )
        self.model_runner = GPUModelRunnerV1(self.vllm_config, self.device)
When executing the model, the Worker also handles pipeline-parallel send and receive. A non-first PP rank first receives the previous stage's IntermediateTensors; a non-last PP rank sends its intermediate tensors to the next stage after the runner's forward.
def execute_model(
    self, scheduler_output: "SchedulerOutput"
) -> ModelRunnerOutput | AsyncModelRunnerOutput | None:
    ...
    forward_pass = scheduler_output.total_num_scheduled_tokens > 0
    ...
    if forward_pass and not get_pp_group().is_first_rank:
        tensor_dict, comm_handles, comm_postprocess = (
            get_pp_group().irecv_tensor_dict(...)
        )
        intermediate_tensors = AsyncIntermediateTensors(...)
    with self.annotate_profile(scheduler_output):
        output = self.model_runner.execute_model(
            scheduler_output, intermediate_tensors
        )
        if isinstance(output, ModelRunnerOutput | AsyncModelRunnerOutput | NoneType):
            return output
    assert isinstance(output, IntermediateTensors)
    ...
    self._pp_send_work = get_pp_group().isend_tensor_dict(...)
    return None
The Worker is therefore the device bridge between "the Core's scheduling contract" and "the GPU runner's actual execution"; the component that actually assembles request state into tensors is GPUModelRunner.
4. Scheduler: token scheduling without fixed prefill/decode phases
The core idea of the Scheduler is written at the top of schedule(): instead of running a batch of prefills and then a batch of decodes, it lets num_computed_tokens catch up with num_tokens_with_spec.
def schedule(self) -> SchedulerOutput:
    # NOTE(woosuk) on the scheduling algorithm:
    # There's no "decoding phase" nor "prefill phase" in the scheduler.
    # Each request just has the num_computed_tokens and
    # num_tokens_with_spec. num_tokens_with_spec =
    # len(prompt_token_ids) + len(output_token_ids) + len(spec_token_ids).
    # At each step, the scheduler tries to assign tokens to the requests
    # so that each request's num_computed_tokens can catch up its
    # num_tokens_with_spec. This is general enough to cover
    # chunked prefills, prefix caching, speculative decoding,
    # and the "jump decoding" optimization in the future.
    ...
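This perspective is key. To the Scheduler:
- A prompt that is not fully computed means num_computed_tokens < prompt_len.
- A decode step that needs a new token means the sampled token is already in all_token_ids but has not yet been computed by the model.
- Spec decode means spec_token_ids temporarily extends num_tokens_with_spec.
- Chunked prefill simply means the token budget of this iteration is not enough, so only one segment is caught up at a time.
4.1 Scheduler internal state
The Scheduler maintains three kinds of request containers, plus the set of finished request IDs:
requests: req_id -> Request   # global request table
waiting / skipped_waiting     # waiting to enter the running state
running                       # holding scheduling resources, can keep advancing
finished_req_ids              # requests the worker must be told to clean up this iteration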
The request lifecycle can be simplified as:
WAITING
  |
  | schedule waiting request + allocate KV slots
  v
RUNNING
  |
  | output reaches stop / length cap / abort
  v
FINISHED_*
RUNNING --insufficient KV / priority preemption--> PREEMPTED --> WAITING
WAITING --remote KV not ready--> WAITING_FOR_REMOTE_KVS
WAITING --structured grammar not ready--> WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR
A newly added request is only enqueued; it is not executed immediately:
def add_request(self, request: Request) -> None:
    existing = self.requests.get(request.request_id)
    if existing is not None:
        ...
    else:
        if request.resumable:
            request.streaming_queue = deque()
        self._enqueue_waiting_request(request)
        self.requests[request.request_id] = request
        if self.log_stats:
            request.record_event(EngineCoreEventType.QUEUED)
4.2 Scheduling phase one: advance RUNNING requests first
schedule() handles running first. This ensures that requests which are already generating get advanced first, and it is also why, in the decode case, each request usually receives just one additional token per iteration.
The core computation is:
num_new_tokens = (
    request.num_tokens_with_spec
    + request.num_output_placeholders
    - request.num_computed_tokens
)
if 0 < self.scheduler_config.long_prefill_token_threshold < num_new_tokens:
    num_new_tokens = self.scheduler_config.long_prefill_token_threshold
num_new_tokens = min(num_new_tokens, token_budget)
num_new_tokens = min(
    num_new_tokens, self.max_model_len - 1 - request.num_computed_tokens
)
In other words, an iteration schedules at most "the number of tokens the model has not yet computed", further capped by the long prefill threshold, the global token budget, and max model len.
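The capping order is easier to see with concrete numbers. The following is a minimal sketch that mirrors the arithmetic of the fragment above as a free function; the function name and its flat argument list are hypothetical, and the values in the usage note are made up for illustration.

```python
def num_new_tokens_for_running(
    num_tokens_with_spec: int,
    num_output_placeholders: int,
    num_computed_tokens: int,
    token_budget: int,
    max_model_len: int,
    long_prefill_token_threshold: int = 0,
) -> int:
    """Hypothetical helper mirroring the caps applied to a running request."""
    # Tokens the model has not computed yet.
    n = num_tokens_with_spec + num_output_placeholders - num_computed_tokens
    # Optional cap for very long prefills (0 disables it).
    if 0 < long_prefill_token_threshold < n:
        n = long_prefill_token_threshold
    # Global per-iteration token budget.
    n = min(n, token_budget)
    # Never run past the model's maximum length.
    n = min(n, max_model_len - 1 - num_computed_tokens)
    return n


# A decode step: 5 tokens known, 4 already computed -> 1 token to schedule.
decode = num_new_tokens_for_running(5, 0, 4, token_budget=100, max_model_len=4096)
# A long prefill: 1000 tokens pending, threshold 256, budget 512 -> 256.
long_prefill = num_new_tokens_for_running(
    1000, 0, 0, token_budget=512, max_model_len=4096,
    long_prefill_token_threshold=256,
)
```

Under these assumptions a decode step yields 1, while the long prefill is cut to the threshold before the budget is even considered.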
The Scheduler then requests slots from the KV cache manager:
while True:
    new_blocks = self.kv_cache_manager.allocate_slots(
        request,
        num_new_tokens,
        num_lookahead_tokens=self.num_lookahead_tokens,
    )
    if new_blocks is not None:
        break
    if self.policy == SchedulingPolicy.PRIORITY:
        preempted_req = max(
            self.running,
            key=lambda r: (r.priority, r.arrival_time),
        )
        self.running.remove(preempted_req)
        ...
    else:
        preempted_req = self.running.pop()
    self._preempt_request(preempted_req, scheduled_timestamp)
    preempted_reqs.append(preempted_req)
    if preempted_req == request:
        break
When KV space runs out, a running request is preempted. Preemption frees its KV blocks, resets num_computed_tokens to 0, sets the request status to PREEMPTED, and puts the request back at the front of the waiting queue:
def _preempt_request(self, request: Request, timestamp: float) -> None:
    ...
    self.kv_cache_manager.free(request)
    self.encoder_cache_manager.free(request)
    request.status = RequestStatus.PREEMPTED
    request.num_computed_tokens = 0
    if request.spec_token_ids:
        request.spec_token_ids = []
    request.num_preemptions += 1
    ...
    self.waiting.prepend_request(request)
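The allocate-or-preempt loop can be sketched with plain block counts. This is a simplified model under stated assumptions, not the KV cache manager's logic: allocation succeeds whenever enough free blocks exist, the policy is the non-priority one (the victim is the tail of running), and the function name and its arguments are hypothetical. The function mutates running and held in place.

```python
def allocate_with_preemption(
    running: list[str],
    request: str,
    blocks_needed: int,
    free_blocks: int,
    held: dict[str, int],
) -> tuple[bool, int, list[str]]:
    """Hypothetical sketch: evict from the tail of `running` until `request` fits.

    running: request ids in scheduling order (mutated in place).
    held:    request id -> number of KV blocks currently held (mutated in place).
    Returns (success, remaining_free_blocks, preempted_ids).
    """
    preempted: list[str] = []
    while free_blocks < blocks_needed:
        victim = running.pop()            # tail of the running list
        free_blocks += held.pop(victim)   # preemption frees the victim's blocks
        preempted.append(victim)
        if victim == request:
            # The request evicted itself: it cannot be scheduled this iteration.
            return False, free_blocks, preempted
    return True, free_blocks - blocks_needed, preempted


running = ["r1", "r2", "r3"]
held = {"r1": 4, "r2": 2, "r3": 3}
ok, free_left, victims = allocate_with_preemption(running, "r1", 5, 1, held)
```

In this example r3 and then r2 are preempted before r1 gets its 5 blocks, leaving 1 block free and only r1 running.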
After scheduling succeeds, the Scheduler records the token count and the newly added blocks of each request for this iteration:
scheduled_running_reqs.append(request)
request_id = request.request_id
req_to_new_blocks[request_id] = new_blocks
num_scheduled_tokens[request_id] = num_new_tokens
token_budget -= num_new_tokens
4.3 Scheduling phase two: promote new or resumed requests from WAITING
Waiting requests are scheduled only if no preemption happened in this iteration and the scheduler is not paused. This phase handles prefix cache, remote KV, encoder inputs, LoRA constraints, chunked prefill, KV allocation, and more.
A prefix cache hit is resolved when a request is scheduled for the first time, that is, while its num_computed_tokens is still 0 (a preempted request, whose counter was reset to 0, takes the same branch):
if request.num_computed_tokens == 0:
    new_computed_blocks, num_new_local_computed_tokens = (
        self.kv_cache_manager.get_computed_blocks(request)
    )
    if self.connector is not None:
        ext_tokens, load_kv_async = (
            self.connector.get_num_new_matched_tokens(
                request, num_new_local_computed_tokens
            )
        )
        ...
    num_computed_tokens = (
        num_new_local_computed_tokens + num_external_computed_tokens
    )
    assert num_computed_tokens <= request.num_tokens
else:
    new_computed_blocks = self.kv_cache_manager.empty_kv_cache_blocks
    num_new_local_computed_tokens = 0
    num_computed_tokens = request.num_computed_tokens
It then computes how many tokens still need to be scheduled:
if load_kv_async:
    assert num_external_computed_tokens > 0
    num_new_tokens = 0
else:
    num_new_tokens = request.num_tokens - num_computed_tokens
    threshold = self.scheduler_config.long_prefill_token_threshold
    if 0 < threshold < num_new_tokens:
        num_new_tokens = threshold
    if (
        not self.scheduler_config.enable_chunked_prefill
        and num_new_tokens > token_budget
    ):
        break
    num_new_tokens = min(num_new_tokens, token_budget)
    assert num_new_tokens > 0
This shows what the chunked prefill switch means: if chunking is not allowed and the prompt's remaining tokens exceed the budget, the request is not scheduled and the loop over waiting requests stops; if chunking is allowed, a segment that fits the budget is cut off.
Finally, it allocates KV slots and turns the waiting request into a running one:
new_blocks = self.kv_cache_manager.allocate_slots(
    request,
    num_new_tokens,
    num_new_computed_tokens=num_new_local_computed_tokens,
    new_computed_blocks=new_computed_blocks,
    num_lookahead_tokens=effective_lookahead_tokens,
    num_external_computed_tokens=num_external_computed_tokens,
    delay_cache_blocks=load_kv_async,
    num_encoder_tokens=num_encoder_tokens,
)
if new_blocks is None:
    if request.has_encoder_inputs:
        self.encoder_cache_manager.free(request)
    break
...
self.running.append(request)
...
req_to_new_blocks[request_id] = self.kv_cache_manager.get_blocks(request_id)
num_scheduled_tokens[request_id] = num_new_tokens
token_budget -= num_new_tokens
request.status = RequestStatus.RUNNING
request.num_computed_tokens = num_computed_tokens
4.4 SchedulerOutput: a compact expression of this iteration's plan
At the end of schedule(), the Scheduler packs the plan for this iteration into a SchedulerOutput. This article does not dissect it, but its role should be made clear:
SchedulerOutput =
    full data for new requests          scheduled_new_reqs
    incremental data for old requests   scheduled_cached_reqs
    tokens per request this iteration   num_scheduled_tokens
    total tokens this iteration         total_num_scheduled_tokens
    spec decode draft tokens            scheduled_spec_decode_tokens
    encoder input scheduling            scheduled_encoder_inputs
    finished/preempted notifications    finished_req_ids / preempted_req_ids
    KV/EC connector metadata            kv_connector_metadata / ec_connector_metadata
It is constructed here:
scheduler_output = SchedulerOutput(
    scheduled_new_reqs=new_reqs_data,
    scheduled_cached_reqs=cached_reqs_data,
    num_scheduled_tokens=num_scheduled_tokens,
    total_num_scheduled_tokens=total_num_scheduled_tokens,
    scheduled_spec_decode_tokens=scheduled_spec_decode_tokens,
    scheduled_encoder_inputs=scheduled_encoder_inputs,
    num_common_prefix_blocks=num_common_prefix_blocks,
    preempted_req_ids={req.request_id for req in preempted_reqs},
    finished_req_ids=self.finished_req_ids,
    free_encoder_mm_hashes=self.encoder_cache_manager.get_freed_mm_hashes(),
    new_block_ids_to_zero=new_block_ids_to_zero,
)
Right after construction, the Scheduler advances its own num_computed_tokens. This step is easy to overlook: the scheduler output has to keep the "pre-scheduling" token position so the runner can pick the input ids, while the Scheduler's internal state must already allow the same prefill request to be scheduled again in the next iteration.
def _update_after_schedule(self, scheduler_output: SchedulerOutput) -> None:
    num_scheduled_tokens = scheduler_output.num_scheduled_tokens
    for req_id, num_scheduled_token in num_scheduled_tokens.items():
        request = self.requests[req_id]
        request.num_computed_tokens += num_scheduled_token
        request.is_prefill_chunk = request.num_computed_tokens < (
            request.num_tokens + request.num_output_placeholders
        )
        scheduler_output.has_structured_output_requests |= (
            request.use_structured_output and not request.is_prefill_chunk
        )
        ...
    self.finished_req_ids = set()
This design can be understood as:
SchedulerOutput records "where this iteration starts and how much it computes"
The Scheduler's internal state moves ahead to "the position after this iteration finishes"
If spec tokens are rejected later, update_from_output() rolls the position back
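The advance-then-roll-back idea can be sketched with two small functions. This is an illustration under assumptions, not the Scheduler's code: both function names are hypothetical, and the rollback assumes that a spec decode step emits the accepted draft tokens plus exactly one extra sampled token, so the number of rejected drafts is num_draft_tokens - (num_generated - 1).

```python
def advance_after_schedule(num_computed_tokens: int, num_scheduled: int) -> int:
    # Optimistically move the position to "after this iteration finishes".
    return num_computed_tokens + num_scheduled


def rollback_rejected(
    num_computed_tokens: int, num_draft_tokens: int, num_generated: int
) -> int:
    # Assumption: generated tokens = accepted drafts + 1 extra sampled token.
    num_accepted = num_generated - 1
    num_rejected = num_draft_tokens - num_accepted
    # Rejected draft positions were never validly computed, so undo them.
    return num_computed_tokens - num_rejected


# 10 tokens computed; this step schedules 1 real token + 3 draft tokens.
advanced = advance_after_schedule(10, 1 + 3)
# The model accepts 1 draft and emits 1 extra token -> 2 drafts rejected.
settled = rollback_rejected(advanced, num_draft_tokens=3, num_generated=2)
```

Here the position first jumps from 10 to 14 and then settles at 12 once two rejected drafts are subtracted.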
4.5 Feeding results back: update_from_output()
Once the model output returns, the Scheduler does three things for every request scheduled in this iteration:
- Extract the sampled tokens, logprobs, pooling output, and so on.
- Append the tokens to the Request and check the stop conditions.
- For finished requests, free the KV / encoder cache and produce the EngineCoreOutput for the frontend.
The core fragment is:
for req_id, num_tokens_scheduled in num_scheduled_tokens.items():
    ...
    request = self.requests.get(req_id)
    if request is None or request.is_finished():
        continue
    req_index = model_runner_output.req_id_to_index[req_id]
    generated_token_ids = (
        sampled_token_ids[req_index] if sampled_token_ids else []
    )
    ...
    if new_token_ids:
        new_token_ids, stopped = self._update_request_with_output(
            request, new_token_ids
        )
    elif request.pooling_params and pooler_output is not None:
        request.status = RequestStatus.FINISHED_STOPPED
        stopped = True
    if stopped:
        finish_reason = request.get_finished_reason()
        finished = self._handle_stopped_request(request)
        if finished:
            kv_transfer_params = self._free_request(request)
Token appending and stop checking are concentrated in _update_request_with_output(). Once a token triggers a stop, the remaining tokens of the same step are dropped:
def _update_request_with_output(
    self, request: Request, new_token_ids: list[int]
) -> tuple[list[int], bool]:
    stopped = False
    for num_new, output_token_id in enumerate(new_token_ids, 1):
        request.append_output_token_ids(output_token_id)
        stopped = check_stop(request, self.max_model_len)
        if stopped:
            del new_token_ids[num_new:]
            break
    return new_token_ids, stopped
5. CachedRequestState: the Worker-side mirror of a request
The Scheduler's Request is Core-side scheduling state. GPUModelRunner cannot fetch the full request data from the Core on every iteration, so the Worker side keeps a cached copy, CachedRequestState.
It holds the prompt, mm features, sampling params, block table, number of computed tokens, emitted tokens, and more:
@dataclass
class CachedRequestState:
    req_id: str
    prompt_token_ids: list[int] | None
    mm_features: list[MultiModalFeatureSpec]
    sampling_params: SamplingParams | None
    generator: torch.Generator | None
    block_ids: tuple[list[int], ...]
    num_computed_tokens: int
    output_token_ids: list[int]
    mrope_positions: torch.Tensor | None = None
    mrope_position_delta: int | None = None
    ...
    def __post_init__(self):
        self.num_prompt_tokens = length_from_prompt_token_ids_or_embeds(
            self.prompt_token_ids, self.prompt_embeds
        )
It is paired with InputBatch:
GPUModelRunner.requests[req_id] -> CachedRequestState   # per-request cache
GPUModelRunner.input_batch                              # batch-level persistent tensors/tables
InputBatch.add_request() writes a request into the persistent batch, including token ids, block table, sampling params, LoRA mapping, and so on:
def add_request(self, request: "CachedRequestState") -> int:
    req_index = self._register_add_request(request)
    ...
    self.req_id_to_index[req_id] = req_index
    num_prompt_tokens = length_from_prompt_token_ids_or_embeds(
        request.prompt_token_ids, request.prompt_embeds
    )
    self.num_prompt_tokens[req_index] = num_prompt_tokens
    ...
    self.token_ids_cpu[req_index, start_idx:end_idx] = request.output_token_ids
    self.num_tokens_no_spec[req_index] = request.num_tokens
    self.num_computed_tokens_cpu[req_index] = request.num_computed_tokens
    self.block_table.add_row(request.block_ids, req_index)
    ...
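If a request has finished or is not scheduled in this iteration, the runner removes it from the persistent batch. If it is only temporarily unscheduled, its CachedRequestState is kept and the request is added back to InputBatch when it is scheduled again.
This design reduces EngineCore-to-Worker traffic: new requests are sent in full, old requests as deltas.
6. GPUModelRunner.execute_model: from SchedulerOutput to forward
GPUModelRunner.execute_model() is the main device-side flow. It can be split into seven steps:
1. Handle preliminary state such as connector / ngram
2. _update_states(): merge SchedulerOutput into CachedRequestState + InputBatch
3. _prepare_inputs(): build input_ids, positions, slot mapping, logits_indices
4. Build attention metadata / cudagraph batch descriptor
5. _preprocess(): multimodal embeddings, encoder-decoder, PP intermediate tensors
6. _model_forward(): call the model forward
7. On the last PP rank, compute logits and leave the state for sample_tokens()
The key part of the main function, from the scheduling result to forward, is: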
def execute_model(
    self,
    scheduler_output: "SchedulerOutput",
    intermediate_tensors: IntermediateTensors | None = None,
) -> ModelRunnerOutput | AsyncModelRunnerOutput | IntermediateTensors | None:
    ...
    num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens
    with (
        record_function_or_nullcontext("gpu_model_runner: preprocess"),
        self.synchronize_input_prep(),
    ):
        deferred_state_corrections_fn = self._update_states(scheduler_output)
        ...
        if not num_scheduled_tokens:
            ...
            return EMPTY_MODEL_RUNNER_OUTPUT
        num_reqs = self.input_batch.num_reqs
        req_ids = self.input_batch.req_ids
        tokens = [scheduler_output.num_scheduled_tokens[i] for i in req_ids]
        num_scheduled_tokens_np = np.array(tokens, dtype=np.int32)
        ...
        logits_indices, spec_decode_metadata = self._prepare_inputs(
            scheduler_output,
            num_scheduled_tokens_np,
        )
        ...
        attn_metadata, spec_decode_common_attn_metadata = (
            self._build_attention_metadata(...)
        )
        ...
        (
            input_ids,
            inputs_embeds,
            positions,
            intermediate_tensors,
            model_kwargs,
            ec_connector_output,
        ) = self._preprocess(
            scheduler_output, num_tokens_padded, intermediate_tensors
        )
6.1 _update_states(): merging the scheduling state
_update_states() first cleans up finished requests, then handles batch rows that are no longer scheduled in this iteration, then adds new and resumed requests, and finally refreshes the sampling metadata.
Cleaning up finished requests:
def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
    for req_id in scheduler_output.finished_req_ids:
        self.requests.pop(req_id, None)
        self.num_prompt_logprobs.pop(req_id, None)
    ...
    for req_id in scheduler_output.finished_req_ids:
        self.input_batch.remove_request(req_id)
Removing the persistent batch rows of requests that are not scheduled in this iteration:
scheduled_req_ids = scheduler_output.num_scheduled_tokens.keys()
cached_req_ids = self.input_batch.req_id_to_index.keys()
resumed_req_ids = scheduler_output.scheduled_cached_reqs.resumed_req_ids
unscheduled_req_ids = cached_req_ids - (scheduled_req_ids - resumed_req_ids)
...
for req_id in unscheduled_req_ids:
    self.input_batch.remove_request(req_id)
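The set expression is compact enough to misread, so here is a small sketch with plain Python sets. The function name and the request ids are hypothetical; the expression itself is the one from the fragment above.

```python
def rows_to_remove(
    scheduled_req_ids: set[str],
    cached_req_ids: set[str],
    resumed_req_ids: set[str],
) -> set[str]:
    """Hypothetical helper: which persistent-batch rows get removed this iteration."""
    # Keep only rows that are scheduled AND not resumed; everything else goes.
    return cached_req_ids - (scheduled_req_ids - resumed_req_ids)


removed = rows_to_remove(
    scheduled_req_ids={"a", "b", "c"},   # "c" is a brand-new request
    cached_req_ids={"a", "b", "d"},      # rows currently in the batch
    resumed_req_ids={"b"},               # "b" is scheduled, but marked as resumed
)
```

In this example "d" is removed because it is not scheduled, and "b" is removed even though it is scheduled, because a row that is in the batch and also marked as resumed does not survive the expression. Only "a" keeps its row.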
When adding a new request, the runner creates a CachedRequestState and initializes state such as the random generator, pooling params, prompt logprobs, and M-RoPE/XD-RoPE:
for new_req_data in scheduler_output.scheduled_new_reqs:
    req_id = new_req_data.req_id
    ...
    req_state = CachedRequestState(
        req_id=req_id,
        prompt_token_ids=new_req_data.prompt_token_ids,
        prompt_embeds=new_req_data.prompt_embeds,
        mm_features=new_req_data.mm_features,
        sampling_params=sampling_params,
        pooling_params=pooling_params,
        generator=generator,
        block_ids=new_req_data.block_ids,
        num_computed_tokens=new_req_data.num_computed_tokens,
        output_token_ids=[],
        lora_request=new_req_data.lora_request,
    )
    self.requests[req_id] = req_state
    ...
    reqs_to_add.append(req_state)
When updating an existing request, the runner syncs num_computed_tokens, appends new blocks, handles new tokens under PP, and handles spec tokens:
for i, req_id in enumerate(req_data.req_ids):
    req_state = self.requests[req_id]
    num_computed_tokens = req_data.num_computed_tokens[i]
    new_block_ids = req_data.new_block_ids[i]
    resumed_from_preemption = req_id in req_data.resumed_req_ids
    ...
    req_state.num_computed_tokens = num_computed_tokens
    ...
    if not resumed_from_preemption:
        if new_block_ids is not None:
            for block_ids, new_ids in zip(req_state.block_ids, new_block_ids):
                block_ids.extend(new_ids)
    else:
        assert req_index is None
        assert new_block_ids is not None
        req_state.block_ids = new_block_ids
    ...
    self.input_batch.num_computed_tokens_cpu[req_index] = num_computed_tokens
    if new_block_ids is not None:
        self.input_batch.block_table.append_row(new_block_ids, req_index)
    ...
    self.input_batch.update_req_spec_token_ids(req_state, scheduled_spec_tokens)
Finally, the pending requests are written into InputBatch, the holes are condensed, and the sampling metadata is refreshed:
for request in reqs_to_add:
    self.input_batch.add_request(request)
    self.input_batch.update_req_spec_token_ids(request, scheduled_spec_tokens)
self.input_batch.condense()
self._may_reorder_batch(scheduler_output)
self.input_batch.refresh_metadata()
6.2 _prepare_inputs(): assembling this iteration's GPU inputs
The job of _prepare_inputs() is to turn "how many tokens each request computes this iteration" into a flattened GPU batch.
Suppose three requests are scheduled with [2, 5, 3] tokens in this iteration:
req_indices:
    [0, 0, 1, 1, 1, 1, 1, 2, 2, 2]
query_pos:
    [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
positions:
    input_batch.num_computed_tokens_cpu[req_indices] + query_pos
This fragment of the source performs exactly that transformation:
def _prepare_inputs(
    self,
    scheduler_output: "SchedulerOutput",
    num_scheduled_tokens: np.ndarray,
) -> tuple[torch.Tensor, SpecDecodeMetadata | None]:
    ...
    self.input_batch.block_table.commit_block_table(num_reqs)
    req_indices = np.repeat(self.arange_np[:num_reqs], num_scheduled_tokens)
    cu_num_tokens = self._get_cumsum_and_arange(
        num_scheduled_tokens, self.query_pos.np
    )
    positions_np = (
        self.input_batch.num_computed_tokens_cpu[req_indices]
        + self.query_pos.np[: cu_num_tokens[-1]]
    )
It then gathers this iteration's input ids from InputBatch.token_ids_cpu by position, treating the 2D token table as a flat array:
token_indices = (
    positions_np + req_indices * self.input_batch.token_ids_cpu.shape[1]
)
token_indices_tensor = torch.from_numpy(token_indices)
torch.index_select(
    self.input_batch.token_ids_cpu_tensor.flatten(),
    0,
    token_indices_tensor,
    out=self.input_ids.cpu[:total_num_scheduled_tokens],
)
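The flattening and the gather can be reproduced with plain NumPy. This is a standalone sketch, not the runner's code: flatten_batch is a hypothetical function, the token table is a made-up 3 x 16 array, and np.cumsum plus np.arange stand in for the runner's preallocated buffers.

```python
import numpy as np


def flatten_batch(num_scheduled_tokens, num_computed_tokens, token_ids):
    """Hypothetical sketch: build req_indices, query_pos, positions, input_ids."""
    num_scheduled_tokens = np.asarray(num_scheduled_tokens, dtype=np.int32)
    num_reqs = len(num_scheduled_tokens)
    # Request row of every scheduled token, e.g. [0, 0, 1, 1, 1, ...].
    req_indices = np.repeat(np.arange(num_reqs), num_scheduled_tokens)
    cu_num_tokens = np.cumsum(num_scheduled_tokens)
    # Offset of every token inside its own request's chunk.
    chunk_starts = np.repeat(cu_num_tokens - num_scheduled_tokens, num_scheduled_tokens)
    query_pos = np.arange(cu_num_tokens[-1]) - chunk_starts
    # Absolute position in the sequence = tokens already computed + offset.
    positions = np.asarray(num_computed_tokens)[req_indices] + query_pos
    # Index into the flattened (num_reqs, max_len) token table.
    token_indices = positions + req_indices * token_ids.shape[1]
    input_ids = token_ids.reshape(-1)[token_indices]
    return req_indices, query_pos, positions, input_ids


# Made-up token table: row r holds token ids 16*r .. 16*r + 15.
table = np.arange(48).reshape(3, 16)
req_indices, query_pos, positions, input_ids = flatten_batch(
    [2, 5, 3], num_computed_tokens=[0, 3, 7], token_ids=table
)
```

With [2, 5, 3] scheduled tokens this reproduces the req_indices and query_pos arrays shown earlier, and each request's input ids start right after the tokens it has already computed.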
Next it prepares the query start locations, sequence lengths, and slot mapping needed by the attention metadata:
self.query_start_loc.np[0] = 0
self.query_start_loc.np[1 : num_reqs + 1] = cu_num_tokens
self.query_start_loc.np[num_reqs + 1 :].fill(cu_num_tokens[-1])
self.query_start_loc.copy_to_gpu()
query_start_loc = self.query_start_loc.gpu[: num_reqs + 1]
...
self.input_batch.block_table.compute_slot_mapping(
    num_reqs,
    self.query_start_loc.gpu[: num_reqs + 1],
    self.positions[:total_num_scheduled_tokens],
)
...
self._prepare_input_ids(
    scheduler_output,
    num_reqs,
    total_num_scheduled_tokens,
    cu_num_tokens,
)
slot_mapping is the device-side mapping that says which KV cache slot each token of this iteration is written to. The Scheduler allocates blocks; the runner computes slots from the block table and the token positions.
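The relationship between block table, position, and slot can be written down for a single request. This is a minimal sketch under assumptions: one KV cache group, a flat slot numbering of block_id * block_size + offset, and a hypothetical function name; the real compute_slot_mapping works on the whole batch and covers more cases.

```python
import numpy as np


def compute_slot_mapping(block_table_row, positions, block_size):
    """Hypothetical single-request sketch of position -> KV cache slot."""
    positions = np.asarray(positions)
    # Which logical block of the request each position falls into.
    logical_block = positions // block_size
    # Translate logical block index to the physical block id from the block table.
    physical_block = np.asarray(block_table_row)[logical_block]
    # Slot = start of the physical block + offset inside the block.
    return physical_block * block_size + positions % block_size


# A request owning physical blocks 7 and 2, with 4 tokens per block.
slots = compute_slot_mapping([7, 2], positions=[2, 3, 4, 5], block_size=4)
```

Positions 2 and 3 land in physical block 7, positions 4 and 5 spill into physical block 2, which is why the slots are not contiguous.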
Finally it decides which hidden states are used to compute logits:
if not use_spec_decode:
    logits_indices = query_start_loc[1:] - 1
    spec_decode_metadata = None
    num_sampled_tokens = np.ones(num_reqs, dtype=np.int32)
else:
    ...
    spec_decode_metadata = self._calc_spec_decode_metadata(
        num_draft_tokens, cu_num_tokens
    )
    logits_indices = spec_decode_metadata.logits_indices
    num_sampled_tokens = num_draft_tokens + 1
Without spec decode, each request samples only at the position of its last token in this iteration, hence logits_indices = query_start_loc[1:] - 1.
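Because query_start_loc[1:] is the cumulative sum of the per-request token counts, the formula can be checked in a few lines. This is a sketch with a hypothetical function name, using NumPy instead of the runner's GPU tensor.

```python
import numpy as np


def last_token_logits_indices(num_scheduled_tokens):
    """Hypothetical sketch: index of each request's last token in the flat batch."""
    # cumsum gives the exclusive end of each request's chunk; minus 1 is its last token.
    return np.cumsum(np.asarray(num_scheduled_tokens)) - 1


prefill_mix = last_token_logits_indices([2, 5, 3])
decode_only = last_token_logits_indices([1, 1])
```

For the [2, 5, 3] batch the sampled positions are 1, 6, and 9; in a pure decode batch every position is sampled.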
6.3 _preprocess(): shaping the model inputs
_preprocess() prepares input_ids, inputs_embeds, positions, intermediate_tensors, and model_kwargs. Text-only models use token ids; multimodal models first run the encoder and fill the embeddings into inputs_embeds; encoder-decoder models additionally pass encoder_outputs.
The text-only path is the simplest:
else:
    # For text-only models, we use token ids as input.
    input_ids = self.input_ids.gpu[:num_input_tokens]
    inputs_embeds = None
    model_kwargs = self._init_model_kwargs()
The multimodal path first gathers the multimodal embeddings:
if self.supports_mm_inputs and is_first_rank and not is_encoder_decoder:
    with self.maybe_get_ec_connector_output(
        scheduler_output,
        encoder_cache=self.encoder_cache,
    ) as ec_connector_output:
        self._execute_mm_encoder(scheduler_output)
        mm_embeds, is_mm_embed = self._gather_mm_embeddings(scheduler_output)
    inputs_embeds_scheduled = self.model.embed_input_ids(
        self.input_ids.gpu[:num_scheduled_tokens],
        multimodal_embeddings=mm_embeds,
        is_multimodal=is_mm_embed,
    )
    self.inputs_embeds.gpu[:num_scheduled_tokens].copy_(inputs_embeds_scheduled)
    input_ids, inputs_embeds = self._prepare_mm_inputs(num_input_tokens)
6.4 Forward and logits
The runner enters set_forward_context(), which puts the attention metadata, cudagraph mode, batch descriptor, slot mapping, and so on into the forward context, and then calls the model:
with (
    set_forward_context(
        attn_metadata,
        self.vllm_config,
        num_tokens=num_tokens_padded,
        num_tokens_across_dp=num_tokens_across_dp,
        cudagraph_runtime_mode=cudagraph_mode,
        batch_descriptor=batch_desc,
        ubatch_slices=ubatch_slices_padded,
        slot_mapping=slot_mappings,
        skip_compiled=has_encoder_input,
    ),
    record_function_or_nullcontext("gpu_model_runner: forward"),
    self.maybe_get_kv_connector_output(
        scheduler_output,
        defer_finalize=defer_kv_connector_finalize,
    ) as kv_connector_output,
):
    model_output = self._model_forward(
        input_ids=input_ids,
        positions=positions,
        intermediate_tensors=intermediate_tensors,
        inputs_embeds=inputs_embeds,
        **model_kwargs,
    )
_model_forward() itself is only a thin wrapper around the model call:
def _model_forward(
    self,
    input_ids: torch.Tensor | None = None,
    positions: torch.Tensor | None = None,
    intermediate_tensors: IntermediateTensors | None = None,
    inputs_embeds: torch.Tensor | None = None,
    **model_kwargs: dict[str, Any],
) -> Any:
    return self.model(
        input_ids=input_ids,
        positions=positions,
        intermediate_tensors=intermediate_tensors,
        inputs_embeds=inputs_embeds,
        **model_kwargs,
    )
Taking llama.py as an example, the model forward does the embedding on the first PP rank, runs the layers one by one, returns IntermediateTensors on non-last ranks, and applies the norm and returns hidden states on the last rank:
def forward(
    self,
    input_ids: torch.Tensor | None,
    positions: torch.Tensor,
    intermediate_tensors: IntermediateTensors | None,
    inputs_embeds: torch.Tensor | None = None,
    **extra_layer_kwargs,
) -> torch.Tensor | IntermediateTensors | tuple[torch.Tensor, list[torch.Tensor]]:
    if get_pp_group().is_first_rank:
        if inputs_embeds is not None:
            hidden_states = inputs_embeds
        else:
            hidden_states = self.embed_input_ids(input_ids)
        residual = None
    else:
        assert intermediate_tensors is not None
        hidden_states = intermediate_tensors["hidden_states"]
        residual = intermediate_tensors["residual"]
    for idx, layer in enumerate(
        islice(self.layers, self.start_layer, self.end_layer)
    ):
        hidden_states, residual = layer(
            positions, hidden_states, residual, **extra_layer_kwargs
        )
    ...
    if not get_pp_group().is_last_rank:
        return IntermediateTensors(
            {"hidden_states": hidden_states, "residual": residual}
        )
    hidden_states, _ = self.norm(hidden_states, residual)
    ...
    return hidden_states
The last PP rank selects the positions to sample by logits_indices and computes the logits:
if not get_pp_group().is_last_rank:
    assert isinstance(hidden_states, IntermediateTensors)
    hidden_states.kv_connector_output = kv_connector_output
    self.kv_connector_output = kv_connector_output
    return hidden_states
if self.is_pooling_model:
    return self._pool(...)
sample_hidden_states = hidden_states[logits_indices]
logits = self.model.compute_logits(sample_hidden_states)
Llama's logits computation is thin as well:
def compute_logits(
    self,
    hidden_states: torch.Tensor,
) -> torch.Tensor | None:
    logits = self.logits_processor(self.lm_head, hidden_states)
    return logits
6.5 Why execute_model often returns None
For an ordinary generative model, execute_model does not return a ModelRunnerOutput right after the forward. Instead it stashes the logits, hidden states, spec metadata, and so on in execute_model_state and returns None:
self.execute_model_state = ExecuteModelState(
    scheduler_output,
    logits,
    spec_decode_metadata,
    spec_decode_common_attn_metadata,
    hidden_states,
    sample_hidden_states,
    aux_hidden_states,
    ec_connector_output,
    cudagraph_stats,
    slot_mappings,
)
self.kv_connector_output = kv_connector_output
...
return None
The purpose is to give the Core a chance, after the forward and before sampling, to obtain the structured-output grammar bitmask from the Scheduler:
execute_model() -> forward produces logits -> returns None
EngineCore -> Scheduler.get_grammar_bitmask()
sample_tokens(grammar_output) -> apply the grammar mask to the logits -> sample
This is also why the WorkerBase.execute_model() documentation says that if it returns None, sample_tokens must be called immediately afterwards.
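The two-phase handshake can be sketched with a toy runner. Everything here is an assumption made for illustration: ToyRunner is hypothetical, the "forward" just stores the logits it is given, the grammar mask is a plain boolean array rather than a packed bitmask, and sampling is greedy argmax.

```python
import numpy as np


class ToyRunner:
    """Hypothetical runner that splits forward and sampling into two calls."""

    def __init__(self):
        self._state = None  # plays the role of execute_model_state

    def execute_model(self, logits):
        # Phase 1: "forward" produces logits and stashes them; nothing is returned.
        self._state = np.array(logits, dtype=np.float64)
        return None

    def sample_tokens(self, allowed_mask=None):
        # Phase 2: consume the stashed logits exactly once.
        logits, self._state = self._state, None
        if allowed_mask is not None:
            # Tokens the grammar forbids get -inf so they can never be chosen.
            logits = np.where(np.asarray(allowed_mask), logits, -np.inf)
        return logits.argmax(axis=-1).tolist()


runner = ToyRunner()
first_call = runner.execute_model([[0.1, 2.0, 0.5]])
unconstrained = runner.sample_tokens()

runner.execute_model([[0.1, 2.0, 0.5]])
constrained = runner.sample_tokens(allowed_mask=[[True, False, True]])
```

Without a mask the highest-scoring token 1 is picked; once the mask forbids token 1, the same logits yield token 2. The mask can be computed while the forward is still running, which is the point of the split.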
7. GPUModelRunner.sample_tokens: sampling, writing back to the batch, producing output
sample_tokens() consumes execute_model_state. It first applies the grammar bitmask and then calls the sampler:
if grammar_output is not None:
    apply_grammar_bitmask(
        scheduler_output, grammar_output, self.input_batch, logits
    )
with record_function_or_nullcontext("gpu_model_runner: sample"):
    sampler_output = self._sample(logits, spec_decode_metadata)
_sample() calls the sampler with InputBatch.sampling_metadata; with spec decode it goes through the rejection sampler:
def _sample(
    self,
    logits: torch.Tensor | None,
    spec_decode_metadata: SpecDecodeMetadata | None,
) -> SamplerOutput:
    sampling_metadata = self.input_batch.sampling_metadata
    self.input_batch.update_async_output_token_ids()
    if spec_decode_metadata is None:
        return self.sampler(
            logits=logits,
            sampling_metadata=sampling_metadata,
        )
    ...
    sampler_output = self.rejection_sampler(
        spec_decode_metadata,
        None,
        logits,
        sampling_metadata,
    )
    return sampler_output
After sampling, _bookkeeping_sync() organizes the results from device to CPU and caches the sampled tokens back into InputBatch and CachedRequestState, so that in the next iteration the Scheduler does not have to send the previously sampled tokens back to the Worker.
for req_idx in range(num_sampled_tokens):
    ...
    start_idx = self.input_batch.num_tokens_no_spec[req_idx]
    end_idx = start_idx + num_sampled_ids
    ...
    self.input_batch.token_ids_cpu[req_idx, start_idx:end_idx] = sampled_ids
    self.input_batch.is_token_ids[req_idx, start_idx:end_idx] = True
    self.input_batch.num_tokens_no_spec[req_idx] = end_idx
    req_id = req_ids[req_idx]
    req_state = self.requests[req_id]
    req_state.output_token_ids.extend(sampled_ids)
Finally the ModelRunnerOutput is constructed:
output = ModelRunnerOutput(
    req_ids=req_ids_output_copy,
    req_id_to_index=req_id_to_index_output_copy,
    sampled_token_ids=valid_sampled_token_ids,
    logprobs=logprobs_lists,
    prompt_logprobs_dict=prompt_logprobs_dict,
    kv_connector_output=kv_connector_output,
    ec_connector_output=ec_connector_output
    if self.supports_mm_inputs
    else None,
    num_nans_in_logits=num_nans_in_logits,
    cudagraph_stats=cudagraph_stats,
)
Without async scheduling the output is returned directly. With async scheduling the runner returns an AsyncGPUModelRunnerOutput, which performs the D2H copy of sampled tokens and logprobs on a separate stream so that it overlaps with other work.
if not self.use_async_scheduling:
    return output
async_output = AsyncGPUModelRunnerOutput(
    model_runner_output=output,
    sampled_token_ids=sampler_output.sampled_token_ids,
    logprobs_tensors=sampler_output.logprobs_tensors,
    invalid_req_indices=invalid_req_indices,
    async_output_copy_stream=self.async_output_copy_stream,
    vocab_size=self.input_batch.vocab_size,
)
...
return async_output
8. A small example: how two requests are threaded through
Suppose there are two generation requests:
R1 prompt = [10, 11, 12, 13], max_tokens = 2
R2 prompt = [20, 21], max_tokens = 2
max_num_scheduled_tokens = 6
No prefix cache hit, no spec decode
8.1 Round 1: two waiting requests enter running
Initial Scheduler state:
waiting = [R1, R2]
running = []
R1.num_computed_tokens = 0
R2.num_computed_tokens = 0
Scheduling process:
token_budget = 6
R1 needs 4 prompt tokens computed -> assign 4 -> budget left 2
R2 needs 2 prompt tokens computed -> assign 2 -> budget left 0
Conceptually, the SchedulerOutput is:
scheduled_new_reqs = [R1, R2]
num_scheduled_tokens = {R1: 4, R2: 2}
total_num_scheduled_tokens = 6
On the GPUModelRunner side, _update_states() creates two CachedRequestState objects and writes them into InputBatch. _prepare_inputs() flattens them into:
req_indices = [0, 0, 0, 0, 1, 1]
query_pos = [0, 1, 2, 3, 0, 1]
positions = [0, 1, 2, 3, 0, 1]
input_ids = [10, 11, 12, 13, 20, 21]
logits_indices = [3, 5]
The model forward produces 6 hidden states, but only the 4th and 6th positions are used to compute logits. Suppose sampling yields:
R1 -> 100
R2 -> 200
sample_tokens() returns:
ModelRunnerOutput.sampled_token_ids = [[100], [200]]
After the Scheduler consumes the output:
R1.all_token_ids = [10, 11, 12, 13, 100]
R2.all_token_ids = [20, 21, 200]
R1.num_computed_tokens = 4
R2.num_computed_tokens = 2
Here num_computed_tokens means "the number of tokens already computed by a model forward". The newly sampled 100/200 have been appended to each request's token sequence but have not yet been fed to the model as the next iteration's input.
8.2 Round 2: decode
For the running requests the Scheduler computes:
R1.num_tokens_with_spec = 5, num_computed_tokens = 4 -> needs 1
R2.num_tokens_with_spec = 3, num_computed_tokens = 2 -> needs 1
This iteration schedules:
num_scheduled_tokens = {R1: 1, R2: 1}
The GPUModelRunner inputs become:
req_indices = [0, 1]
query_pos = [0, 0]
positions = [4, 2]
input_ids = [100, 200]
logits_indices = [0, 1]
The forward runs only two decode tokens. Suppose sampling yields:
R1 -> 101
R2 -> 201
After the output is fed back, both requests reach max_tokens=2, check_stop() marks them finished, the Scheduler frees their KV blocks, and the final tokens and finish reasons are sent back to the frontend through EngineCoreOutputs.
In this example, the complete Core-side chain can be condensed to:
Frontend ADD
-> EngineCoreProc.input_queue
-> EngineCore.add_request()
-> Scheduler.waiting
-> Scheduler.schedule()
-> SchedulerOutput
-> Executor.execute_model()
-> Worker.execute_model()
-> GPUModelRunner._update_states()
-> GPUModelRunner._prepare_inputs()
-> model forward
-> sample_tokens()
-> ModelRunnerOutput
-> Scheduler.update_from_output()
-> EngineCoreOutputs
-> EngineCoreProc.output_queue
-> Frontend
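The two rounds of this example can be replayed with a small simulation. It is a toy model under stated assumptions, not vLLM code: replay is a hypothetical function, KV blocks are ignored, the only stop condition is max_tokens, and the "sampled" tokens are supplied up front so the run is deterministic.

```python
def replay(prompts, max_tokens, token_budget, sampled):
    """Hypothetical replay of the catch-up loop.

    prompts: request id -> prompt token ids
    sampled: request id -> tokens the "model" will emit, in order
    Returns (trace, all_tokens), where trace holds (plan, input_ids) per round.
    """
    all_tokens = {r: list(p) for r, p in prompts.items()}
    computed = {r: 0 for r in prompts}
    num_out = {r: 0 for r in prompts}
    active = list(prompts)
    trace = []
    while active:
        # Schedule: let every active request catch up within the token budget.
        budget = token_budget
        plan = {}
        for r in active:
            n = min(len(all_tokens[r]) - computed[r], budget)
            if n > 0:
                plan[r] = n
                budget -= n
        # "Forward": gather input ids starting at the pre-scheduling position.
        input_ids = []
        for r, n in plan.items():
            input_ids += all_tokens[r][computed[r]:computed[r] + n]
            computed[r] += n
        trace.append((plan, input_ids))
        # Feedback: a fully caught-up request gets one sampled token.
        for r in plan:
            if computed[r] == len(all_tokens[r]):
                all_tokens[r].append(sampled[r][num_out[r]])
                num_out[r] += 1
                if num_out[r] >= max_tokens:
                    active.remove(r)
    return trace, all_tokens


trace, all_tokens = replay(
    prompts={"R1": [10, 11, 12, 13], "R2": [20, 21]},
    max_tokens=2,
    token_budget=6,
    sampled={"R1": [100, 101], "R2": [200, 201]},
)
```

The trace contains exactly two rounds: a prefill round with input ids [10, 11, 12, 13, 20, 21] and a decode round with input ids [100, 200], matching the walkthrough above.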
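9. Summary
The core of vLLM V1 Core-side inference is not a hard switch between a "prefill phase" and a "decode phase" but a more unified catch-up mechanism: every request has a num_computed_tokens, and in each iteration the Scheduler lets it get as close as possible to num_tokens_with_spec under constraints such as the token budget, KV blocks, encoder budget, LoRA, prefix cache, and spec decode.
SchedulerOutput is the intermediate contract of this mechanism. The Scheduler uses it to describe "what is computed in this iteration, which blocks are used, and which requests are new, reused, or finished"; GPUModelRunner uses it to maintain the Worker-side CachedRequestState and InputBatch, build the GPU inputs, slot mapping, and attention metadata, run the forward, and wrap the sampling result into a ModelRunnerOutput.
In the end, EngineCore.step() joins the two ends into a closed loop: schedule, execute, sample, feed back. As long as this loop keeps running, new requests move from waiting to running, running requests advance token by token, and finished requests free their KV cache and send their output back to the frontend.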
