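This article walks through how a single inference iteration happens on the Core side of vLLM V1, covering these key classes:

  • EngineCore / EngineCoreProc: the Core-side main loop and its process wrapper.
  • Scheduler: the scheduler that owns the request queues, KV blocks, and token budget.
  • Executor: the execution abstraction between Core and Worker.
  • Worker: device-side lifecycle and the pipeline-parallel bridge.
  • GPUModelRunner: turns the scheduling result into GPU inputs, runs the model forward pass, and samples.
  • CachedRequestState: request state cached inside the Worker/GPUModelRunner.

SchedulerOutput is treated here only as the scheduling contract between Scheduler and the model runner; the full details of how its fields are built, consumed, and updated are out of scope.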

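0. Overview: one Core-side inference iteration

The shortest description of Core-side inference: Scheduler decides how many tokens each request computes in this step and which KV blocks it occupies; Executor/Worker/GPUModelRunner run the model according to that plan; the sampled output then flows back to Scheduler to update request state.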

@startuml
actor Frontend
participant "EngineCoreProc" as Proc
participant "EngineCore" as Core
participant "Scheduler" as Sched
participant "Executor" as Exec
participant "Worker" as Worker
participant "GPUModelRunner" as Runner

Frontend -> Proc: ADD / ABORT / UTILITY
Proc -> Core: preprocess_add_request()
Core -> Sched: add_request(Request)

loop engine busy loop
  Core -> Sched: schedule()
  Sched --> Core: SchedulerOutput
  Core -> Exec: execute_model(SchedulerOutput)
  Exec -> Worker: execute_model(SchedulerOutput)
  Worker -> Runner: execute_model(SchedulerOutput, intermediate_tensors?)
  Runner -> Runner: _update_states()
  Runner -> Runner: _prepare_inputs()
  Runner -> Runner: model forward
  Runner --> Worker: None or ModelRunnerOutput

  alt execute_model returned None
    Core -> Sched: get_grammar_bitmask(SchedulerOutput)
    Core -> Exec: sample_tokens(grammar_output)
    Exec -> Worker: sample_tokens(grammar_output)
    Worker -> Runner: sample_tokens(grammar_output)
    Runner --> Core: ModelRunnerOutput
  end

  Core -> Sched: update_from_output(SchedulerOutput, ModelRunnerOutput)
  Sched --> Proc: EngineCoreOutputs
  Proc --> Frontend: output queue / ZMQ
end
@enduml

In code, EngineCore.step() is the most condensed version of this loop:

def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
    ...
    scheduler_output = self.scheduler.schedule()
    future = self.model_executor.execute_model(scheduler_output, non_block=True)
    grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)
    with (
        self.log_error_detail(scheduler_output),
        self.log_iteration_details(scheduler_output),
    ):
        model_output = future.result()
        if model_output is None:
            model_output = self.model_executor.sample_tokens(grammar_output)

    self._process_aborts_queue()
    engine_core_outputs = self.scheduler.update_from_output(
        scheduler_output, model_output
    )

    return engine_core_outputs, scheduler_output.total_num_scheduled_tokens > 0

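This code reflects three boundaries in V1 Core:

  1. Scheduling boundary: Scheduler.schedule() only produces this step's plan and never runs the model itself.
  2. Execution boundary: Executor.execute_model() may trigger Worker-side execution synchronously or asynchronously.
  3. Feedback boundary: Scheduler.update_from_output() consumes the model output and updates requests, KV state, stop conditions, and frontend outputs.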

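1. EngineCoreProc: running Core in a background process

EngineCore is the inner logic; EngineCoreProc wraps it in a background process and handles the surrounding concerns: ZMQ IO, input and output queues, shutdown, and data-parallel coordination.

Its busy loop is deliberately minimal: poll the input queue until there is work, run one engine step, then put the resulting EngineCoreOutputs on the output queue.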

def run_busy_loop(self):
    """Core busy loop of the EngineCore."""
    while self._handle_shutdown():
        # 1) Poll the input queue until there is work to do.
        self._process_input_queue()
        # 2) Step the engine core and return the outputs.
        self._process_engine_step()

    raise SystemExit

def _process_engine_step(self) -> bool:
    """Called only when there are unfinished local requests."""

    outputs, model_executed = self.step_fn()
    for output in outputs.items() if outputs else ():
        self.output_queue.put_nowait(output)
    self.post_step(model_executed)

    if not model_executed and self.scheduler.has_unfinished_requests():
        time.sleep(0.001)

    return model_executed

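The input thread first preprocesses each ADD request into Core's internal Request and then places it on input_queue. The conversion happens in EngineCore.preprocess_add_request(): multimodal features may go through the receiver cache, after which Request.from_engine_core_request() builds the scheduler-side object.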

def preprocess_add_request(self, request: EngineCoreRequest) -> tuple[Request, int]:
    ...
    req = Request.from_engine_core_request(request, self.request_block_hasher)
    if req.use_structured_output:
        self.structured_output_manager.grammar_init(req)
    return req, request.current_wave

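When Core initializes, it first creates the Executor, then initializes the KV cache from the model and GPU memory configuration, and finally creates the Scheduler: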

def __init__(...):
    ...
    self.model_executor = executor_class(vllm_config)
    ...
    kv_cache_config = self._initialize_kv_caches(vllm_config)
    self.structured_output_manager = StructuredOutputManager(vllm_config)

    Scheduler = vllm_config.scheduler_config.get_scheduler_cls()
    ...
    self.scheduler: SchedulerInterface = Scheduler(
        vllm_config=vllm_config,
        kv_cache_config=kv_cache_config,
        structured_output_manager=self.structured_output_manager,
        include_finished_set=include_finished_set,
        log_stats=self.log_stats,
        block_size=scheduler_block_size,
    )
    ...
    self.step_fn = (
        self.step if self.batch_queue is None else self.step_with_batch_queue
    )

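On the normal path, step() advances one iteration at a time. With pipeline parallelism or async scheduling enabled, step_with_batch_queue() maintains a batch queue so that scheduling the next batch overlaps with waiting for the previous batch's output, which reduces pipeline bubbles.

2. Executor: the execution abstraction between Core and Worker

Executor is the model execution interface that Core sees. It hides the differences between the single-process, multiprocessing, and Ray backends. EngineCore only needs to call methods such as execute_model(), sample_tokens(), and take_draft_token_ids().

The default execute_model() in the abstract class calls the Worker method through collective_rpc and takes the first return value: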

def execute_model(
    self, scheduler_output: SchedulerOutput, non_block: bool = False
) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]:
    output = self.collective_rpc(
        "execute_model", args=(scheduler_output,), non_block=non_block
    )
    return output[0]

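The single-process executor is more direct: one WorkerWrapperBase wraps the Worker living in the same process. In non-blocking mode, if the Worker returns an AsyncModelRunnerOutput, the executor can run get_output() on a background thread.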

def execute_model(
    self, scheduler_output: SchedulerOutput, non_block: bool = False
) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]:
    output = self.collective_rpc(
        "execute_model",
        args=(scheduler_output,),
        non_block=non_block,
        single_value=True,
    )
    if non_block and output.done():
        output.result()
    return output

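The multiproc and Ray backends follow the same semantics and differ only in where execution happens: multiproc sends the RPC to worker processes, while Ray goes through a compiled DAG. Core does not need to care about these differences.

3. Worker: device-side lifecycle and the PP bridge

The GPU Worker is responsible for CUDA device initialization, distributed environment initialization, model loading, KV cache initialization, warmup and cudagraph capture, and calling the model runner every step.

When initializing the device, Worker picks the V1 or V2 model runner based on an environment variable. The repository currently defaults to VLLM_USE_V2_MODEL_RUNNER=False, and the CachedRequestState this article focuses on belongs to the V1 path, vllm/v1/worker/gpu_model_runner.py.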

def init_device(self):
    ...
    self.use_v2_model_runner = envs.VLLM_USE_V2_MODEL_RUNNER
    ...
    if self.use_v2_model_runner:
        from vllm.v1.worker.gpu.model_runner import (
            GPUModelRunner as GPUModelRunnerV2,
        )

        # HACK(woosuk): This is a temporary fix to avoid type errors.
        self.model_runner: GPUModelRunner = GPUModelRunnerV2(  # type: ignore
            self.vllm_config, self.device
        )
    else:
        from vllm.v1.worker.gpu_model_runner import (
            GPUModelRunner as GPUModelRunnerV1,
        )
        self.model_runner = GPUModelRunnerV1(self.vllm_config, self.device)

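During model execution, Worker also handles pipeline-parallel sends and receives. A non-first PP rank first receives IntermediateTensors from the previous stage; a non-last PP rank sends its intermediate tensors to the next stage after the runner's forward pass.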

def execute_model(
    self, scheduler_output: "SchedulerOutput"
) -> ModelRunnerOutput | AsyncModelRunnerOutput | None:
    ...
    forward_pass = scheduler_output.total_num_scheduled_tokens > 0
    ...
    if forward_pass and not get_pp_group().is_first_rank:
        tensor_dict, comm_handles, comm_postprocess = (
            get_pp_group().irecv_tensor_dict(...)
        )
        intermediate_tensors = AsyncIntermediateTensors(...)

    with self.annotate_profile(scheduler_output):
        output = self.model_runner.execute_model(
            scheduler_output, intermediate_tensors
        )
        if isinstance(output, ModelRunnerOutput | AsyncModelRunnerOutput | NoneType):
            return output

    assert isinstance(output, IntermediateTensors)
    ...
    self._pp_send_work = get_pp_group().isend_tensor_dict(...)
    return None

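Worker is therefore the device bridge between Core's scheduling contract and the GPU runner's actual execution; the component that actually assembles request state into tensors is GPUModelRunner.

4. Scheduler: token scheduling without fixed prefill/decode phases

The core idea of Scheduler is written at the top of schedule(): rather than running a batch of prefills followed by a batch of decodes, it lets num_computed_tokens catch up with num_tokens_with_spec.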

def schedule(self) -> SchedulerOutput:
    # NOTE(woosuk) on the scheduling algorithm:
    # There's no "decoding phase" nor "prefill phase" in the scheduler.
    # Each request just has the num_computed_tokens and
    # num_tokens_with_spec. num_tokens_with_spec =
    # len(prompt_token_ids) + len(output_token_ids) + len(spec_token_ids).
    # At each step, the scheduler tries to assign tokens to the requests
    # so that each request's num_computed_tokens can catch up its
    # num_tokens_with_spec. This is general enough to cover
    # chunked prefills, prefix caching, speculative decoding,
    # and the "jump decoding" optimization in the future.
    ...

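This perspective is key. From Scheduler's point of view:

  • A prompt that is not fully computed means num_computed_tokens < prompt_len.
  • Decode needing a new token means the sampled token is already in all_token_ids but has not yet been computed by the model.
  • Spec decode means spec_token_ids temporarily extends num_tokens_with_spec.
  • Chunked prefill just means this step's token budget is insufficient, so only one segment is caught up at a time.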
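
A minimal sketch of this catch-up view, assuming a hypothetical ToyRequest rather than vLLM's Request class: a 10-token prompt is scheduled under a budget of 4 tokens per step, and each step simply closes part of the gap between num_computed_tokens and the total token count. Sampling is faked by appending a dummy token once every known token has been computed.

```python
from dataclasses import dataclass, field


@dataclass
class ToyRequest:
    """Hypothetical stand-in for a scheduler-side request (not vLLM's Request)."""

    prompt_len: int
    output_token_ids: list = field(default_factory=list)
    num_computed_tokens: int = 0

    @property
    def num_tokens(self) -> int:
        return self.prompt_len + len(self.output_token_ids)


def run_steps(req: ToyRequest, token_budget: int, num_steps: int) -> list:
    """Return how many tokens were scheduled in each step."""
    scheduled = []
    for _ in range(num_steps):
        # Schedule the gap between known tokens and computed tokens,
        # capped by this step's token budget.
        num_new = min(req.num_tokens - req.num_computed_tokens, token_budget)
        scheduled.append(num_new)
        req.num_computed_tokens += num_new
        # Once every known token is computed, the model can sample one more.
        if req.num_computed_tokens == req.num_tokens:
            req.output_token_ids.append(0)  # dummy sampled token
    return scheduled


schedule_trace = run_steps(ToyRequest(prompt_len=10), token_budget=4, num_steps=5)
print(schedule_trace)  # [4, 4, 2, 1, 1]
```

The first three steps behave like chunked prefill and the last two like decode, yet the loop never branches on a phase.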

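4.1 Scheduler internal state

Scheduler maintains the following request containers:

requests: req_id -> Request       # global request table
waiting / skipped_waiting         # waiting to enter the running state
running                           # holds scheduling resources and can keep advancing
finished_req_ids                  # requests the worker must be told to clean up this step

The request lifecycle can be simplified as: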

WAITING
  |
  | schedule waiting request + allocate KV slots
  v
RUNNING
  |
  | output reaches stop / length cap / abort
  v
FINISHED_*

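RUNNING --KV shortage / priority preemption--> PREEMPTED --> WAITING
WAITING --remote KV not ready---------------> WAITING_FOR_REMOTE_KVS
WAITING --structured grammar not ready------> WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR

A new request is only enqueued; it is not executed immediately: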

def add_request(self, request: Request) -> None:
    existing = self.requests.get(request.request_id)
    if existing is not None:
        ...
    else:
        if request.resumable:
            request.streaming_queue = deque()
        self._enqueue_waiting_request(request)
        self.requests[request.request_id] = request
        if self.log_stats:
            request.record_event(EngineCoreEventType.QUEUED)

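4.2 Scheduling phase one: advance RUNNING requests first

schedule() handles running first. This guarantees that requests already generating get priority, and it means that in decode workloads each request typically advances by just one token per step.

The core computation is: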

num_new_tokens = (
    request.num_tokens_with_spec
    + request.num_output_placeholders
    - request.num_computed_tokens
)
if 0 < self.scheduler_config.long_prefill_token_threshold < num_new_tokens:
    num_new_tokens = self.scheduler_config.long_prefill_token_threshold
num_new_tokens = min(num_new_tokens, token_budget)

num_new_tokens = min(
    num_new_tokens, self.max_model_len - 1 - request.num_computed_tokens
)

In other words, a step schedules at most the number of tokens the model has not yet computed, further capped by the long prefill threshold, the global token budget, and max model len.
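
The caps can be checked with a small standalone function. This is a sketch that mirrors the arithmetic of the snippet above with plain integers; the function name and the example numbers are illustrative only.

```python
def num_new_tokens_for_running(
    num_tokens_with_spec: int,
    num_output_placeholders: int,
    num_computed_tokens: int,
    long_prefill_token_threshold: int,
    token_budget: int,
    max_model_len: int,
) -> int:
    """Apply the same sequence of caps as the scheduler snippet above."""
    n = num_tokens_with_spec + num_output_placeholders - num_computed_tokens
    # A threshold of 0 disables the long-prefill cap.
    if 0 < long_prefill_token_threshold < n:
        n = long_prefill_token_threshold
    n = min(n, token_budget)
    # Never schedule past the model's maximum length.
    n = min(n, max_model_len - 1 - num_computed_tokens)
    return n


# A 5000-token prompt that has not started: capped by the threshold.
prefill = num_new_tokens_for_running(5000, 0, 0, 2048, 8192, 32768)
# A decoding request with one uncomputed sampled token.
decode = num_new_tokens_for_running(121, 0, 120, 2048, 8192, 32768)
# The same prompt with no threshold and a small remaining budget.
tight = num_new_tokens_for_running(5000, 0, 0, 0, 512, 32768)
print(prefill, decode, tight)  # 2048 1 512
```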

Scheduler then requests slots from the KV cache manager:

while True:
    new_blocks = self.kv_cache_manager.allocate_slots(
        request,
        num_new_tokens,
        num_lookahead_tokens=self.num_lookahead_tokens,
    )

    if new_blocks is not None:
        break

    if self.policy == SchedulingPolicy.PRIORITY:
        preempted_req = max(
            self.running,
            key=lambda r: (r.priority, r.arrival_time),
        )
        self.running.remove(preempted_req)
        ...
    else:
        preempted_req = self.running.pop()

    self._preempt_request(preempted_req, scheduled_timestamp)
    preempted_reqs.append(preempted_req)
    if preempted_req == request:
        break

When KV space runs out, a running request is preempted. Preemption frees its KV blocks, sets the request status to PREEMPTED, and puts it back at the front of the waiting queue:

def _preempt_request(self, request: Request, timestamp: float) -> None:
    ...
    self.kv_cache_manager.free(request)
    self.encoder_cache_manager.free(request)
    request.status = RequestStatus.PREEMPTED
    request.num_computed_tokens = 0
    if request.spec_token_ids:
        request.spec_token_ids = []
    request.num_preemptions += 1
    ...
    self.waiting.prepend_request(request)
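
The allocate-or-preempt loop can be sketched with a toy block pool. This is a simplified model of the non-priority path only; the function, the block counts, and the assumption that the request being scheduled is itself a member of running are all illustrative, not vLLM's implementation.

```python
def allocate_with_preemption(
    running: list,
    request: str,
    blocks_needed: int,
    free_blocks: int,
    blocks_held: dict,
) -> tuple:
    """Toy allocate-or-preempt loop.

    Assumes `request` is in `running`. Returns
    (scheduled, preempted request ids, remaining free blocks).
    """
    preempted = []
    while free_blocks < blocks_needed:
        victim = running.pop()  # tail of the running list
        free_blocks += blocks_held.pop(victim)  # preemption frees its KV blocks
        preempted.append(victim)
        if victim == request:
            # The request preempted itself, so it cannot run this step.
            return False, preempted, free_blocks
    return True, preempted, free_blocks - blocks_needed


running = ["a", "b", "c"]
held = {"a": 4, "b": 3, "c": 2}
result = allocate_with_preemption(running, "a", 4, 1, held)
print(result, running)  # (True, ['c', 'b'], 2) ['a']
```

Because victims are taken from the tail, the request at the head of running is the last one to lose its blocks.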

After a successful schedule, Scheduler records each request's token count and newly allocated blocks for this step:

scheduled_running_reqs.append(request)
request_id = request.request_id
req_to_new_blocks[request_id] = new_blocks
num_scheduled_tokens[request_id] = num_new_tokens
token_budget -= num_new_tokens

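4.3 Scheduling phase two: promote new or resumed requests from WAITING

Waiting requests are scheduled only if no preemption happened this step and the scheduler is not paused. This phase handles prefix caching, remote KV, encoder inputs, LoRA constraints, chunked prefill, KV allocation, and more.

A prefix cache hit is resolved when a request is scheduled for the first time: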

if request.num_computed_tokens == 0:
    new_computed_blocks, num_new_local_computed_tokens = (
        self.kv_cache_manager.get_computed_blocks(request)
    )

    if self.connector is not None:
        ext_tokens, load_kv_async = (
            self.connector.get_num_new_matched_tokens(
                request, num_new_local_computed_tokens
            )
        )
        ...

    num_computed_tokens = (
        num_new_local_computed_tokens + num_external_computed_tokens
    )
    assert num_computed_tokens <= request.num_tokens
else:
    new_computed_blocks = self.kv_cache_manager.empty_kv_cache_blocks
    num_new_local_computed_tokens = 0
    num_computed_tokens = request.num_computed_tokens

Next it computes how many tokens still need to be scheduled:

if load_kv_async:
    assert num_external_computed_tokens > 0
    num_new_tokens = 0
else:
    num_new_tokens = request.num_tokens - num_computed_tokens
    threshold = self.scheduler_config.long_prefill_token_threshold
    if 0 < threshold < num_new_tokens:
        num_new_tokens = threshold

    if (
        not self.scheduler_config.enable_chunked_prefill
        and num_new_tokens > token_budget
    ):
        break

    num_new_tokens = min(num_new_tokens, token_budget)
    assert num_new_tokens > 0

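This shows what the chunked prefill switch means: if chunking is disabled and the remaining prompt tokens exceed the budget, the request is not scheduled; if chunking is enabled, a budget-sized segment is cut off.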
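
A small sketch of that decision, written as a pure function. The name tokens_for_waiting and the None return value (standing in for the `break` in the scheduler loop) are illustrative choices, not part of vLLM.

```python
from typing import Optional


def tokens_for_waiting(
    num_tokens: int,
    num_computed_tokens: int,
    token_budget: int,
    enable_chunked_prefill: bool,
    long_prefill_token_threshold: int = 0,
) -> Optional[int]:
    """Return the tokens to schedule, or None if the request must wait."""
    n = num_tokens - num_computed_tokens
    if 0 < long_prefill_token_threshold < n:
        n = long_prefill_token_threshold
    if not enable_chunked_prefill and n > token_budget:
        return None  # corresponds to `break` in the scheduler loop
    return min(n, token_budget)


chunked = tokens_for_waiting(3000, 0, 2048, enable_chunked_prefill=True)
unchunked = tokens_for_waiting(3000, 0, 2048, enable_chunked_prefill=False)
# A prefix cache hit of 1024 tokens makes the remainder fit without chunking.
with_prefix_hit = tokens_for_waiting(3000, 1024, 2048, enable_chunked_prefill=False)
print(chunked, unchunked, with_prefix_hit)  # 2048 None 1976
```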

Finally, Scheduler requests KV slots and moves the waiting request to running:

new_blocks = self.kv_cache_manager.allocate_slots(
    request,
    num_new_tokens,
    num_new_computed_tokens=num_new_local_computed_tokens,
    new_computed_blocks=new_computed_blocks,
    num_lookahead_tokens=effective_lookahead_tokens,
    num_external_computed_tokens=num_external_computed_tokens,
    delay_cache_blocks=load_kv_async,
    num_encoder_tokens=num_encoder_tokens,
)

if new_blocks is None:
    if request.has_encoder_inputs:
        self.encoder_cache_manager.free(request)
    break
...
self.running.append(request)
...
req_to_new_blocks[request_id] = self.kv_cache_manager.get_blocks(request_id)
num_scheduled_tokens[request_id] = num_new_tokens
token_budget -= num_new_tokens
request.status = RequestStatus.RUNNING
request.num_computed_tokens = num_computed_tokens

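4.4 SchedulerOutput: a compact representation of this step's plan

At the end of schedule(), Scheduler packs the step's plan into a SchedulerOutput. This article does not dissect it, but its role should be clear:

SchedulerOutput =
  full data for new requests          scheduled_new_reqs
  incremental data for old requests   scheduled_cached_reqs
  tokens per request this step        num_scheduled_tokens
  total tokens this step              total_num_scheduled_tokens
  spec decode draft tokens            scheduled_spec_decode_tokens
  encoder input scheduling            scheduled_encoder_inputs
  finished/preempted notifications    finished_req_ids / preempted_req_ids
  KV/EC connector metadata            kv_connector_metadata / ec_connector_metadata

It is constructed as follows: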

scheduler_output = SchedulerOutput(
    scheduled_new_reqs=new_reqs_data,
    scheduled_cached_reqs=cached_reqs_data,
    num_scheduled_tokens=num_scheduled_tokens,
    total_num_scheduled_tokens=total_num_scheduled_tokens,
    scheduled_spec_decode_tokens=scheduled_spec_decode_tokens,
    scheduled_encoder_inputs=scheduled_encoder_inputs,
    num_common_prefix_blocks=num_common_prefix_blocks,
    preempted_req_ids={req.request_id for req in preempted_reqs},
    finished_req_ids=self.finished_req_ids,
    free_encoder_mm_hashes=self.encoder_cache_manager.get_freed_mm_hashes(),
    new_block_ids_to_zero=new_block_ids_to_zero,
)

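Right after construction, Scheduler advances its own num_computed_tokens. This step is easy to miss: the scheduler output must retain the pre-scheduling token positions so that the runner can pick the input ids, while Scheduler's internal state must already allow the same prefill request to be scheduled again in the next step.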

def _update_after_schedule(self, scheduler_output: SchedulerOutput) -> None:
    num_scheduled_tokens = scheduler_output.num_scheduled_tokens
    for req_id, num_scheduled_token in num_scheduled_tokens.items():
        request = self.requests[req_id]
        request.num_computed_tokens += num_scheduled_token
        request.is_prefill_chunk = request.num_computed_tokens < (
            request.num_tokens + request.num_output_placeholders
        )
        scheduler_output.has_structured_output_requests |= (
            request.use_structured_output and not request.is_prefill_chunk
        )
        ...

    self.finished_req_ids = set()

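The design can be understood as:

SchedulerOutput records where this step starts computing and how many tokens it computes
Scheduler's internal state moves ahead to the position reached once this step finishes
If spec tokens are later rejected, update_from_output() rolls the position back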
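
A worked arithmetic sketch of advance-then-roll-back. The roll-back rule used here (subtract the number of rejected draft tokens, where every step yields one normally sampled token plus the accepted drafts) is an assumption about how update_from_output() handles speculative tokens, and the function name is hypothetical.

```python
def advance_then_roll_back(
    num_computed_tokens: int,
    num_scheduled_tokens: int,
    num_draft_tokens: int,
    num_generated_tokens: int,
) -> tuple:
    """Return (position after scheduling, position after roll-back)."""
    # Done right after schedule(): move ahead optimistically.
    advanced = num_computed_tokens + num_scheduled_tokens
    # Assumed rule: one generated token is always sampled normally,
    # the remaining generated tokens are accepted drafts.
    num_accepted = num_generated_tokens - 1
    num_rejected = num_draft_tokens - num_accepted
    return advanced, advanced - num_rejected


# 100 tokens computed; this step schedules 1 sampled token + 3 drafts.
# The model accepts 1 draft, so 2 tokens are generated in total.
advanced, final = advance_then_roll_back(100, 4, 3, 2)
print(advanced, final)  # 104 102
```

The KV entries for the two rejected draft positions are not valid, so the counter has to land on 102 rather than 104.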

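4.5 Feedback: update_from_output()

Once the model output comes back, Scheduler does three things for every request scheduled this step:

  1. Extract sampled tokens, logprobs, pooling output, and so on.
  2. Append the tokens to the Request and check stop conditions.
  3. For finished requests, free the KV / encoder cache and build the EngineCoreOutput for the frontend.

The core fragment: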

for req_id, num_tokens_scheduled in num_scheduled_tokens.items():
    ...
    request = self.requests.get(req_id)
    if request is None or request.is_finished():
        continue

    req_index = model_runner_output.req_id_to_index[req_id]
    generated_token_ids = (
        sampled_token_ids[req_index] if sampled_token_ids else []
    )
    ...
    if new_token_ids:
        new_token_ids, stopped = self._update_request_with_output(
            request, new_token_ids
        )
    elif request.pooling_params and pooler_output is not None:
        request.status = RequestStatus.FINISHED_STOPPED
        stopped = True

    if stopped:
        finish_reason = request.get_finished_reason()
        finished = self._handle_stopped_request(request)
        if finished:
            kv_transfer_params = self._free_request(request)

Token appending and stop checking are concentrated in _update_request_with_output():

def _update_request_with_output(
    self, request: Request, new_token_ids: list[int]
) -> tuple[list[int], bool]:
    stopped = False
    for num_new, output_token_id in enumerate(new_token_ids, 1):
        request.append_output_token_ids(output_token_id)

        stopped = check_stop(request, self.max_model_len)
        if stopped:
            del new_token_ids[num_new:]
            break
    return new_token_ids, stopped
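
The truncation idiom `del new_token_ids[num_new:]` is easy to misread, so here is a self-contained sketch of the same loop. The stop rule (EOS token or a maximum output length) is a simplified stand-in for check_stop(), and append_until_stop is a hypothetical name.

```python
def append_until_stop(
    output_token_ids: list,
    new_token_ids: list,
    eos_token_id: int,
    max_tokens: int,
) -> tuple:
    """Append tokens one by one and drop everything after the stop point."""
    stopped = False
    # enumerate(..., 1) makes num_new the count of tokens appended so far.
    for num_new, token_id in enumerate(new_token_ids, 1):
        output_token_ids.append(token_id)
        stopped = token_id == eos_token_id or len(output_token_ids) >= max_tokens
        if stopped:
            # Keep the first num_new tokens, discard the rest in place.
            del new_token_ids[num_new:]
            break
    return new_token_ids, stopped


outputs = [5]
kept, stopped = append_until_stop(outputs, [7, 2, 9, 9], eos_token_id=2, max_tokens=100)
print(kept, stopped, outputs)  # [7, 2] True [5, 7, 2]
```

This matters mostly with speculative decoding, where one step can return several tokens and the stop condition may trigger in the middle of them.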

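5. CachedRequestState: the Worker-side mirror of a request

Scheduler's Request is Core-side scheduling state. GPUModelRunner cannot fetch full request data from Core every step, so the Worker side keeps a cached copy, CachedRequestState.

It holds the prompt, mm features, sampling params, block table, number of computed tokens, output tokens so far, and more: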

@dataclass
class CachedRequestState:
    req_id: str
    prompt_token_ids: list[int] | None
    mm_features: list[MultiModalFeatureSpec]
    sampling_params: SamplingParams | None
    generator: torch.Generator | None

    block_ids: tuple[list[int], ...]
    num_computed_tokens: int
    output_token_ids: list[int]

    mrope_positions: torch.Tensor | None = None
    mrope_position_delta: int | None = None
    ...

    def __post_init__(self):
        self.num_prompt_tokens = length_from_prompt_token_ids_or_embeds(
            self.prompt_token_ids, self.prompt_embeds
        )

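It works together with InputBatch:

GPUModelRunner.requests[req_id] -> CachedRequestState   # per-request cache
GPUModelRunner.input_batch                              # per-batch persistent tensors/tables

InputBatch.add_request() writes a request into the persistent batch, including token ids, block table, sampling params, and LoRA mapping: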

def add_request(self, request: "CachedRequestState") -> int:
    req_index = self._register_add_request(request)
    ...
    self.req_id_to_index[req_id] = req_index

    num_prompt_tokens = length_from_prompt_token_ids_or_embeds(
        request.prompt_token_ids, request.prompt_embeds
    )
    self.num_prompt_tokens[req_index] = num_prompt_tokens
    ...
    self.token_ids_cpu[req_index, start_idx:end_idx] = request.output_token_ids
    self.num_tokens_no_spec[req_index] = request.num_tokens

    self.num_computed_tokens_cpu[req_index] = request.num_computed_tokens
    self.block_table.add_row(request.block_ids, req_index)
    ...

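If a request has finished or was not scheduled this step, the runner removes it from the persistent batch; if it is only temporarily unscheduled, its CachedRequestState is kept and it is added back to InputBatch in a later step.

This design reduces the traffic from EngineCore to Worker: new requests are sent in full, old requests as deltas.

6. GPUModelRunner.execute_model: from SchedulerOutput to forward

GPUModelRunner.execute_model() is the main device-side flow. It can be split into seven steps:

1. Handle preliminary state such as connector / ngram
2. _update_states(): merge SchedulerOutput into CachedRequestState + InputBatch
3. _prepare_inputs(): build input_ids, positions, slot mapping, logits_indices
4. Build attention metadata / cudagraph batch descriptor
5. _preprocess(): multimodal embedding, encoder-decoder, PP intermediate tensors
6. _model_forward(): call the model's forward
7. The last PP rank computes logits and leaves the state for sample_tokens()

The key part of the main function, from scheduling result to forward: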

def execute_model(
    self,
    scheduler_output: "SchedulerOutput",
    intermediate_tensors: IntermediateTensors | None = None,
) -> ModelRunnerOutput | AsyncModelRunnerOutput | IntermediateTensors | None:
    ...
    num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens
    with (
        record_function_or_nullcontext("gpu_model_runner: preprocess"),
        self.synchronize_input_prep(),
    ):
        deferred_state_corrections_fn = self._update_states(scheduler_output)
        ...
        if not num_scheduled_tokens:
            ...
            return EMPTY_MODEL_RUNNER_OUTPUT

        num_reqs = self.input_batch.num_reqs
        req_ids = self.input_batch.req_ids
        tokens = [scheduler_output.num_scheduled_tokens[i] for i in req_ids]
        num_scheduled_tokens_np = np.array(tokens, dtype=np.int32)
        ...
        logits_indices, spec_decode_metadata = self._prepare_inputs(
            scheduler_output,
            num_scheduled_tokens_np,
        )
        ...
        attn_metadata, spec_decode_common_attn_metadata = (
            self._build_attention_metadata(...)
        )
        ...
        (
            input_ids,
            inputs_embeds,
            positions,
            intermediate_tensors,
            model_kwargs,
            ec_connector_output,
        ) = self._preprocess(
            scheduler_output, num_tokens_padded, intermediate_tensors
        )

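6.1 Step one: _update_states() merges the scheduling state

_update_states() first cleans up finished requests, then handles batch rows that are no longer scheduled this step, then adds new and resumed requests, and finally refreshes the sampling metadata.

Cleaning up finished requests: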

def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
    for req_id in scheduler_output.finished_req_ids:
        self.requests.pop(req_id, None)
        self.num_prompt_logprobs.pop(req_id, None)
    ...
    for req_id in scheduler_output.finished_req_ids:
        self.input_batch.remove_request(req_id)

Removing the persistent batch rows of requests not scheduled this step:

scheduled_req_ids = scheduler_output.num_scheduled_tokens.keys()
cached_req_ids = self.input_batch.req_id_to_index.keys()
resumed_req_ids = scheduler_output.scheduled_cached_reqs.resumed_req_ids
unscheduled_req_ids = cached_req_ids - (scheduled_req_ids - resumed_req_ids)
...
for req_id in unscheduled_req_ids:
    self.input_batch.remove_request(req_id)
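
The set arithmetic above is compact, so a worked example with made-up request ids may help. The scenario (a resumed request that still has a row in the batch) is chosen only to show both effects of the expression.

```python
scheduled_req_ids = {"a", "b", "d"}  # scheduled this step
cached_req_ids = {"a", "b", "c"}     # rows currently in the persistent batch
resumed_req_ids = {"b"}              # resumed requests among the scheduled ones

# Rows to drop: everything cached except requests that are scheduled
# and are NOT being resumed.
unscheduled_req_ids = cached_req_ids - (scheduled_req_ids - resumed_req_ids)
print(sorted(unscheduled_req_ids))  # ['b', 'c']
```

"c" is dropped because it was not scheduled, and "b" is dropped because a resumed request gets a fresh row with its new block ids. "d" is new and has no row yet, so it is unaffected.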

When adding a new request, the runner creates a CachedRequestState and initializes state such as the random generator, pooling params, prompt logprobs, and M-RoPE/XD-RoPE:

for new_req_data in scheduler_output.scheduled_new_reqs:
    req_id = new_req_data.req_id
    ...
    req_state = CachedRequestState(
        req_id=req_id,
        prompt_token_ids=new_req_data.prompt_token_ids,
        prompt_embeds=new_req_data.prompt_embeds,
        mm_features=new_req_data.mm_features,
        sampling_params=sampling_params,
        pooling_params=pooling_params,
        generator=generator,
        block_ids=new_req_data.block_ids,
        num_computed_tokens=new_req_data.num_computed_tokens,
        output_token_ids=[],
        lora_request=new_req_data.lora_request,
    )
    self.requests[req_id] = req_state
    ...
    reqs_to_add.append(req_state)

When updating an existing request, it syncs num_computed_tokens, appends new blocks, handles new tokens under PP, and handles spec tokens:

for i, req_id in enumerate(req_data.req_ids):
    req_state = self.requests[req_id]
    num_computed_tokens = req_data.num_computed_tokens[i]
    new_block_ids = req_data.new_block_ids[i]
    resumed_from_preemption = req_id in req_data.resumed_req_ids
    ...
    req_state.num_computed_tokens = num_computed_tokens
    ...
    if not resumed_from_preemption:
        if new_block_ids is not None:
            for block_ids, new_ids in zip(req_state.block_ids, new_block_ids):
                block_ids.extend(new_ids)
    else:
        assert req_index is None
        assert new_block_ids is not None
        req_state.block_ids = new_block_ids
    ...
    self.input_batch.num_computed_tokens_cpu[req_index] = num_computed_tokens
    if new_block_ids is not None:
        self.input_batch.block_table.append_row(new_block_ids, req_index)
    ...
    self.input_batch.update_req_spec_token_ids(req_state, scheduled_spec_tokens)

Finally it writes the pending requests into InputBatch, condenses the holes, and refreshes the sampling metadata:

for request in reqs_to_add:
    self.input_batch.add_request(request)
    self.input_batch.update_req_spec_token_ids(request, scheduled_spec_tokens)

self.input_batch.condense()
self._may_reorder_batch(scheduler_output)
self.input_batch.refresh_metadata()

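6.2 Step two: _prepare_inputs() assembles this step's GPU inputs

The job of _prepare_inputs() is to turn "how many tokens each request computes this step" into a flattened GPU batch.

Suppose three requests are scheduled for [2, 5, 3] tokens this step: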

req_indices:
  [0, 0, 1, 1, 1, 1, 1, 2, 2, 2]

query_pos:
  [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]

positions:
  input_batch.num_computed_tokens_cpu[req_indices] + query_pos

This part of the source performs exactly that transformation:

def _prepare_inputs(
    self,
    scheduler_output: "SchedulerOutput",
    num_scheduled_tokens: np.ndarray,
) -> tuple[torch.Tensor, SpecDecodeMetadata | None]:
    ...
    self.input_batch.block_table.commit_block_table(num_reqs)

    req_indices = np.repeat(self.arange_np[:num_reqs], num_scheduled_tokens)

    cu_num_tokens = self._get_cumsum_and_arange(
        num_scheduled_tokens, self.query_pos.np
    )

    positions_np = (
        self.input_batch.num_computed_tokens_cpu[req_indices]
        + self.query_pos.np[: cu_num_tokens[-1]]
    )
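
The same transformation can be reproduced with plain NumPy for the [2, 5, 3] example. This sketch does not use the runner's preallocated buffers or its _get_cumsum_and_arange helper; the num_computed_tokens values are made up for illustration.

```python
import numpy as np

num_scheduled_tokens = np.array([2, 5, 3], dtype=np.int32)
num_computed_tokens = np.array([10, 0, 7], dtype=np.int32)  # assumed values
num_reqs = len(num_scheduled_tokens)

# One entry per scheduled token, holding the owning request's batch row.
req_indices = np.repeat(np.arange(num_reqs), num_scheduled_tokens)

# Offset of each token inside its own request:
# global token index minus the start offset of that request.
cu_num_tokens = np.cumsum(num_scheduled_tokens)
starts = np.repeat(cu_num_tokens - num_scheduled_tokens, num_scheduled_tokens)
query_pos = np.arange(cu_num_tokens[-1]) - starts

# Absolute position of each token in its sequence.
positions = num_computed_tokens[req_indices] + query_pos

print(req_indices.tolist())  # [0, 0, 1, 1, 1, 1, 1, 2, 2, 2]
print(query_pos.tolist())    # [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
print(positions.tolist())    # [10, 11, 0, 1, 2, 3, 4, 7, 8, 9]
```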

_prepare_inputs() then gathers this step's input ids from InputBatch.token_ids_cpu by position:

token_indices = (
    positions_np + req_indices * self.input_batch.token_ids_cpu.shape[1]
)
token_indices_tensor = torch.from_numpy(token_indices)

torch.index_select(
    self.input_batch.token_ids_cpu_tensor.flatten(),
    0,
    token_indices_tensor,
    out=self.input_ids.cpu[:total_num_scheduled_tokens],
)
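
The flat index trick is ordinary row-major addressing: element (row, col) of a 2D array sits at row * num_cols + col in the flattened view. A small NumPy sketch with a made-up token table:

```python
import numpy as np

max_model_len = 8
# Toy token table: the token at (request r, position p) is 100 + r * 8 + p.
token_ids_cpu = np.arange(3 * max_model_len).reshape(3, max_model_len) + 100

req_indices = np.array([0, 0, 2])
positions = np.array([3, 4, 1])

# Row-major flat index, same formula as in the snippet above.
token_indices = positions + req_indices * token_ids_cpu.shape[1]
input_ids = token_ids_cpu.flatten()[token_indices]

print(token_indices.tolist())  # [3, 4, 17]
print(input_ids.tolist())      # [103, 104, 117]
```

Gathering from the flattened table with a single 1D index lets the runner write the result directly into a preallocated 1D input_ids buffer.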

Next it prepares the query start, seq len, and slot mapping needed by the attention metadata:

self.query_start_loc.np[0] = 0
self.query_start_loc.np[1 : num_reqs + 1] = cu_num_tokens
self.query_start_loc.np[num_reqs + 1 :].fill(cu_num_tokens[-1])
self.query_start_loc.copy_to_gpu()
query_start_loc = self.query_start_loc.gpu[: num_reqs + 1]
...
self.input_batch.block_table.compute_slot_mapping(
    num_reqs,
    self.query_start_loc.gpu[: num_reqs + 1],
    self.positions[:total_num_scheduled_tokens],
)
...
self._prepare_input_ids(
    scheduler_output,
    num_reqs,
    total_num_scheduled_tokens,
    cu_num_tokens,
)

slot_mapping is the device-side mapping that says which KV cache slot each token of this step is written to. Scheduler allocates the blocks; the runner computes slots from the block table and token positions.
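
A minimal sketch of how a slot can be derived from a block table and a position, using the usual paged-KV formula: physical block number times block size, plus the offset inside the block. The block size, the table contents, and the single-KV-cache-group layout are assumptions for illustration; the real compute_slot_mapping() runs on preallocated buffers.

```python
import numpy as np

block_size = 16
# Row r lists the physical block ids owned by request r (0 = unused padding).
block_table = np.array([
    [7, 2, 0],
    [5, 9, 0],
])

req_indices = np.array([0, 0, 1])
positions = np.array([15, 16, 3])

# Logical block index inside the request, then lookup of the physical block.
block_numbers = block_table[req_indices, positions // block_size]
slot_mapping = block_numbers * block_size + positions % block_size

print(slot_mapping.tolist())  # [127, 32, 83]
```

Positions 15 and 16 of request 0 are adjacent in the sequence but land in unrelated slots, because they fall into different physical blocks.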

Finally, _prepare_inputs() determines which hidden states are used to compute logits:

if not use_spec_decode:
    logits_indices = query_start_loc[1:] - 1
    spec_decode_metadata = None
    num_sampled_tokens = np.ones(num_reqs, dtype=np.int32)
else:
    ...
    spec_decode_metadata = self._calc_spec_decode_metadata(
        num_draft_tokens, cu_num_tokens
    )
    logits_indices = spec_decode_metadata.logits_indices
    num_sampled_tokens = num_draft_tokens + 1

Without spec decode, each request samples only at the position of its last token this step, hence logits_indices = query_start_loc[1:] - 1.
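
For the earlier [2, 5, 3] example this works out as follows (a NumPy sketch; the real code operates on GPU tensors):

```python
import numpy as np

num_scheduled_tokens = np.array([2, 5, 3])

# query_start_loc[i] is the offset of request i's first token in the flat batch;
# the final entry is the total number of tokens.
query_start_loc = np.concatenate(([0], np.cumsum(num_scheduled_tokens)))

# The last token of request i sits just before the start of request i + 1.
logits_indices = query_start_loc[1:] - 1

print(query_start_loc.tolist())  # [0, 2, 7, 10]
print(logits_indices.tolist())   # [1, 6, 9]
```

Only 3 of the 10 hidden states go through the LM head, which is why a prefill chunk does not pay for logits at every prompt position.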

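6.3 Step three: _preprocess() settles the model input form

_preprocess() prepares input_ids, inputs_embeds, positions, intermediate_tensors, and model_kwargs. Text-only models use token ids; multimodal models first run the encoder and fill the embeddings into inputs_embeds; encoder-decoder models additionally pass encoder_outputs.

The text-only path is the simplest: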

else:
    # For text-only models, we use token ids as input.
    input_ids = self.input_ids.gpu[:num_input_tokens]
    inputs_embeds = None
    model_kwargs = self._init_model_kwargs()

The multimodal path first gathers the multimodal embeddings:

if self.supports_mm_inputs and is_first_rank and not is_encoder_decoder:
    with self.maybe_get_ec_connector_output(
        scheduler_output,
        encoder_cache=self.encoder_cache,
    ) as ec_connector_output:
        self._execute_mm_encoder(scheduler_output)
        mm_embeds, is_mm_embed = self._gather_mm_embeddings(scheduler_output)

    inputs_embeds_scheduled = self.model.embed_input_ids(
        self.input_ids.gpu[:num_scheduled_tokens],
        multimodal_embeddings=mm_embeds,
        is_multimodal=is_mm_embed,
    )
    self.inputs_embeds.gpu[:num_scheduled_tokens].copy_(inputs_embeds_scheduled)
    input_ids, inputs_embeds = self._prepare_mm_inputs(num_input_tokens)

6.4 Step four: forward and logits

The runner enters set_forward_context(), which places the attention metadata, cudagraph mode, batch descriptor, slot mapping, and so on into the forward context, and then calls the model:

with (
    set_forward_context(
        attn_metadata,
        self.vllm_config,
        num_tokens=num_tokens_padded,
        num_tokens_across_dp=num_tokens_across_dp,
        cudagraph_runtime_mode=cudagraph_mode,
        batch_descriptor=batch_desc,
        ubatch_slices=ubatch_slices_padded,
        slot_mapping=slot_mappings,
        skip_compiled=has_encoder_input,
    ),
    record_function_or_nullcontext("gpu_model_runner: forward"),
    self.maybe_get_kv_connector_output(
        scheduler_output,
        defer_finalize=defer_kv_connector_finalize,
    ) as kv_connector_output,
):
    model_output = self._model_forward(
        input_ids=input_ids,
        positions=positions,
        intermediate_tensors=intermediate_tensors,
        inputs_embeds=inputs_embeds,
        **model_kwargs,
    )

_model_forward() itself is just a thin wrapper around the model call:

def _model_forward(
    self,
    input_ids: torch.Tensor | None = None,
    positions: torch.Tensor | None = None,
    intermediate_tensors: IntermediateTensors | None = None,
    inputs_embeds: torch.Tensor | None = None,
    **model_kwargs: dict[str, Any],
) -> Any:
    return self.model(
        input_ids=input_ids,
        positions=positions,
        intermediate_tensors=intermediate_tensors,
        inputs_embeds=inputs_embeds,
        **model_kwargs,
    )

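Taking llama.py as an example, the model's forward does the embedding on the first PP rank, runs its layers one by one, returns IntermediateTensors on non-last ranks, and on the last rank applies the norm and returns the hidden states: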

def forward(
    self,
    input_ids: torch.Tensor | None,
    positions: torch.Tensor,
    intermediate_tensors: IntermediateTensors | None,
    inputs_embeds: torch.Tensor | None = None,
    **extra_layer_kwargs,
) -> torch.Tensor | IntermediateTensors | tuple[torch.Tensor, list[torch.Tensor]]:
    if get_pp_group().is_first_rank:
        if inputs_embeds is not None:
            hidden_states = inputs_embeds
        else:
            hidden_states = self.embed_input_ids(input_ids)
        residual = None
    else:
        assert intermediate_tensors is not None
        hidden_states = intermediate_tensors["hidden_states"]
        residual = intermediate_tensors["residual"]

    for idx, layer in enumerate(
        islice(self.layers, self.start_layer, self.end_layer)
    ):
        hidden_states, residual = layer(
            positions, hidden_states, residual, **extra_layer_kwargs
        )
        ...

    if not get_pp_group().is_last_rank:
        return IntermediateTensors(
            {"hidden_states": hidden_states, "residual": residual}
        )

    hidden_states, _ = self.norm(hidden_states, residual)
    ...
    return hidden_states

The last PP rank selects the positions to sample via logits_indices and computes logits:

if not get_pp_group().is_last_rank:
    assert isinstance(hidden_states, IntermediateTensors)
    hidden_states.kv_connector_output = kv_connector_output
    self.kv_connector_output = kv_connector_output
    return hidden_states

if self.is_pooling_model:
    return self._pool(...)

sample_hidden_states = hidden_states[logits_indices]
logits = self.model.compute_logits(sample_hidden_states)

Llama's logits computation is also thin:

def compute_logits(
    self,
    hidden_states: torch.Tensor,
) -> torch.Tensor | None:
    logits = self.logits_processor(self.lm_head, hidden_states)
    return logits

6.5 Step 5: why execute_model often returns None

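For an ordinary generative model, execute_model does not return a ModelRunnerOutput right after the forward pass. It stashes the logits, hidden states, spec decode metadata, and related state in execute_model_state, then returns None: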

self.execute_model_state = ExecuteModelState(
    scheduler_output,
    logits,
    spec_decode_metadata,
    spec_decode_common_attn_metadata,
    hidden_states,
    sample_hidden_states,
    aux_hidden_states,
    ec_connector_output,
    cudagraph_stats,
    slot_mappings,
)
self.kv_connector_output = kv_connector_output
...
return None

This gives Core a window after the forward pass and before sampling in which to fetch the structured-output grammar bitmask from the Scheduler:

execute_model() -> forward produces logits -> returns None
EngineCore -> Scheduler.get_grammar_bitmask()
sample_tokens(grammar_output) -> apply the grammar mask to logits -> sample

This is also why the WorkerBase.execute_model() docstring says that "if None is returned, sample_tokens must be called immediately afterwards".
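
The two-phase contract can be sketched as a tiny state machine. This is a minimal toy under stated assumptions, not vLLM code: ToyRunner and its fields are hypothetical, the "forward pass" is faked by passing logits in directly, sampling is greedy, and a per-request set of allowed token ids stands in for the grammar bitmask.

```python
import math


class ToyRunner:
    """Hypothetical runner that splits one step into forward and sample."""

    def __init__(self):
        self.state = None  # stand-in for execute_model_state

    def execute_model(self, req_ids, logits):
        # A leftover state means the caller skipped sample_tokens().
        assert self.state is None, "previous step was never sampled"
        self.state = (list(req_ids), [list(row) for row in logits])
        return None  # signals: call sample_tokens() next

    def sample_tokens(self, allowed=None):
        # allowed: optional {req_id: set of permitted token ids}.
        assert self.state is not None, "sample_tokens() without execute_model()"
        (req_ids, logits), self.state = self.state, None

        sampled = {}
        for req_id, row in zip(req_ids, logits):
            if allowed is not None and req_id in allowed:
                # Constrained request: forbid every token outside the set.
                row = [
                    x if tok in allowed[req_id] else -math.inf
                    for tok, x in enumerate(row)
                ]
            sampled[req_id] = max(range(len(row)), key=row.__getitem__)
        return sampled


runner = ToyRunner()
result = runner.execute_model(["a", "b"], [[0.1, 0.9, 0.3], [2.0, 1.0, 0.0]])
# Between the two calls, the caller can compute the constraint for request "a".
tokens = runner.sample_tokens(allowed={"a": {0, 2}})
print(result, tokens)
```

Stashing the state on the runner keeps the logits on the worker between the two calls, so only the small constraint object has to cross the Core/Worker boundary.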

7. GPUModelRunner.sample_tokens: sampling, writing back to the batch, building the output

sample_tokens() consumes execute_model_state. It first applies the grammar bitmask, then calls the sampler:

if grammar_output is not None:
    apply_grammar_bitmask(
        scheduler_output, grammar_output, self.input_batch, logits
    )

with record_function_or_nullcontext("gpu_model_runner: sample"):
    sampler_output = self._sample(logits, spec_decode_metadata)
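
What "applying a bitmask to logits" means can be shown on a single row. The sketch below is not apply_grammar_bitmask itself; apply_token_bitmask and pack_allowed are hypothetical helpers, and the layout is an assumption: token ids are packed 32 per integer word, and a set bit marks a token as allowed.

```python
import math


def pack_allowed(allowed_ids, vocab_size):
    """Pack a set of allowed token ids into 32-bit words (assumed layout)."""
    words = [0] * ((vocab_size + 31) // 32)
    for token_id in allowed_ids:
        words[token_id // 32] |= 1 << (token_id % 32)
    return words


def apply_token_bitmask(logits_row, bitmask_words):
    """Set the logit of every disallowed token to -inf, in place."""
    for token_id in range(len(logits_row)):
        word = bitmask_words[token_id // 32]
        if not (word >> (token_id % 32)) & 1:
            logits_row[token_id] = -math.inf
    return logits_row


vocab_size = 40
row = [float(i) for i in range(vocab_size)]
mask = pack_allowed({1, 33}, vocab_size)
apply_token_bitmask(row, mask)
survivors = [tok for tok, x in enumerate(row) if x != -math.inf]
print(survivors)
```

Because the masked logits are -inf, any sampler that works on softmax probabilities or on argmax can never pick a disallowed token, and the sampler itself needs no grammar-specific logic.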

_sample() calls the sampler with InputBatch.sampling_metadata; with spec decode it goes through the rejection sampler instead:

def _sample(
    self,
    logits: torch.Tensor | None,
    spec_decode_metadata: SpecDecodeMetadata | None,
) -> SamplerOutput:
    sampling_metadata = self.input_batch.sampling_metadata
    self.input_batch.update_async_output_token_ids()
    if spec_decode_metadata is None:
        return self.sampler(
            logits=logits,
            sampling_metadata=sampling_metadata,
        )

    ...
    sampler_output = self.rejection_sampler(
        spec_decode_metadata,
        None,
        logits,
        sampling_metadata,
    )
    return sampler_output
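
The idea behind verifying draft tokens can be sketched for the greedy case only. This is not the rejection_sampler used above, whose general form accepts or rejects drafts probabilistically; greedy_verify is a hypothetical helper that assumes the target model's argmax token is already known at each draft position plus one extra "bonus" position.

```python
def greedy_verify(draft_token_ids, target_argmax_ids):
    """Return the tokens one request emits this step (greedy case only).

    target_argmax_ids must have len(draft_token_ids) + 1 entries: the target
    model's greedy choice at every draft position, plus one bonus position.
    """
    assert len(target_argmax_ids) == len(draft_token_ids) + 1

    emitted = []
    for draft, target in zip(draft_token_ids, target_argmax_ids):
        if draft != target:
            # First mismatch: emit the target's token and drop the rest.
            emitted.append(target)
            return emitted
        emitted.append(draft)

    # Every draft matched, so the bonus token comes for free.
    emitted.append(target_argmax_ids[-1])
    return emitted


print(greedy_verify([5, 6, 7], [5, 6, 9, 1]))
print(greedy_verify([5, 6], [5, 6, 8]))
```

Each request therefore emits between 1 and len(draft) + 1 tokens per step, which is why the bookkeeping after sampling has to handle a variable number of sampled ids per request.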

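After sampling, _bookkeeping_sync() moves the results from device to CPU and tidies them up, then caches the sampled tokens back into InputBatch and CachedRequestState. As a result, the Scheduler does not have to send the previous step's sampled tokens back to the Worker in the next round.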

for req_idx in range(num_sampled_tokens):
    ...
    start_idx = self.input_batch.num_tokens_no_spec[req_idx]
    end_idx = start_idx + num_sampled_ids
    ...
    self.input_batch.token_ids_cpu[req_idx, start_idx:end_idx] = sampled_ids
    self.input_batch.is_token_ids[req_idx, start_idx:end_idx] = True
    self.input_batch.num_tokens_no_spec[req_idx] = end_idx

    req_id = req_ids[req_idx]
    req_state = self.requests[req_id]
    req_state.output_token_ids.extend(sampled_ids)
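
The write-back in this loop can be mirrored with plain lists. The sketch below is a toy, not the real InputBatch: ToyInputBatch and write_back are hypothetical, a list of fixed-length rows stands in for token_ids_cpu, and a single counter per row stands in for num_tokens_no_spec.

```python
class ToyInputBatch:
    """Hypothetical persistent batch: one fixed-length token row per request."""

    def __init__(self, max_reqs, max_len):
        self.token_ids = [[0] * max_len for _ in range(max_reqs)]
        self.num_tokens = [0] * max_reqs

    def add_prompt(self, req_idx, prompt):
        self.token_ids[req_idx][:len(prompt)] = prompt
        self.num_tokens[req_idx] = len(prompt)


def write_back(batch, output_token_ids, sampled):
    """Append each request's newly sampled ids to its row and output list.

    sampled holds one list per request; an empty list means the request
    produced no token this step.
    """
    for req_idx, sampled_ids in enumerate(sampled):
        if not sampled_ids:
            continue
        start = batch.num_tokens[req_idx]
        end = start + len(sampled_ids)
        batch.token_ids[req_idx][start:end] = sampled_ids
        batch.num_tokens[req_idx] = end
        output_token_ids[req_idx].extend(sampled_ids)


batch = ToyInputBatch(max_reqs=2, max_len=8)
batch.add_prompt(0, [10, 11, 12, 13])
batch.add_prompt(1, [20, 21])
outputs = [[], []]
write_back(batch, outputs, [[100], [200]])
print(batch.token_ids[0][:5], batch.token_ids[1][:3], outputs)
```

After the write-back, the next step can read its input token for each request straight from the row at the request's current position, without a round trip through the Scheduler.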

Finally, it builds the ModelRunnerOutput:

output = ModelRunnerOutput(
    req_ids=req_ids_output_copy,
    req_id_to_index=req_id_to_index_output_copy,
    sampled_token_ids=valid_sampled_token_ids,
    logprobs=logprobs_lists,
    prompt_logprobs_dict=prompt_logprobs_dict,
    kv_connector_output=kv_connector_output,
    ec_connector_output=ec_connector_output
    if self.supports_mm_inputs
    else None,
    num_nans_in_logits=num_nans_in_logits,
    cudagraph_stats=cudagraph_stats,
)

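Without async scheduling, this output is returned directly. With async scheduling, an AsyncGPUModelRunnerOutput is returned instead, and the D2H copies of the sampled tokens and logprobs run on a separate stream so they overlap with other work.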

if not self.use_async_scheduling:
    return output

async_output = AsyncGPUModelRunnerOutput(
    model_runner_output=output,
    sampled_token_ids=sampler_output.sampled_token_ids,
    logprobs_tensors=sampler_output.logprobs_tensors,
    invalid_req_indices=invalid_req_indices,
    async_output_copy_stream=self.async_output_copy_stream,
    vocab_size=self.input_batch.vocab_size,
)
...
return async_output

8. A small example: how two requests are threaded through

Suppose there are two generation requests:

R1 prompt = [10, 11, 12, 13], max_tokens = 2
R2 prompt = [20, 21],         max_tokens = 2
max_num_scheduled_tokens = 6
no prefix cache hit, no spec decode

8.1 Round 1: two waiting requests move to running

Initial Scheduler state:

waiting = [R1, R2]
running = []
R1.num_computed_tokens = 0
R2.num_computed_tokens = 0

Scheduling:

token_budget = 6
R1 needs 4 prompt tokens computed -> allocate 4 -> budget left 2
R2 needs 2 prompt tokens computed -> allocate 2 -> budget left 0

Conceptually, the SchedulerOutput is:

scheduled_new_reqs = [R1, R2]
num_scheduled_tokens = {R1: 4, R2: 2}
total_num_scheduled_tokens = 6

On the GPUModelRunner side, _update_states() creates two CachedRequestState objects and writes them into InputBatch. _prepare_inputs() then flattens them into:

req_indices = [0, 0, 0, 0, 1, 1]
query_pos   = [0, 1, 2, 3, 0, 1]
positions  = [0, 1, 2, 3, 0, 1]
input_ids  = [10, 11, 12, 13, 20, 21]

logits_indices = [3, 5]

The model forward produces 6 hidden states, but logits are computed only at the 4th and 6th positions (indices 3 and 5). Suppose sampling yields:

R1 -> 100
R2 -> 200

sample_tokens() returns:

ModelRunnerOutput.sampled_token_ids = [[100], [200]]

After the Scheduler feeds the output back:

R1.all_token_ids = [10, 11, 12, 13, 100]
R2.all_token_ids = [20, 21, 200]
R1.num_computed_tokens = 4
R2.num_computed_tokens = 2

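Here num_computed_tokens means "the number of tokens the model forward has already processed". The newly sampled 100 and 200 have been appended to each request's token sequence but have not yet been fed through the model as next-round input.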
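
The flattened arrays in this round follow mechanically from each request's token list, num_computed_tokens, and the number of tokens scheduled for it. The sketch below is a plain-Python illustration, not _prepare_inputs() itself; flatten_batch is a hypothetical helper, and it assumes every request in the batch has at least one scheduled token and samples at its last scheduled position.

```python
def flatten_batch(token_ids, num_computed, num_scheduled):
    """Flatten per-request work into the arrays a batched forward consumes.

    token_ids[i]     : all known tokens of request i (prompt + outputs)
    num_computed[i]  : tokens of request i already processed by the model
    num_scheduled[i] : tokens of request i to process this step (> 0)
    """
    req_indices, query_pos, positions, input_ids = [], [], [], []
    logits_indices = []
    for req_idx, n in enumerate(num_scheduled):
        for q in range(n):
            pos = num_computed[req_idx] + q  # absolute position in the sequence
            req_indices.append(req_idx)
            query_pos.append(q)
            positions.append(pos)
            input_ids.append(token_ids[req_idx][pos])
        # The last token scheduled for this request is where it samples.
        logits_indices.append(len(input_ids) - 1)
    return {
        "req_indices": req_indices,
        "query_pos": query_pos,
        "positions": positions,
        "input_ids": input_ids,
        "logits_indices": logits_indices,
    }


round1 = flatten_batch(
    token_ids=[[10, 11, 12, 13], [20, 21]],
    num_computed=[0, 0],
    num_scheduled=[4, 2],
)
print(round1["positions"], round1["input_ids"], round1["logits_indices"])
```

The same function covers a step in which each request contributes a single token: only num_computed and num_scheduled change, which is the point of treating prefill and decode uniformly.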

8.2 Round 2: decode

For the running requests, the Scheduler computes:

R1.num_tokens_with_spec = 5, num_computed_tokens = 4 -> needs 1
R2.num_tokens_with_spec = 3, num_computed_tokens = 2 -> needs 1

This round's schedule:

num_scheduled_tokens = {R1: 1, R2: 1}

The GPUModelRunner inputs become:

req_indices = [0, 1]
query_pos   = [0, 0]
positions  = [4, 2]
input_ids  = [100, 200]
logits_indices = [0, 1]

The forward pass runs only the two decode tokens. Suppose sampling yields:

R1 -> 101
R2 -> 201

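After the feedback step, both requests have reached max_tokens=2. check_stop() marks them finished, the Scheduler frees their KV blocks, and the final token and finish reason go back to the frontend through EngineCoreOutputs.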
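
Both rounds can be replayed with a toy scheduling loop. This is a sketch under simplifying assumptions, not the real Scheduler: ToyRequest, run, and fake_sample are hypothetical, there is no KV block accounting, preemption, prefix cache, or spec decode, and the only stop condition is max_tokens.

```python
from dataclasses import dataclass, field


@dataclass
class ToyRequest:
    req_id: str
    prompt: list
    max_tokens: int
    output: list = field(default_factory=list)
    num_computed_tokens: int = 0

    @property
    def num_tokens(self):
        return len(self.prompt) + len(self.output)


def run(requests, token_budget, fake_sample):
    """Run schedule -> execute -> update until all requests finish.

    fake_sample(req) -> int stands in for the forward pass plus sampling.
    Returns the per-round {req_id: num_scheduled_tokens} trace.
    """
    assert token_budget > 0
    waiting, running, trace = list(requests), [], []
    while waiting or running:
        budget, scheduled = token_budget, {}
        for req in running:  # running requests are served first
            n = min(req.num_tokens - req.num_computed_tokens, budget)
            if n > 0:
                scheduled[req.req_id] = n
                budget -= n
        while waiting and budget > 0:  # then admit waiting requests
            req = waiting.pop(0)
            n = min(req.num_tokens - req.num_computed_tokens, budget)
            scheduled[req.req_id] = n
            budget -= n
            running.append(req)
        trace.append(scheduled)

        for req in list(running):  # feed the results back
            n = scheduled.get(req.req_id, 0)
            req.num_computed_tokens += n
            # A token is sampled only once the request has caught up.
            if n > 0 and req.num_computed_tokens == req.num_tokens:
                req.output.append(fake_sample(req))
                if len(req.output) >= req.max_tokens:
                    running.remove(req)
    return trace


base = {"R1": 100, "R2": 200}
r1 = ToyRequest("R1", [10, 11, 12, 13], max_tokens=2)
r2 = ToyRequest("R2", [20, 21], max_tokens=2)
trace = run([r1, r2], token_budget=6, fake_sample=lambda r: base[r.req_id] + len(r.output))
print(trace, r1.output, r2.output)
```

Lowering token_budget to 3 in this toy splits R1's prompt across two rounds without any special prefill code path, because the loop only ever asks how far each request is behind its known tokens.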

In this example, the full Core-side path compresses to:

Frontend ADD
  -> EngineCoreProc.input_queue
  -> EngineCore.add_request()
  -> Scheduler.waiting
  -> Scheduler.schedule()
  -> SchedulerOutput
  -> Executor.execute_model()
  -> Worker.execute_model()
  -> GPUModelRunner._update_states()
  -> GPUModelRunner._prepare_inputs()
  -> model forward
  -> sample_tokens()
  -> ModelRunnerOutput
  -> Scheduler.update_from_output()
  -> EngineCoreOutputs
  -> EngineCoreProc.output_queue
  -> Frontend

9. Summary

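The core of vLLM V1 Core-side inference is not a hard switch between a "prefill phase" and a "decode phase" but a more uniform catch-up mechanism. Every request has a num_computed_tokens, and each round the Scheduler lets it catch up to num_tokens_with_spec as far as the constraints allow: token budget, KV blocks, encoder budget, LoRA, prefix cache, spec decode, and so on.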

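SchedulerOutput is the contract in the middle of this mechanism. The Scheduler uses it to describe what to compute this round, which blocks to use, and which requests are new, reused, or finished. GPUModelRunner uses it to maintain the Worker-side CachedRequestState and InputBatch, build the GPU inputs, slot mapping, and attention metadata, run the forward pass, and wrap the sampled results in a ModelRunnerOutput.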

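In the end, EngineCore.step() joins the two ends into a closed loop: schedule, execute, sample, feed back. As long as this loop keeps running, new requests move from waiting to running, running requests advance token by token, and finished requests release their KV cache and send their output back to the frontend.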