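(This is part 03. Part 01 (Client) and part 02 (Core) will be added later.)

This article focuses on the most basic text-only generation path in vLLM V1. Extension points such as the multimodal encoder, speculative decoding, pipeline/data parallelism, KV/EC connectors, structured output, and pooling are mentioned only to note that their fields exist; their mechanisms are not covered.

1. Positioning in one sentence

SchedulerOutput is the scheduling result of one engine iteration. It packages the scheduler's decisions (which requests this forward pass handles, which token position each request starts from, how many tokens it processes, which KV cache blocks it uses, and which worker-side caches must be cleaned up) and passes them to the model runner.

It is not long-lived state itself. Long-lived state lives in:

  • scheduler side: Request, self.running, self.waiting, KVCacheManager
  • worker/model runner side: CachedRequestState, InputBatch, the block table, the token cache

SchedulerOutput is more like a one-frame execution plan between the two.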

Scheduler state                         Worker state
  Request / queues / KV blocks            CachedRequestState / InputBatch
             \                              /
              \-- one iteration plan ------/
                   SchedulerOutput

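2. Lifecycle in the main loop

The basic synchronous path in EngineCore.step() is straightforward: schedule first, then execute the model, then write the model output back to the scheduler.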

def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
    ...
    if not self.scheduler.has_requests():
        return {}, False
    scheduler_output = self.scheduler.schedule()
    future = self.model_executor.execute_model(scheduler_output, non_block=True)
    grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)
    with (
        self.log_error_detail(scheduler_output),
        self.log_iteration_details(scheduler_output),
    ):
        model_output = future.result()
        if model_output is None:
            model_output = self.model_executor.sample_tokens(grammar_output)

    # Before processing the model output, process any aborts that happened
    # during the model execution.
    self._process_aborts_queue()
    engine_core_outputs = self.scheduler.update_from_output(
        scheduler_output, model_output
    )

    return engine_core_outputs, scheduler_output.total_num_scheduled_tokens > 0
    ...

The corresponding sequence can be condensed to:

@startuml
participant EngineCore
participant Scheduler
participant Executor
participant GPUModelRunner

EngineCore -> Scheduler: schedule()
Scheduler --> EngineCore: SchedulerOutput
EngineCore -> Executor: execute_model(SchedulerOutput)
Executor -> GPUModelRunner: execute_model(...)
GPUModelRunner -> GPUModelRunner: update worker cache / prepare input / forward
GPUModelRunner --> Executor: None or ModelRunnerOutput
EngineCore -> Executor: sample_tokens(...) if needed
Executor -> GPUModelRunner: sample_tokens(...)
GPUModelRunner --> EngineCore: ModelRunnerOutput
EngineCore -> Scheduler: update_from_output(SchedulerOutput, ModelRunnerOutput)
Scheduler --> EngineCore: EngineCoreOutputs
@enduml

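In the ordinary path without a pipeline batch queue, a SchedulerOutput lives for exactly one step(). After construction it is passed to the executor/model runner and then serves as context for update_from_output(). The next iteration constructs a fresh SchedulerOutput.

3. Request is the scheduler's ledger

Before looking at SchedulerOutput, it helps to see which basic counters the scheduler-side Request maintains.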

def __init__(
    self,
    request_id: str,
    prompt_token_ids: list[int] | None,
    sampling_params: SamplingParams | None,
    pooling_params: PoolingParams | None,
    ...
) -> None:
    ...
    self.prompt_token_ids = prompt_token_ids
    self.prompt_embeds = prompt_embeds
    ...
    self.num_prompt_tokens = length_from_prompt_token_ids_or_embeds(
        prompt_token_ids, prompt_embeds
    )
    self._output_token_ids: list[int] = []
    self._all_token_ids: list[int] = (
        self.prompt_token_ids.copy()
        if self.prompt_token_ids is not None
        else [0] * self.num_prompt_tokens
    )

    # Used in async scheduling.
    self.num_output_placeholders = 0
    ...
    self.spec_token_ids: list[int] = []
    self.num_computed_tokens = 0
    ...

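The core meanings:

  • prompt_token_ids: the original prompt.
  • _output_token_ids: output tokens that have been sampled and confirmed for return to the user.
  • _all_token_ids: prompt + output, the full context as the scheduler sees it.
  • num_computed_tokens: the number of tokens whose model computation is done and whose KV has been written to the KV cache. This counter is not "how many tokens have been sampled" but "how many tokens have been run through forward as input".

After a token is generated, the scheduler updates both the output tokens and all tokens through append_output_token_ids():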

def append_output_token_ids(
    self,
    token_ids: int | list[int],
) -> None:
    ...
    if isinstance(token_ids, int):
        self._output_token_ids.append(token_ids)
        self._all_token_ids.append(token_ids)
    else:
        self._output_token_ids.extend(token_ids)
        self._all_token_ids.extend(token_ids)

    self.update_block_hashes()
    ...

The key rhythm of basic text-only decode is:

prefill forward(prompt) -> sample token_0
decode forward(token_0) -> sample token_1
decode forward(token_1) -> sample token_2
...

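So a freshly sampled token first enters _all_token_ids and is computed as model input only in the next iteration.

4. The data structure of SchedulerOutput

SchedulerOutput consists of two sub-structures plus a set of batch-level fields. The basic text-only path mainly concerns NewRequestData, CachedRequestData, num_scheduled_tokens, total_num_scheduled_tokens, and finished_req_ids.

4.1 NewRequestData: full request information sent to the worker the first time

When a new request is scheduled for the first time, the worker has not yet cached its prompt, sampling params, block table, or other state, so the full information must be sent.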

@dataclass
class NewRequestData:
    req_id: str
    prompt_token_ids: list[int] | None
    mm_features: list[MultiModalFeatureSpec]
    sampling_params: SamplingParams | None
    pooling_params: PoolingParams | None
    block_ids: tuple[list[int], ...]
    num_computed_tokens: int
    lora_request: LoRARequest | None
    prompt_embeds: "torch.Tensor | None" = None

    # Only used for v2 model runner.
    prefill_token_ids: list[int] | None = None

    @classmethod
    def from_request(
        cls,
        request: Request,
        block_ids: tuple[list[int], ...],
        prefill_token_ids: list[int] | None = None,
    ) -> "NewRequestData":
        return cls(
            req_id=request.request_id,
            prompt_token_ids=request.prompt_token_ids,
            mm_features=request.mm_features,
            sampling_params=request.sampling_params,
            pooling_params=request.pooling_params,
            block_ids=block_ids,
            num_computed_tokens=request.num_computed_tokens,
            lora_request=request.lora_request,
            prompt_embeds=request.prompt_embeds,
            prefill_token_ids=prefill_token_ids,
        )
    ...

In the basic text-only case it can be read as:

NewRequestData = {
  req_id,
  prompt_token_ids,
  sampling_params,
  block_ids,
  num_computed_tokens
}

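Here num_computed_tokens may be 0 or may come from a prefix cache hit. It is the start position the worker uses when preparing this iteration's input.

4.2 CachedRequestData: incremental information for cached requests

Once the worker has seen a request, it caches the prompt, sampling params, generated tokens, block table, and other state. Later iterations only need to send the diff.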

@dataclass
class CachedRequestData:
    req_ids: list[str]
    # For request ids not in resumed_req_ids, new_block_ids will be appended to
    # the request's block IDs. For those in the set, new_block_ids will be used as the
    # request's block IDs instead of appending to the existing block IDs.
    resumed_req_ids: set[str]
    # NOTE(woosuk): new_token_ids is only used for pipeline parallelism.
    # When PP is not used, new_token_ids will be empty.
    new_token_ids: list[list[int]]
    # For requests not scheduled in the last step, propagate the token ids to the
    # connector. Won't contain requests that were scheduled in the prior step.
    all_token_ids: dict[str, list[int]]
    new_block_ids: list[tuple[list[int], ...] | None]
    num_computed_tokens: list[int]
    num_output_tokens: list[int]
    ...

These lists are parallel arrays aligned by req_ids:

req_ids[i]
new_block_ids[i]
num_computed_tokens[i]
num_output_tokens[i]

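In the common decode case (basic text-only, single node, no pipeline parallelism), the core fields are:

  • req_ids: previously seen requests that are scheduled in this iteration.
  • new_block_ids: if the KV cache needs new blocks this iteration, the worker appends them to the block table; if decode still fits in the existing blocks, the entry may be None.
  • num_computed_tokens: the start token position of this iteration's forward pass.
  • num_output_tokens: the target length of the worker-side output token cache, used to align or trim cached state.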

4.3 SchedulerOutput: the execution plan for one batch

@dataclass
class SchedulerOutput:
    # list of the requests that are scheduled for the first time.
    # We cache the request's data in each worker process, so that we don't
    # need to re-send it every scheduling step.
    scheduled_new_reqs: list[NewRequestData]
    # list of the requests that have been scheduled before.
    # Since the request's data is already cached in the worker processes,
    # we only send the diff to minimize the communication cost.
    scheduled_cached_reqs: CachedRequestData

    # req_id -> num_scheduled_tokens
    # Number of tokens scheduled for each request.
    num_scheduled_tokens: dict[str, int]
    # Total number of tokens scheduled for all requests.
    # Equal to sum(num_scheduled_tokens.values())
    total_num_scheduled_tokens: int
    ...
    # Request IDs that are finished in between the previous and the current
    # steps. This is used to notify the workers about the finished requests
    # so that they can free the cached states for those requests.
    finished_req_ids: set[str]
    ...

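num_scheduled_tokens is the heart of the batch plan. It states how many tokens each request runs through forward in this iteration:

  • New request in prefill: usually the prompt length; with chunked prefill enabled it may be one chunk of the prompt.
  • Decode request whose prefill is complete: usually 1.
  • Prefix cache hit: the number of tokens in the non-cached tail that actually need computing.

total_num_scheduled_tokens is the token total across all requests. It is used to decide whether the model actually runs this iteration, to pre-size input buffers, to pick CUDA graph/padding sizes, and so on.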
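A minimal sketch of how one shared token budget turns into a per-request plan. plan_batch is a hypothetical helper, not a vLLM function; it ignores KV block allocation, preemption, and the long-prefill threshold, and the numbers are made up for illustration.

```python
def plan_batch(
    requests: dict[str, tuple[int, int]],
    token_budget: int,
) -> dict[str, int]:
    """Map req_id -> tokens to run this iteration.

    `requests` maps req_id -> (num_tokens, num_computed_tokens).
    """
    num_scheduled_tokens: dict[str, int] = {}
    for req_id, (num_tokens, num_computed) in requests.items():
        if token_budget == 0:
            break
        # Each request tries to catch up, limited by what is left in the budget.
        n = min(num_tokens - num_computed, token_budget)
        num_scheduled_tokens[req_id] = n
        token_budget -= n
    return num_scheduled_tokens


plan = plan_batch(
    {
        "decode": (6, 5),         # prefill done, one sampled token pending
        "prefix_hit": (40, 32),   # first 32 prompt tokens hit the prefix cache
        "long_prompt": (100, 0),  # fresh prompt, larger than the leftover budget
    },
    token_budget=64,
)
total_num_scheduled_tokens = sum(plan.values())
```

Here the long prompt receives only the 55 tokens left after the other two requests, which is the chunked-prefill case from the list above.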

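5. schedule(): how SchedulerOutput is produced

The core algorithm of schedule() does not explicitly distinguish prefill from decode. Instead, it lets each request's num_computed_tokens catch up with num_tokens_with_spec. In the basic text-only case, num_tokens_with_spec can be read as len(prompt + output).

The comment in the source already describes this unified model: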

def schedule(self) -> SchedulerOutput:
    ...
    # NOTE(woosuk) on the scheduling algorithm:
    # There's no "decoding phase" nor "prefill phase" in the scheduler.
    # Each request just has the num_computed_tokens and
    # num_tokens_with_spec. num_tokens_with_spec =
    # len(prompt_token_ids) + len(output_token_ids) + len(spec_token_ids).
    # At each step, the scheduler tries to assign tokens to the requests
    # so that each request's num_computed_tokens can catch up its
    # num_tokens_with_spec. This is general enough to cover
    # chunked prefills, prefix caching, speculative decoding,
    # and the "jump decoding" optimization in the future.

    scheduled_new_reqs: list[Request] = []
    scheduled_resumed_reqs: list[Request] = []
    scheduled_running_reqs: list[Request] = []
    preempted_reqs: list[Request] = []

    req_to_new_blocks: dict[str, KVCacheBlocks] = {}
    num_scheduled_tokens: dict[str, int] = {}
    token_budget = self.max_num_scheduled_tokens
    ...

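5.1 Schedule RUNNING requests first

A RUNNING request already has state on both the scheduler and the worker. The number of tokens to process this iteration is the total token count minus the computed token count.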

def schedule(self) -> SchedulerOutput:
    ...
    # First, schedule the RUNNING requests.
    req_index = 0
    while req_index < len(self.running) and token_budget > 0:
        request = self.running[req_index]
        ...
        num_new_tokens = (
            request.num_tokens_with_spec
            + request.num_output_placeholders
            - request.num_computed_tokens
        )
        if 0 < self.scheduler_config.long_prefill_token_threshold < num_new_tokens:
            num_new_tokens = self.scheduler_config.long_prefill_token_threshold
        num_new_tokens = min(num_new_tokens, token_budget)

        # Make sure the input position does not exceed the max model len.
        # This is necessary when using spec decoding.
        num_new_tokens = min(
            num_new_tokens, self.max_model_len - 1 - request.num_computed_tokens
        )
        ...

For example, with a prompt of length 5, after prefill has sampled the first output token t0:

num_tokens_with_spec = len(prompt + [t0]) = 6
num_computed_tokens  = 5
num_new_tokens       = 1

This iteration then runs one decode forward pass with t0 as input.
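The three caps in the excerpt above can be restated as a pure function. This is a sketch that mirrors the quoted lines only; num_new_tokens_for_running is a hypothetical name, and the parameters are passed explicitly instead of being read from the scheduler and its config.

```python
def num_new_tokens_for_running(
    num_tokens_with_spec: int,
    num_output_placeholders: int,
    num_computed_tokens: int,
    token_budget: int,
    long_prefill_token_threshold: int,
    max_model_len: int,
) -> int:
    """Tokens a RUNNING request would get this iteration (toy version)."""
    n = num_tokens_with_spec + num_output_placeholders - num_computed_tokens
    # Cap 1: a threshold of 0 disables the long-prefill limit.
    if 0 < long_prefill_token_threshold < n:
        n = long_prefill_token_threshold
    # Cap 2: whatever is left of the shared token budget.
    n = min(n, token_budget)
    # Cap 3: input positions must stay below the max model length.
    n = min(n, max_model_len - 1 - num_computed_tokens)
    return n


# Decode after a 5-token prompt: one pending token.
decode = num_new_tokens_for_running(6, 0, 5, 8192, 0, 4096)
# Long prefill continued in chunks of 256.
chunk = num_new_tokens_for_running(1000, 0, 256, 8192, 256, 4096)
# Budget smaller than the remaining prompt.
budgeted = num_new_tokens_for_running(1000, 0, 0, 100, 0, 4096)
```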

Once scheduling succeeds, the scheduler allocates KV slots and records the new blocks and the token count:

def schedule(self) -> SchedulerOutput:
    ...
    # Schedule the request.
    scheduled_running_reqs.append(request)
    request_id = request.request_id
    req_to_new_blocks[request_id] = new_blocks
    num_scheduled_tokens[request_id] = num_new_tokens
    token_budget -= num_new_tokens
    req_index += 1
    ...

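5.2 Then schedule WAITING requests

When a WAITING request first enters the running queue, the scheduler checks the prefix cache and then computes how many tokens actually need processing this iteration.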

def schedule(self) -> SchedulerOutput:
    ...
    # Get already-cached tokens.
    if request.num_computed_tokens == 0:
        # Get locally-cached tokens.
        new_computed_blocks, num_new_local_computed_tokens = (
            self.kv_cache_manager.get_computed_blocks(request)
        )
        ...
        # Total computed tokens (local + external).
        num_computed_tokens = (
            num_new_local_computed_tokens + num_external_computed_tokens
        )
        assert num_computed_tokens <= request.num_tokens
    else:
        # KVTransfer: WAITING reqs have num_computed_tokens > 0
        # after async KV recvs are completed.
        new_computed_blocks = self.kv_cache_manager.empty_kv_cache_blocks
        num_new_local_computed_tokens = 0
        num_computed_tokens = request.num_computed_tokens
    ...

In the basic case with no prefix cache hit, num_computed_tokens = 0. The token count for this iteration is then the full prompt length, or one chunk of it under chunked prefill:

def schedule(self) -> SchedulerOutput:
    ...
    # Number of tokens to be scheduled.
    # We use `request.num_tokens` instead of
    # `request.num_prompt_tokens` to consider the resumed
    # requests, which have output tokens.
    num_new_tokens = request.num_tokens - num_computed_tokens
    threshold = self.scheduler_config.long_prefill_token_threshold
    if 0 < threshold < num_new_tokens:
        num_new_tokens = threshold
    ...
    num_new_tokens = min(num_new_tokens, token_budget)
    assert num_new_tokens > 0
    ...

Once scheduling succeeds, the request enters running and is marked as either new or resumed. An ordinary first-time text-only request goes into scheduled_new_reqs:

def schedule(self) -> SchedulerOutput:
    ...
    self.running.append(request)
    if self.log_stats:
        request.record_event(
            EngineCoreEventType.SCHEDULED, scheduled_timestamp
        )
    if request.status == RequestStatus.WAITING:
        scheduled_new_reqs.append(request)
    elif request.status == RequestStatus.PREEMPTED:
        scheduled_resumed_reqs.append(request)
    else:
        raise RuntimeError(f"Invalid request status: {request.status}")

    if self.lora_config and request.lora_request:
        scheduled_loras.add(request.lora_request.lora_int_id)
    req_to_new_blocks[request_id] = self.kv_cache_manager.get_blocks(
        request_id
    )
    num_scheduled_tokens[request_id] = num_new_tokens
    token_budget -= num_new_tokens
    request.status = RequestStatus.RUNNING
    request.num_computed_tokens = num_computed_tokens
    ...

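Note that request.num_computed_tokens is first set to the prefix cache hit length here; the num_new_tokens about to be computed in this iteration has not been added yet.

5.3 Where KV block IDs come from

KV block allocation is done by KVCacheManager.allocate_slots(). Both block_ids and new_block_ids in SchedulerOutput originate here.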

def allocate_slots(
    self,
    request: Request,
    num_new_tokens: int,
    num_new_computed_tokens: int = 0,
    new_computed_blocks: KVCacheBlocks | None = None,
    num_lookahead_tokens: int = 0,
    num_external_computed_tokens: int = 0,
    delay_cache_blocks: bool = False,
    num_encoder_tokens: int = 0,
) -> KVCacheBlocks | None:
    ...
    if num_blocks_to_allocate > self.block_pool.get_num_free_blocks():
        # Cannot allocate new blocks
        return None
    ...
    new_blocks = self.coordinator.allocate_new_blocks(
        request.request_id,
        num_tokens_need_slot,
        num_tokens_main_model,
        num_encoder_tokens,
    )
    ...
    return self.create_kv_cache_blocks(new_blocks)
    ...

KVCacheBlocks is then converted into the block ids that are actually sent to the worker:

def get_block_ids(
    self,
    allow_none: bool = False,
) -> tuple[list[int], ...] | None:
    ...
    if allow_none and all(len(group) == 0 for group in self.blocks):
        return None
    return tuple([blk.block_id for blk in group] for group in self.blocks)
    ...

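For a new request, the worker needs the complete block table, so the request's full set of blocks is used. For a request already in the worker's persistent batch, usually only the newly allocated blocks are sent, and the worker simply appends them.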

5.4 Constructing SchedulerOutput

After scheduling is done, the scheduler converts the Request lists into transfer structures.

def schedule(self) -> SchedulerOutput:
    ...
    new_reqs_data = [
        NewRequestData.from_request(
            req, req_to_new_blocks[req.request_id].get_block_ids()
        )
        for req in scheduled_new_reqs
    ]

    with record_function_or_nullcontext("schedule: make_cached_request_data"):
        cached_reqs_data = self._make_cached_request_data(
            scheduled_running_reqs,
            scheduled_resumed_reqs,
            num_scheduled_tokens,
            scheduled_spec_decode_tokens,
            req_to_new_blocks,
        )

    # Record the request ids that were scheduled in this step.
    self.prev_step_scheduled_req_ids.clear()
    self.prev_step_scheduled_req_ids.update(num_scheduled_tokens.keys())
    ...

The incremental data for cached requests is produced in _make_cached_request_data():

def _make_cached_request_data(
    self,
    running_reqs: list[Request],
    resumed_reqs: list[Request],
    num_scheduled_tokens: dict[str, int],
    spec_decode_tokens: dict[str, list[int]],
    req_to_new_blocks: dict[str, KVCacheBlocks],
) -> CachedRequestData:
    ...
    for idx, req in enumerate(itertools.chain(running_reqs, resumed_reqs)):
        req_id = req.request_id
        req_ids.append(req_id)
        ...
        scheduled_in_prev_step = req_id in self.prev_step_scheduled_req_ids
        if idx >= num_running_reqs:
            assert not scheduled_in_prev_step
            resumed_req_ids.add(req_id)
        if not scheduled_in_prev_step:
            all_token_ids[req_id] = req.all_token_ids.copy()
        new_block_ids.append(
            req_to_new_blocks[req_id].get_block_ids(allow_none=True)
        )
        num_computed_tokens.append(req.num_computed_tokens)
        num_output_tokens.append(
            req.num_output_tokens + req.num_output_placeholders
        )

    return CachedRequestData(
        req_ids=req_ids,
        resumed_req_ids=resumed_req_ids,
        new_token_ids=new_token_ids,
        all_token_ids=all_token_ids,
        new_block_ids=new_block_ids,
        num_computed_tokens=num_computed_tokens,
        num_output_tokens=num_output_tokens,
    )
    ...

Then the SchedulerOutput itself is created:

def schedule(self) -> SchedulerOutput:
    ...
    scheduler_output = SchedulerOutput(
        scheduled_new_reqs=new_reqs_data,
        scheduled_cached_reqs=cached_reqs_data,
        num_scheduled_tokens=num_scheduled_tokens,
        total_num_scheduled_tokens=total_num_scheduled_tokens,
        scheduled_spec_decode_tokens=scheduled_spec_decode_tokens,
        scheduled_encoder_inputs=scheduled_encoder_inputs,
        num_common_prefix_blocks=num_common_prefix_blocks,
        preempted_req_ids={req.request_id for req in preempted_reqs},
        # finished_req_ids is an existing state in the scheduler,
        # instead of being newly scheduled in this step.
        # It contains the request IDs that are finished in between
        # the previous and the current steps.
        finished_req_ids=self.finished_req_ids,
        free_encoder_mm_hashes=self.encoder_cache_manager.get_freed_mm_hashes(),
        new_block_ids_to_zero=new_block_ids_to_zero,
    )
    ...

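There is a critical point in time here: num_computed_tokens in SchedulerOutput is the position before this iteration's forward pass starts, but before schedule() returns, the scheduler immediately advances its internal Request.num_computed_tokens.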

def schedule(self) -> SchedulerOutput:
    ...
    with record_function_or_nullcontext("schedule: update_after_schedule"):
        self._update_after_schedule(scheduler_output)
    return scheduler_output
    ...

def _update_after_schedule(self, scheduler_output: SchedulerOutput) -> None:
    ...
    num_scheduled_tokens = scheduler_output.num_scheduled_tokens
    for req_id, num_scheduled_token in num_scheduled_tokens.items():
        request = self.requests[req_id]
        request.num_computed_tokens += num_scheduled_token
        request.is_prefill_chunk = request.num_computed_tokens < (
            request.num_tokens + request.num_output_placeholders
        )
        ...

    # Clear the finished request IDs.
    # NOTE: We shouldn't do self.finished_req_ids.clear() here because
    # it will also affect the scheduler output.
    self.finished_req_ids = set()
    ...

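This yields an important invariant:

num_computed_tokens in SchedulerOutput:
  the start position the worker uses when preparing this iteration's input

Request.num_computed_tokens inside the scheduler:
  already advanced optimistically, before schedule() returns, to the position after this iteration's forward pass

The basic text-only path has no speculative rejection, so this optimistic advance normally never needs to be rolled back.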
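The snapshot-then-advance order can be sketched in a few lines. ToyRequest and toy_schedule are hypothetical names used only for illustration; the real scheduler does much more between these two steps.

```python
from dataclasses import dataclass


@dataclass
class ToyRequest:
    """Hypothetical stand-in carrying only the counters needed here."""

    request_id: str
    num_tokens: int
    num_computed_tokens: int = 0


def toy_schedule(
    requests: dict[str, ToyRequest],
    planned: dict[str, int],
) -> dict[str, int]:
    """Return the start positions sent to the worker, then advance internally."""
    # 1. Snapshot: this is what ends up in the output for the worker.
    snapshot = {rid: requests[rid].num_computed_tokens for rid in planned}
    # 2. Optimistic advance: the scheduler already counts these tokens as done.
    for rid, num_scheduled in planned.items():
        requests[rid].num_computed_tokens += num_scheduled
    return snapshot


reqs = {"r1": ToyRequest("r1", num_tokens=6, num_computed_tokens=5)}
sent_to_worker = toy_schedule(reqs, planned={"r1": 1})
```

After the call, the worker-facing value is still 5 while the scheduler-side counter is already 6, which is exactly the one-step offset described above.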

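6. How the worker consumes SchedulerOutput

The worker/model runner first applies SchedulerOutput to its own cached state, then builds this iteration's GPU input from it.

6.1 Cleaning up finished requests

finished_req_ids is the set of requests that finished between the previous step and the current one. This iteration's schedule() carries it to the worker, which removes those requests from its cached state and persistent batch.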

def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
    # Remove finished requests from the cached states.
    for req_id in scheduler_output.finished_req_ids:
        self.requests.pop(req_id, None)
        self.num_prompt_logprobs.pop(req_id, None)
    self.late_interaction_runner.on_requests_finished(
        scheduler_output.finished_req_ids
    )
    ...
    for req_id in scheduler_output.finished_req_ids:
        self.input_batch.remove_request(req_id)
    ...

This explains why a SchedulerOutput with total_num_scheduled_tokens == 0 can still be meaningful: it may be a cleanup-only output that tells the worker to drop the cached state of finished requests.
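A cleanup-only iteration can be sketched with two toy classes. ToyScheduler and ToyWorker are hypothetical, and the output is a plain dict rather than a SchedulerOutput. The sketch also shows why the scheduler rebinds finished_req_ids to a new set instead of calling clear(): the output still holds a reference to the old set.

```python
class ToyScheduler:
    """Hypothetical scheduler that only tracks finished request ids."""

    def __init__(self) -> None:
        self.finished_req_ids: set[str] = set()

    def finish(self, req_id: str) -> None:
        self.finished_req_ids.add(req_id)

    def schedule(self) -> dict:
        output = {
            "total_num_scheduled_tokens": 0,  # nothing to run this iteration
            "finished_req_ids": self.finished_req_ids,
        }
        # Rebind instead of clear(): clear() would also empty the set that
        # the output above still references.
        self.finished_req_ids = set()
        return output


class ToyWorker:
    """Hypothetical worker holding per-request cached state."""

    def __init__(self) -> None:
        self.requests: dict[str, object] = {}

    def update_states(self, output: dict) -> None:
        for req_id in output["finished_req_ids"]:
            self.requests.pop(req_id, None)


scheduler, worker = ToyScheduler(), ToyWorker()
worker.requests["r1"] = object()
scheduler.finish("r1")
out = scheduler.schedule()
worker.update_states(out)
```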

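6.2 Adding new requests

A new request is converted into a worker-side CachedRequestState. That state then stays inside the worker, so the scheduler does not need to resend the full prompt every iteration.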

def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
    # Add new requests to the cached states.
    for new_req_data in scheduler_output.scheduled_new_reqs:
        req_id = new_req_data.req_id
        ...
        req_state = CachedRequestState(
            req_id=req_id,
            prompt_token_ids=new_req_data.prompt_token_ids,
            prompt_embeds=new_req_data.prompt_embeds,
            mm_features=new_req_data.mm_features,
            sampling_params=sampling_params,
            pooling_params=pooling_params,
            generator=generator,
            block_ids=new_req_data.block_ids,
            num_computed_tokens=new_req_data.num_computed_tokens,
            output_token_ids=[],
            lora_request=new_req_data.lora_request,
        )
        self.requests[req_id] = req_state
        ...
        reqs_to_add.append(req_state)
    ...

The basic fields of CachedRequestState are:

@dataclass
class CachedRequestState:
    req_id: str
    prompt_token_ids: list[int] | None
    mm_features: list[MultiModalFeatureSpec]
    sampling_params: SamplingParams | None
    generator: torch.Generator | None

    block_ids: tuple[list[int], ...]
    num_computed_tokens: int
    output_token_ids: list[int]
    ...

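6.3 Updating cached requests

Previously seen requests do not resend the full prompt. Instead, the worker updates the start position, the block table, and whatever token alignment information is needed.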

def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
    req_data = scheduler_output.scheduled_cached_reqs
    scheduled_spec_tokens = scheduler_output.scheduled_spec_decode_tokens
    ...
    for i, req_id in enumerate(req_data.req_ids):
        req_state = self.requests[req_id]
        num_computed_tokens = req_data.num_computed_tokens[i]
        new_block_ids = req_data.new_block_ids[i]
        resumed_from_preemption = req_id in req_data.resumed_req_ids
        num_output_tokens = req_data.num_output_tokens[i]
        req_index = self.input_batch.req_id_to_index.get(req_id)
        ...
        # Update the cached states.
        req_state.num_computed_tokens = num_computed_tokens
        ...
        if not resumed_from_preemption:
            if new_block_ids is not None:
                # Append the new blocks to the existing block IDs.
                for block_ids, new_ids in zip(req_state.block_ids, new_block_ids):
                    block_ids.extend(new_ids)
        else:
            assert req_index is None
            assert new_block_ids is not None
            # The request is resumed from preemption.
            # Replace the existing block IDs with the new ones.
            req_state.block_ids = new_block_ids
        ...

If the request is already in the persistent batch, the worker syncs num_computed_tokens and the block table in the batch:

def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
    # Update the persistent batch.
    self.input_batch.num_computed_tokens_cpu[req_index] = num_computed_tokens
    if new_block_ids is not None:
        self.input_batch.block_table.append_row(new_block_ids, req_index)
    ...

Finally, new requests, and requests that were removed from the persistent batch and then scheduled again, are added to InputBatch:

def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
    # Add the new or resumed requests to the persistent batch.
    # The smaller empty indices are filled first.
    for request in reqs_to_add:
        self.input_batch.add_request(request)
        self.input_batch.update_req_spec_token_ids(request, scheduled_spec_tokens)

    # Condense the batched states if there are gaps left by removed requests
    self.input_batch.condense()
    # Allow attention backend to reorder the batch, potentially
    self._may_reorder_batch(scheduler_output)
    # Refresh batch metadata with any pending updates.
    self.input_batch.refresh_metadata()
    ...

InputBatch.add_request() puts the prompt tokens, the existing output tokens, the starting computed position, and the block table into the worker's batch cache:

def add_request(
    self,
    request: "CachedRequestState",
) -> int:
    ...
    # Copy the prompt token ids and output token ids.
    num_prompt_tokens = length_from_prompt_token_ids_or_embeds(
        request.prompt_token_ids, request.prompt_embeds
    )
    self.num_prompt_tokens[req_index] = num_prompt_tokens
    start_idx = num_prompt_tokens
    end_idx = start_idx + len(request.output_token_ids)
    if request.prompt_token_ids is not None:
        self.token_ids_cpu[req_index, :num_prompt_tokens] = request.prompt_token_ids
        self.is_token_ids[req_index, :num_prompt_tokens] = True
    else:
        self.is_token_ids[req_index, :num_prompt_tokens] = False
    ...
    self.token_ids_cpu[req_index, start_idx:end_idx] = request.output_token_ids
    self.is_token_ids[req_index, start_idx:end_idx] = True
    # Number of tokens without spec decode tokens.
    self.num_tokens_no_spec[req_index] = request.num_tokens

    self.num_computed_tokens_cpu[req_index] = request.num_computed_tokens
    self.block_table.add_row(request.block_ids, req_index)
    ...

6.4 Preparing this iteration's input_ids and positions

SchedulerOutput.num_scheduled_tokens determines how many tokens each request takes this iteration; num_computed_tokens in CachedRequestState/InputBatch determines the position to start from.

def _prepare_inputs(
    self,
    scheduler_output: "SchedulerOutput",
    num_scheduled_tokens: np.ndarray,
) -> tuple[
    torch.Tensor,
    SpecDecodeMetadata | None,
]:
    ...
    total_num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens
    assert total_num_scheduled_tokens > 0
    num_reqs = self.input_batch.num_reqs
    assert num_reqs > 0
    ...
    # Get request indices.
    # E.g., [2, 5, 3] -> [0, 0, 1, 1, 1, 1, 1, 2, 2, 2]
    req_indices = np.repeat(self.arange_np[:num_reqs], num_scheduled_tokens)

    # cu_num_tokens: [2, 5, 3] -> [2, 7, 10]
    # self.query_pos.np[:10]: [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
    cu_num_tokens = self._get_cumsum_and_arange(
        num_scheduled_tokens, self.query_pos.np
    )

    # Get positions.
    positions_np = (
        self.input_batch.num_computed_tokens_cpu[req_indices]
        + self.query_pos.np[: cu_num_tokens[-1]]
    )
    ...

These positions are then used to gather, from the worker-side token cache, the tokens that are actually fed to the model:

def _prepare_inputs(
    self,
    scheduler_output: "SchedulerOutput",
    num_scheduled_tokens: np.ndarray,
) -> tuple[
    torch.Tensor,
    SpecDecodeMetadata | None,
]:
    ...
    token_indices = (
        positions_np + req_indices * self.input_batch.token_ids_cpu.shape[1]
    )
    token_indices_tensor = torch.from_numpy(token_indices)

    # NOTE(woosuk): We use torch.index_select instead of np.take here
    # because torch.index_select is much faster than np.take for large
    # tensors.
    torch.index_select(
        self.input_batch.token_ids_cpu_tensor.flatten(),
        0,
        token_indices_tensor,
        out=self.input_ids.cpu[:total_num_scheduled_tokens],
    )
    ...

This part ties several SchedulerOutput fields together:

num_scheduled_tokens[req]  -> how many tokens to take this iteration
num_computed_tokens[req]   -> which token position to start from
block_ids/new_block_ids    -> which blocks receive the KV of these tokens
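The mapping above can be traced with a pure-Python stand-in for the vectorized NumPy/torch code. prepare_inputs_toy is a hypothetical function and the token ids are made up; nested lists play the role of the worker's token cache.

```python
def prepare_inputs_toy(
    token_ids: list[list[int]],
    num_computed_tokens: list[int],
    num_scheduled_tokens: list[int],
) -> tuple[list[int], list[int], list[int]]:
    """Return (req_indices, positions, input_ids) for one flattened batch."""
    req_indices: list[int] = []
    positions: list[int] = []
    input_ids: list[int] = []
    for req_index, num_scheduled in enumerate(num_scheduled_tokens):
        start = num_computed_tokens[req_index]
        for offset in range(num_scheduled):
            req_indices.append(req_index)
            # Position = start position of the request + offset in this step.
            positions.append(start + offset)
            input_ids.append(token_ids[req_index][start + offset])
    return req_indices, positions, input_ids


# Request 0 decodes one token at position 5; request 1 prefills 3 tokens.
token_cache = [
    [100, 101, 102, 103, 104, 105],
    [200, 201, 202],
]
req_indices, positions, input_ids = prepare_inputs_toy(
    token_cache,
    num_computed_tokens=[5, 0],
    num_scheduled_tokens=[1, 3],
)
```

A decode request and a prefill request end up in the same flat batch; only their start positions and token counts differ.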

7. How model output is written back

After sampling, the worker builds a ModelRunnerOutput. For basic text-only, the fields to look at are req_ids, req_id_to_index, and sampled_token_ids.

@dataclass
class ModelRunnerOutput:
    # [num_reqs]
    req_ids: list[str]
    # req_id -> index
    req_id_to_index: dict[str, int]

    # num_reqs x num_generated_tokens
    # num_generated_tokens is the number of tokens
    # generated in the current step. It can be different for
    # each request due to speculative/jump decoding.
    sampled_token_ids: list[list[int]] = field(default_factory=list)
    ...

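Sampled tokens are first cached on the worker side. That way, when input_ids are prepared for the next decode iteration, the scheduler does not need to send the just-sampled tokens back to the worker.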

def _bookkeeping_sync(
    self,
    scheduler_output: "SchedulerOutput",
    sampler_output: SamplerOutput,
    logits: torch.Tensor | None,
    hidden_states: torch.Tensor,
    num_scheduled_tokens: int,
    spec_decode_metadata: SpecDecodeMetadata | None,
) -> tuple[
    dict[str, int],
    LogprobsLists | None,
    list[list[int]],
    dict[str, LogprobsTensors | None],
    list[str],
    dict[str, int],
    list[int],
]:
    ...
    # Cache the sampled tokens in the model runner, so that the scheduler
    # doesn't need to send them back.
    # NOTE(woosuk): As an exception, when using PP, the scheduler sends
    # the sampled tokens back, because there's no direct communication
    # between the first-stage worker and the last-stage worker.
    req_ids = self.input_batch.req_ids
    for req_idx in range(num_sampled_tokens):
        ...
        self.input_batch.token_ids_cpu[req_idx, start_idx:end_idx] = sampled_ids
        self.input_batch.is_token_ids[req_idx, start_idx:end_idx] = True
        self.input_batch.num_tokens_no_spec[req_idx] = end_idx

        req_id = req_ids[req_idx]
        req_state = self.requests[req_id]
        req_state.output_token_ids.extend(sampled_ids)
    ...

The scheduler then uses the same SchedulerOutput as context to interpret the ModelRunnerOutput:

def update_from_output(
    self,
    scheduler_output: SchedulerOutput,
    model_runner_output: ModelRunnerOutput,
) -> dict[int, EngineCoreOutputs]:
    ...
    sampled_token_ids = model_runner_output.sampled_token_ids
    ...
    num_scheduled_tokens = scheduler_output.num_scheduled_tokens
    ...
    for req_id, num_tokens_scheduled in num_scheduled_tokens.items():
        assert num_tokens_scheduled > 0
        ...
        request = self.requests.get(req_id)
        if request is None or request.is_finished():
            ...
            continue

        req_index = model_runner_output.req_id_to_index[req_id]
        generated_token_ids = (
            sampled_token_ids[req_index] if sampled_token_ids else []
        )
        ...

If new tokens were generated in this iteration, the scheduler appends them to the Request and checks whether to stop:

def update_from_output(
    self,
    scheduler_output: SchedulerOutput,
    model_runner_output: ModelRunnerOutput,
) -> dict[int, EngineCoreOutputs]:
    ...
    # Check for stop and update request status.
    if new_token_ids:
        new_token_ids, stopped = self._update_request_with_output(
            request, new_token_ids
        )
    ...

def _update_request_with_output(
    self, request: Request, new_token_ids: list[int]
) -> tuple[list[int], bool]:
    # Append generated tokens and check for stop. Note that if
    # a request is still being prefilled, we expect the model runner
    # to return empty token ids for the request.
    stopped = False
    for num_new, output_token_id in enumerate(new_token_ids, 1):
        request.append_output_token_ids(output_token_id)

        # Check for stop and update request state.
        # This must be called before we make the EngineCoreOutput.
        stopped = check_stop(request, self.max_model_len)
        if stopped:
            del new_token_ids[num_new:]  # Trim new tokens if needed.
            break
    return new_token_ids, stopped
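The append-then-trim loop can be exercised in isolation. In this sketch, append_until_stop is a hypothetical helper and the stop rule (an EOS id or a maximum output length) is an assumption made for illustration; the real check_stop() is not reproduced here.

```python
def append_until_stop(
    output_token_ids: list[int],
    new_token_ids: list[int],
    eos_token_id: int,
    max_tokens: int,
) -> tuple[list[int], bool]:
    """Append tokens one by one; drop everything after the stopping token."""
    stopped = False
    for num_new, token_id in enumerate(new_token_ids, 1):
        output_token_ids.append(token_id)
        # Assumed stop rule, standing in for check_stop().
        stopped = token_id == eos_token_id or len(output_token_ids) >= max_tokens
        if stopped:
            del new_token_ids[num_new:]  # trim tokens after the stop point
            break
    return new_token_ids, stopped


outputs = [1]
kept, stopped = append_until_stop(outputs, [5, 2, 9], eos_token_id=2, max_tokens=100)
single, single_stopped = append_until_stop([], [7], eos_token_id=2, max_tokens=100)
```

When a step returns a single token, as in the second call, the trim has nothing to remove; it only matters when a step returns several tokens for one request.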

If the request has finished, the scheduler records it in finished_req_ids, frees the scheduler-side KV blocks, and notifies the worker in the next schedule() so that worker-side state is cleaned up.

def _free_request(
    self, request: Request, delay_free_blocks: bool = False
) -> dict[str, Any] | None:
    assert request.is_finished()

    connector_delay_free_blocks, kv_xfer_params = self._connector_finished(request)
    self.encoder_cache_manager.free(request)
    request_id = request.request_id
    self.finished_req_ids.add(request_id)
    if self.finished_req_ids_dict is not None:
        self.finished_req_ids_dict[request.client_index].add(request_id)

    delay_free_blocks |= connector_delay_free_blocks
    if not delay_free_blocks:
        self._free_blocks(request)

    return kv_xfer_params
    ...

8. Single-request text-only timeline

Assume a request r1 with a prompt of length 5, no prefix cache hit, and one token decoded per iteration.

Initial:
  Request._all_token_ids      = [p0 p1 p2 p3 p4]
  Request._output_token_ids   = []
  Request.num_computed_tokens = 0
  status = WAITING

Step 0: prefill
  schedule:
    num_computed_tokens snapshot = 0
    num_scheduled_tokens[r1] = 5
    scheduled_new_reqs = [NewRequestData(r1, prompt, block_ids, 0)]
    scheduler-internal num_computed_tokens += 5 -> 5

  worker:
    add CachedRequestState(r1)
    input positions = [0,1,2,3,4]
    input_ids = [p0,p1,p2,p3,p4]
    sample t0 after forward
    append t0 to the worker token cache

  update_from_output:
    Request.append_output_token_ids(t0)
    _all_token_ids = [p0 p1 p2 p3 p4 t0]
    _output_token_ids = [t0]

Step 1: decode t0
  schedule:
    num_tokens = 6
    num_computed_tokens = 5
    num_scheduled_tokens[r1] = 1
    scheduled_cached_reqs.req_ids = [r1]
    scheduled_cached_reqs.num_computed_tokens = [5]
    scheduler-internal num_computed_tokens += 1 -> 6

  worker:
    input position = [5]
    input_ids = [t0]
    sample t1 after forward
    append t1 to the worker token cache

  update_from_output:
    Request.append_output_token_ids(t1)

Step 2:
  Same pattern: input t1, sample t2.

This timeline shows the semantics of num_computed_tokens: it points at the position of the next token to be computed as input. A sampled token first enters the token history; only in the next iteration is it scheduled and its KV actually computed.
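The timeline can be replayed with a toy simulation that keeps the scheduler view and the worker token cache as two separate lists. simulate_single_request is a hypothetical function; sampling is faked by emitting the labels t0, t1, ... and no KV cache is modeled.

```python
def simulate_single_request(prompt_len: int, num_decode_steps: int):
    """Return one (start, num_scheduled, input_ids, sampled) tuple per step."""
    all_token_ids = [f"p{i}" for i in range(prompt_len)]  # scheduler view
    worker_token_cache = list(all_token_ids)              # worker view
    num_computed_tokens = 0                               # scheduler counter
    log = []
    for step in range(num_decode_steps + 1):  # one prefill + N decode steps
        # schedule(): snapshot the start position, then advance optimistically.
        start = num_computed_tokens
        num_scheduled = len(all_token_ids) - start
        num_computed_tokens += num_scheduled
        # worker: read inputs from its own cache, then cache the sampled token.
        input_ids = worker_token_cache[start : start + num_scheduled]
        sampled = f"t{step}"  # fake sampling result
        worker_token_cache.append(sampled)
        # update_from_output(): the scheduler records the sampled token.
        all_token_ids.append(sampled)
        log.append((start, num_scheduled, input_ids, sampled))
    return log


log = simulate_single_request(prompt_len=5, num_decode_steps=2)
```

Each decode step reads exactly the token sampled in the previous step from the worker's own cache, so the scheduler never has to send it back in this path.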

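9. Quick reference: field source, use, and update time

| Field | Source | Use in worker/model runner | When updated |
|---|---|---|---|
| scheduled_new_reqs | Converted from WAITING requests in schedule() | Create the worker-side CachedRequestState; initialize prompt, sampling params, block table | Rebuilt on every schedule; a request appears only when first scheduled |
| scheduled_cached_reqs.req_ids | RUNNING/resumed requests scheduled this iteration | Locate requests already cached on the worker | Rebuilt on every schedule |
| scheduled_cached_reqs.new_block_ids | KVCacheManager.allocate_slots() / get_blocks() | Append to or replace the worker block table | Produced when scheduling allocates KV slots |
| scheduled_cached_reqs.num_computed_tokens | Start-of-iteration snapshot of the scheduler-side Request.num_computed_tokens | Set the input start position | Captured before SchedulerOutput is built; the scheduler then advances its internal value |
| scheduled_cached_reqs.num_output_tokens | Scheduler-side output token count | Align the worker-side output token cache | Captured when the cached request data is built |
| num_scheduled_tokens | Computed by the scheduler from num_tokens - num_computed_tokens and the token budget | Decide how many tokens each request runs through forward this iteration | Produced within this schedule |
| total_num_scheduled_tokens | sum(num_scheduled_tokens.values()) | Decide whether to run forward; size the total input buffer | Produced within this schedule |
| finished_req_ids | Written to the scheduler by _free_request() or an external abort/finish flow | Worker deletes cached state and persistent batch entries | Sent with the output of the next schedule; the scheduler side is then reset to an empty set |
| new_block_ids_to_zero | Blocks newly allocated from the KV block pool this iteration | Worker zeroes the corresponding GPU KV memory before use | Non-empty only when KV cache zeroing is required |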

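10. Summary of the mechanism

SchedulerOutput addresses the division of state between the scheduler and the model runner:

  • The scheduler owns queues, budgets, KV block allocation, request completion, and stop conditions.
  • The worker caches request state as an efficient GPU input batch, including the token cache, block table, sampling parameters, and persistent batch.
  • SchedulerOutput is the minimal plan that syncs the two each iteration: full data for new requests, a diff for previously seen requests, plus how many tokens each request should run this iteration.

The easiest thing to confuse is the relationship between num_computed_tokens and generated tokens. A generated token is sampled in the current iteration, but its KV has not been computed yet. In the next iteration the scheduler sees that len(prompt + output) has grown and schedules the new token as input. This rhythm, "the sample enters the history first and is computed in the next iteration", is the core data flow of text-only autoregressive decode through SchedulerOutput.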