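vLLM Framework Code Walkthrough 03 (SchedulerOutput)
(This is part 03. Part 01 (Client) and part 02 (Core) will be added later.)
This post focuses on the most basic text-only generation path in vLLM V1. Extension points such as the multimodal encoder, speculative decoding, pipeline/data parallelism, KV/EC connectors, structured output, and pooling are mentioned only where their fields show up; their mechanisms are not covered.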
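1. One-sentence summary
SchedulerOutput is the scheduling result of one engine iteration. It packages the scheduler's decisions for this forward pass (which requests to process, the token position each request starts from, how many tokens each one processes, which KV cache blocks they use, and which worker-side caches need cleaning up) and hands them to the model runner.
It is not long-lived state itself. Long-lived state lives in:
- on the scheduler side: Request, self.running, self.waiting, KVCacheManager
- on the worker/model runner side: CachedRequestState, InputBatch, block table, token cache
SchedulerOutput is better thought of as a single-frame execution plan passed between the two.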
```text
Scheduler state                     Worker state
Request / queues / KV blocks        CachedRequestState / InputBatch
            \                          /
             \--- one iteration plan ---/
                   SchedulerOutput
```
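2. Lifecycle in the main loop
The basic synchronous path in EngineCore.step() is straightforward: schedule first, then run the model, then write the model output back into the scheduler.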
```python
def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
    ...
    if not self.scheduler.has_requests():
        return {}, False
    scheduler_output = self.scheduler.schedule()
    future = self.model_executor.execute_model(scheduler_output, non_block=True)
    grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)
    with (
        self.log_error_detail(scheduler_output),
        self.log_iteration_details(scheduler_output),
    ):
        model_output = future.result()
        if model_output is None:
            model_output = self.model_executor.sample_tokens(grammar_output)

    # Before processing the model output, process any aborts that happened
    # during the model execution.
    self._process_aborts_queue()
    engine_core_outputs = self.scheduler.update_from_output(
        scheduler_output, model_output
    )

    return engine_core_outputs, scheduler_output.total_num_scheduled_tokens > 0
    ...
```
The corresponding sequence can be condensed into:
```plantuml
@startuml
participant EngineCore
participant Scheduler
participant Executor
participant GPUModelRunner
EngineCore -> Scheduler: schedule()
Scheduler --> EngineCore: SchedulerOutput
EngineCore -> Executor: execute_model(SchedulerOutput)
Executor -> GPUModelRunner: execute_model(...)
GPUModelRunner -> GPUModelRunner: update worker cache / prepare input / forward
GPUModelRunner --> Executor: None or ModelRunnerOutput
EngineCore -> Executor: sample_tokens(...) if needed
Executor -> GPUModelRunner: sample_tokens(...)
GPUModelRunner --> EngineCore: ModelRunnerOutput
EngineCore -> Scheduler: update_from_output(SchedulerOutput, ModelRunnerOutput)
Scheduler --> EngineCore: EngineCoreOutputs
@enduml
```
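On the ordinary path without a pipeline batch queue, a SchedulerOutput lives exactly as long as one step(): it is constructed, passed to the executor/model runner, and then used as context in update_from_output(). The next iteration builds a fresh SchedulerOutput.
3. Request: the scheduler's underlying ledger
Before looking at SchedulerOutput, it helps to see which basic counters the scheduler-side Request maintains.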
```python
def __init__(
    self,
    request_id: str,
    prompt_token_ids: list[int] | None,
    sampling_params: SamplingParams | None,
    pooling_params: PoolingParams | None,
    ...
) -> None:
    ...
    self.prompt_token_ids = prompt_token_ids
    self.prompt_embeds = prompt_embeds
    ...
    self.num_prompt_tokens = length_from_prompt_token_ids_or_embeds(
        prompt_token_ids, prompt_embeds
    )
    self._output_token_ids: list[int] = []
    self._all_token_ids: list[int] = (
        self.prompt_token_ids.copy()
        if self.prompt_token_ids is not None
        else [0] * self.num_prompt_tokens
    )

    # Used in async scheduling.
    self.num_output_placeholders = 0
    ...
    self.spec_token_ids: list[int] = []
    self.num_computed_tokens = 0
    ...
```
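What the core fields mean:
- prompt_token_ids: the original prompt.
- _output_token_ids: output tokens that have been sampled and confirmed for return to the user.
- _all_token_ids: prompt + output, i.e. the full context as the scheduler sees it.
- num_computed_tokens: the number of tokens whose model computation is finished and whose KV has been written to the KV cache. It is not "the number of sampled tokens" but "how many tokens have already been run through a forward pass as input".
After a token is generated, the scheduler updates both the output tokens and the full token list via append_output_token_ids():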
```python
def append_output_token_ids(
    self,
    token_ids: int | list[int],
) -> None:
    ...
    if isinstance(token_ids, int):
        self._output_token_ids.append(token_ids)
        self._all_token_ids.append(token_ids)
    else:
        self._output_token_ids.extend(token_ids)
        self._all_token_ids.extend(token_ids)

    self.update_block_hashes()
    ...
```
The key rhythm of basic text-only decoding is:
```text
prefill forward(prompt)  -> sample token_0
decode  forward(token_0) -> sample token_1
decode  forward(token_1) -> sample token_2
...
```
So a freshly sampled token first lands in _all_token_ids and is only computed as model input in the next iteration.
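4. SchedulerOutput data structures
SchedulerOutput consists of two sub-structures plus a set of batch-level fields. On the basic text-only path, the ones that matter are NewRequestData, CachedRequestData, num_scheduled_tokens, total_num_scheduled_tokens, and finished_req_ids.
4.1 NewRequestData: the full request information sent to the worker on first scheduling
When a new request is scheduled for the first time, the worker has not yet cached its prompt, sampling params, block table, or other state, so the full information must be sent.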
```python
@dataclass
class NewRequestData:
    req_id: str
    prompt_token_ids: list[int] | None
    mm_features: list[MultiModalFeatureSpec]
    sampling_params: SamplingParams | None
    pooling_params: PoolingParams | None
    block_ids: tuple[list[int], ...]
    num_computed_tokens: int
    lora_request: LoRARequest | None
    prompt_embeds: "torch.Tensor | None" = None

    # Only used for v2 model runner.
    prefill_token_ids: list[int] | None = None

    @classmethod
    def from_request(
        cls,
        request: Request,
        block_ids: tuple[list[int], ...],
        prefill_token_ids: list[int] | None = None,
    ) -> "NewRequestData":
        return cls(
            req_id=request.request_id,
            prompt_token_ids=request.prompt_token_ids,
            mm_features=request.mm_features,
            sampling_params=request.sampling_params,
            pooling_params=request.pooling_params,
            block_ids=block_ids,
            num_computed_tokens=request.num_computed_tokens,
            lora_request=request.lora_request,
            prompt_embeds=request.prompt_embeds,
            prefill_token_ids=prefill_token_ids,
        )
    ...
```
On the basic text-only path it can be thought of as:
```text
NewRequestData = {
    req_id,
    prompt_token_ids,
    sampling_params,
    block_ids,
    num_computed_tokens
}
```
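Here num_computed_tokens may be 0, or it may come from a prefix cache hit. It is the starting position the worker uses when preparing this iteration's input.
4.2 CachedRequestData: incremental information for already-cached requests
Once the worker has seen a request, it caches the prompt, sampling params, generated tokens, block table, and other state. Later iterations only need to send the deltas.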
```python
@dataclass
class CachedRequestData:
    req_ids: list[str]
    # For request ids not in resumed_req_ids, new_block_ids will be appended to
    # the request's block IDs. For those in the set, new_block_ids will be used as the
    # request's block IDs instead of appending to the existing block IDs.
    resumed_req_ids: set[str]
    # NOTE(woosuk): new_token_ids is only used for pipeline parallelism.
    # When PP is not used, new_token_ids will be empty.
    new_token_ids: list[list[int]]
    # For requests not scheduled in the last step, propagate the token ids to the
    # connector. Won't contain requests that were scheduled in the prior step.
    all_token_ids: dict[str, list[int]]
    new_block_ids: list[tuple[list[int], ...] | None]
    num_computed_tokens: list[int]
    num_output_tokens: list[int]
    ...
```
These lists are parallel arrays aligned by index with req_ids:
```text
req_ids[i]
new_block_ids[i]
num_computed_tokens[i]
num_output_tokens[i]
```
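In the common decode case (basic text-only, single node, no pipeline), the essentials are:
- req_ids: the previously seen requests scheduled in this iteration.
- new_block_ids: if this iteration needs new KV cache blocks, the worker appends them to the block table; if decoding still fits inside the existing blocks, the entry may be None.
- num_computed_tokens: the starting token position for this iteration's forward pass.
- num_output_tokens: the target length of the worker-side output token cache, used to align or trim cached state.
4.3 SchedulerOutput: the execution plan for one batch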
```python
@dataclass
class SchedulerOutput:
    # list of the requests that are scheduled for the first time.
    # We cache the request's data in each worker process, so that we don't
    # need to re-send it every scheduling step.
    scheduled_new_reqs: list[NewRequestData]
    # list of the requests that have been scheduled before.
    # Since the request's data is already cached in the worker processes,
    # we only send the diff to minimize the communication cost.
    scheduled_cached_reqs: CachedRequestData

    # req_id -> num_scheduled_tokens
    # Number of tokens scheduled for each request.
    num_scheduled_tokens: dict[str, int]
    # Total number of tokens scheduled for all requests.
    # Equal to sum(num_scheduled_tokens.values())
    total_num_scheduled_tokens: int
    ...
    # Request IDs that are finished in between the previous and the current
    # steps. This is used to notify the workers about the finished requests
    # so that they can free the cached states for those requests.
    finished_req_ids: set[str]
    ...
```
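num_scheduled_tokens is the heart of the batch plan. It records how many tokens each request feeds into this forward pass:
- New request in prefill: usually the prompt length; with chunked prefill enabled, possibly one chunk of the prompt.
- Request that has finished prefill and is decoding: usually 1.
- Prefix cache hit: the number of tokens in the uncached tail that actually need computing.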
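total_num_scheduled_tokens is the total token count across all requests. It is used to decide whether the model actually runs this iteration, to size the input buffers, and to choose CUDA graph/padding, among other things.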
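As a minimal sketch, the batch plan is just a dict plus its sum. ToySchedulerOutput is a hypothetical class, not vLLM's; in vLLM total_num_scheduled_tokens is a stored field, and it is modeled as a derived property here only to make the invariant explicit. The request IDs and token counts are made up.

```python
from dataclasses import dataclass, field


@dataclass
class ToySchedulerOutput:
    """Hypothetical subset of SchedulerOutput used on the text-only path."""

    num_scheduled_tokens: dict[str, int]
    finished_req_ids: set[str] = field(default_factory=set)

    @property
    def total_num_scheduled_tokens(self) -> int:
        # The documented invariant: sum(num_scheduled_tokens.values()).
        return sum(self.num_scheduled_tokens.values())

    def runs_forward(self) -> bool:
        # Zero scheduled tokens means no forward pass this iteration.
        return self.total_num_scheduled_tokens > 0


plan = ToySchedulerOutput(
    num_scheduled_tokens={
        "r_prefill_chunk": 512,  # one chunk of a long prompt
        "r_decode": 1,           # steady-state decode
        "r_prefix_hit": 3,       # only the uncached tail of the prompt
    }
)
cleanup_only = ToySchedulerOutput(num_scheduled_tokens={}, finished_req_ids={"r_done"})
print(plan.total_num_scheduled_tokens, plan.runs_forward())  # 516 True
print(cleanup_only.runs_forward())  # False
```

The second object previews a point made in section 6.1: an output with nothing to run can still carry finished_req_ids for the worker.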
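5. schedule(): how SchedulerOutput is produced
The core algorithm of schedule() does not explicitly distinguish prefill from decode. Instead, it lets each request's num_computed_tokens catch up with num_tokens_with_spec. On the basic text-only path, num_tokens_with_spec can be treated as len(prompt + output).
The comment in the source already describes this unified model: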
```python
def schedule(self) -> SchedulerOutput:
    ...
    # NOTE(woosuk) on the scheduling algorithm:
    # There's no "decoding phase" nor "prefill phase" in the scheduler.
    # Each request just has the num_computed_tokens and
    # num_tokens_with_spec. num_tokens_with_spec =
    # len(prompt_token_ids) + len(output_token_ids) + len(spec_token_ids).
    # At each step, the scheduler tries to assign tokens to the requests
    # so that each request's num_computed_tokens can catch up its
    # num_tokens_with_spec. This is general enough to cover
    # chunked prefills, prefix caching, speculative decoding,
    # and the "jump decoding" optimization in the future.

    scheduled_new_reqs: list[Request] = []
    scheduled_resumed_reqs: list[Request] = []
    scheduled_running_reqs: list[Request] = []
    preempted_reqs: list[Request] = []

    req_to_new_blocks: dict[str, KVCacheBlocks] = {}
    num_scheduled_tokens: dict[str, int] = {}
    token_budget = self.max_num_scheduled_tokens
    ...
```
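5.1 Schedule RUNNING requests first
RUNNING requests already have state on both the scheduler and the worker. The number of tokens to process this iteration is the total token count minus the computed token count (num_output_placeholders only matters for async scheduling), then clamped by long_prefill_token_threshold, the remaining token budget, and max_model_len.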
```python
def schedule(self) -> SchedulerOutput:
    ...
    # First, schedule the RUNNING requests.
    req_index = 0
    while req_index < len(self.running) and token_budget > 0:
        request = self.running[req_index]
        ...
        num_new_tokens = (
            request.num_tokens_with_spec
            + request.num_output_placeholders
            - request.num_computed_tokens
        )
        if 0 < self.scheduler_config.long_prefill_token_threshold < num_new_tokens:
            num_new_tokens = self.scheduler_config.long_prefill_token_threshold
        num_new_tokens = min(num_new_tokens, token_budget)

        # Make sure the input position does not exceed the max model len.
        # This is necessary when using spec decoding.
        num_new_tokens = min(
            num_new_tokens, self.max_model_len - 1 - request.num_computed_tokens
        )
        ...
```
For example, with a prompt of length 5, after prefill has sampled the first output token t0:
```text
num_tokens_with_spec = len(prompt + [t0]) = 6
num_computed_tokens  = 5
num_new_tokens       = 1
```
This iteration therefore runs one decode forward pass with t0 as its input.
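The clamping chain in the excerpt can be reproduced as a pure function. This is a simplified sketch: num_new_tokens_for_running is a hypothetical helper whose parameters mirror the excerpt's names, and it ignores the elided parts of schedule() (for example, KV slot allocation, which can still fail afterwards).

```python
def num_new_tokens_for_running(
    num_tokens_with_spec: int,
    num_output_placeholders: int,
    num_computed_tokens: int,
    token_budget: int,
    long_prefill_token_threshold: int,
    max_model_len: int,
) -> int:
    """Hypothetical helper replicating the RUNNING-request clamping steps."""
    num_new = num_tokens_with_spec + num_output_placeholders - num_computed_tokens
    # A threshold of 0 disables the long-prefill cap.
    if 0 < long_prefill_token_threshold < num_new:
        num_new = long_prefill_token_threshold
    num_new = min(num_new, token_budget)
    # Keep the last input position strictly below max_model_len.
    num_new = min(num_new, max_model_len - 1 - num_computed_tokens)
    return num_new


# The worked example: 6 tokens known, 5 computed -> decode exactly 1 token.
print(num_new_tokens_for_running(6, 0, 5, 8192, 0, 4096))  # 1
# A 1000-token request capped by a long-prefill threshold of 256.
print(num_new_tokens_for_running(1000, 0, 0, 8192, 256, 4096))  # 256
```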
Once a request is successfully scheduled, the scheduler has allocated its KV slots, and it records the block delta and the token count:
```python
def schedule(self) -> SchedulerOutput:
    ...
        # Schedule the request.
        scheduled_running_reqs.append(request)
        request_id = request.request_id
        req_to_new_blocks[request_id] = new_blocks
        num_scheduled_tokens[request_id] = num_new_tokens
        token_budget -= num_new_tokens
        req_index += 1
        ...
```
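5.2 Then schedule WAITING requests
When a WAITING request is about to enter the running queue for the first time, the scheduler first queries the prefix cache and then computes how many tokens actually need processing in this iteration.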
```python
def schedule(self) -> SchedulerOutput:
    ...
        # Get already-cached tokens.
        if request.num_computed_tokens == 0:
            # Get locally-cached tokens.
            new_computed_blocks, num_new_local_computed_tokens = (
                self.kv_cache_manager.get_computed_blocks(request)
            )
            ...
            # Total computed tokens (local + external).
            num_computed_tokens = (
                num_new_local_computed_tokens + num_external_computed_tokens
            )
            assert num_computed_tokens <= request.num_tokens
        else:
            # KVTransfer: WAITING reqs have num_computed_tokens > 0
            # after async KV recvs are completed.
            new_computed_blocks = self.kv_cache_manager.empty_kv_cache_blocks
            num_new_local_computed_tokens = 0
            num_computed_tokens = request.num_computed_tokens
        ...
```
In the basic case with no prefix cache hit, num_computed_tokens = 0, so this iteration's token count is the full prompt length, or one chunk of it under chunked prefill:
```python
def schedule(self) -> SchedulerOutput:
    ...
        # Number of tokens to be scheduled.
        # We use `request.num_tokens` instead of
        # `request.num_prompt_tokens` to consider the resumed
        # requests, which have output tokens.
        num_new_tokens = request.num_tokens - num_computed_tokens
        threshold = self.scheduler_config.long_prefill_token_threshold
        if 0 < threshold < num_new_tokens:
            num_new_tokens = threshold
        ...
        num_new_tokens = min(num_new_tokens, token_budget)
        assert num_new_tokens > 0
        ...
```
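A small sketch of how a prefix-cache hit shrinks the first prefill. toy_prefix_hit_tokens is a hypothetical stand-in for kv_cache_manager.get_computed_blocks(): it matches whole blocks only and caps the hit at len(prompt) - 1. That cap reflects my understanding of vLLM's behavior (the last prompt token has to be recomputed to produce logits) and should be checked against the source.

```python
def toy_prefix_hit_tokens(
    prompt: list[int], cached_prefixes: set[tuple[int, ...]], block_size: int
) -> int:
    """Hypothetical block-granular prefix-cache lookup (not vLLM's KVCacheManager).

    Counts leading full blocks whose entire prefix is cached, capped at
    len(prompt) - 1 so that at least the last prompt token is recomputed.
    """
    max_hit = len(prompt) - 1
    hit = 0
    while hit + block_size <= max_hit and tuple(prompt[: hit + block_size]) in cached_prefixes:
        hit += block_size
    return hit


def num_new_tokens_for_waiting(
    num_tokens: int, num_computed_tokens: int, token_budget: int, threshold: int
) -> int:
    # Same shape as the excerpt: remaining tokens, long-prefill cap, then budget.
    num_new = num_tokens - num_computed_tokens
    if 0 < threshold < num_new:
        num_new = threshold
    num_new = min(num_new, token_budget)
    assert num_new > 0
    return num_new


prompt = list(range(10))
cache = {tuple(range(4)), tuple(range(8))}  # two cached full blocks (block_size=4)
hit = toy_prefix_hit_tokens(prompt, cache, block_size=4)
print(hit, num_new_tokens_for_waiting(len(prompt), hit, token_budget=8192, threshold=0))  # 8 2
```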
Once scheduled, the request joins running and is classified as either new or resumed. An ordinary first-time text-only request goes into scheduled_new_reqs.
```python
def schedule(self) -> SchedulerOutput:
    ...
        self.running.append(request)
        if self.log_stats:
            request.record_event(
                EngineCoreEventType.SCHEDULED, scheduled_timestamp
            )
        if request.status == RequestStatus.WAITING:
            scheduled_new_reqs.append(request)
        elif request.status == RequestStatus.PREEMPTED:
            scheduled_resumed_reqs.append(request)
        else:
            raise RuntimeError(f"Invalid request status: {request.status}")

        if self.lora_config and request.lora_request:
            scheduled_loras.add(request.lora_request.lora_int_id)
        req_to_new_blocks[request_id] = self.kv_cache_manager.get_blocks(
            request_id
        )
        num_scheduled_tokens[request_id] = num_new_tokens
        token_budget -= num_new_tokens
        request.status = RequestStatus.RUNNING
        request.num_computed_tokens = num_computed_tokens
        ...
```
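Note that request.num_computed_tokens is set here to the prefix-cache hit length only; the num_new_tokens about to be computed in this iteration have not been added yet.
5.3 Where KV block IDs come from
KV block allocation is done by KVCacheManager.allocate_slots(). Both block_ids and new_block_ids in SchedulerOutput originate here.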
```python
def allocate_slots(
    self,
    request: Request,
    num_new_tokens: int,
    num_new_computed_tokens: int = 0,
    new_computed_blocks: KVCacheBlocks | None = None,
    num_lookahead_tokens: int = 0,
    num_external_computed_tokens: int = 0,
    delay_cache_blocks: bool = False,
    num_encoder_tokens: int = 0,
) -> KVCacheBlocks | None:
    ...
    if num_blocks_to_allocate > self.block_pool.get_num_free_blocks():
        # Cannot allocate new blocks
        return None
    ...
    new_blocks = self.coordinator.allocate_new_blocks(
        request.request_id,
        num_tokens_need_slot,
        num_tokens_main_model,
        num_encoder_tokens,
    )
    ...
    return self.create_kv_cache_blocks(new_blocks)
...
```
KVCacheBlocks is then converted into the block IDs that are actually sent to the worker:
```python
def get_block_ids(
    self,
    allow_none: bool = False,
) -> tuple[list[int], ...] | None:
    ...
    if allow_none and all(len(group) == 0 for group in self.blocks):
        return None
    return tuple([blk.block_id for blk in group] for group in self.blocks)
...
```
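A new request needs its complete block table on the worker, so it is sent all of the request's current blocks. A request that is already in the worker's persistent batch is usually sent only the newly allocated blocks, which the worker appends.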
5.4 Building SchedulerOutput
After scheduling, the scheduler converts the Request lists into transfer structures.
```python
def schedule(self) -> SchedulerOutput:
    ...
    new_reqs_data = [
        NewRequestData.from_request(
            req, req_to_new_blocks[req.request_id].get_block_ids()
        )
        for req in scheduled_new_reqs
    ]
    with record_function_or_nullcontext("schedule: make_cached_request_data"):
        cached_reqs_data = self._make_cached_request_data(
            scheduled_running_reqs,
            scheduled_resumed_reqs,
            num_scheduled_tokens,
            scheduled_spec_decode_tokens,
            req_to_new_blocks,
        )

    # Record the request ids that were scheduled in this step.
    self.prev_step_scheduled_req_ids.clear()
    self.prev_step_scheduled_req_ids.update(num_scheduled_tokens.keys())
    ...
```
The delta data for cached requests is produced in _make_cached_request_data():
```python
def _make_cached_request_data(
    self,
    running_reqs: list[Request],
    resumed_reqs: list[Request],
    num_scheduled_tokens: dict[str, int],
    spec_decode_tokens: dict[str, list[int]],
    req_to_new_blocks: dict[str, KVCacheBlocks],
) -> CachedRequestData:
    ...
    for idx, req in enumerate(itertools.chain(running_reqs, resumed_reqs)):
        req_id = req.request_id
        req_ids.append(req_id)
        ...
        scheduled_in_prev_step = req_id in self.prev_step_scheduled_req_ids
        if idx >= num_running_reqs:
            assert not scheduled_in_prev_step
            resumed_req_ids.add(req_id)
        if not scheduled_in_prev_step:
            all_token_ids[req_id] = req.all_token_ids.copy()
        new_block_ids.append(
            req_to_new_blocks[req_id].get_block_ids(allow_none=True)
        )
        num_computed_tokens.append(req.num_computed_tokens)
        num_output_tokens.append(
            req.num_output_tokens + req.num_output_placeholders
        )

    return CachedRequestData(
        req_ids=req_ids,
        resumed_req_ids=resumed_req_ids,
        new_token_ids=new_token_ids,
        all_token_ids=all_token_ids,
        new_block_ids=new_block_ids,
        num_computed_tokens=num_computed_tokens,
        num_output_tokens=num_output_tokens,
    )
...
```
Then the SchedulerOutput itself is created:
```python
def schedule(self) -> SchedulerOutput:
    ...
    scheduler_output = SchedulerOutput(
        scheduled_new_reqs=new_reqs_data,
        scheduled_cached_reqs=cached_reqs_data,
        num_scheduled_tokens=num_scheduled_tokens,
        total_num_scheduled_tokens=total_num_scheduled_tokens,
        scheduled_spec_decode_tokens=scheduled_spec_decode_tokens,
        scheduled_encoder_inputs=scheduled_encoder_inputs,
        num_common_prefix_blocks=num_common_prefix_blocks,
        preempted_req_ids={req.request_id for req in preempted_reqs},
        # finished_req_ids is an existing state in the scheduler,
        # instead of being newly scheduled in this step.
        # It contains the request IDs that are finished in between
        # the previous and the current steps.
        finished_req_ids=self.finished_req_ids,
        free_encoder_mm_hashes=self.encoder_cache_manager.get_freed_mm_hashes(),
        new_block_ids_to_zero=new_block_ids_to_zero,
    )
    ...
```
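There is a crucial timing detail here: the num_computed_tokens carried in SchedulerOutput is the position before this iteration's forward pass starts, yet before schedule() returns, the scheduler immediately advances its own internal Request.num_computed_tokens.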
```python
def schedule(self) -> SchedulerOutput:
    ...
    with record_function_or_nullcontext("schedule: update_after_schedule"):
        self._update_after_schedule(scheduler_output)
    return scheduler_output
...

def _update_after_schedule(self, scheduler_output: SchedulerOutput) -> None:
    ...
    num_scheduled_tokens = scheduler_output.num_scheduled_tokens
    for req_id, num_scheduled_token in num_scheduled_tokens.items():
        request = self.requests[req_id]
        request.num_computed_tokens += num_scheduled_token
        request.is_prefill_chunk = request.num_computed_tokens < (
            request.num_tokens + request.num_output_placeholders
        )
        ...
    # Clear the finished request IDs.
    # NOTE: We shouldn't do self.finished_req_ids.clear() here because
    # it will also affect the scheduler output.
    self.finished_req_ids = set()
    ...
```
This yields an important invariant:
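```text
num_computed_tokens in SchedulerOutput:
    the starting position the worker uses when preparing this iteration's input
Request.num_computed_tokens inside the scheduler:
    already advanced optimistically, before schedule() returns, to the position after this iteration's forward pass
```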
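Basic text-only decoding has no speculative-token rejection, so this optimistic advance normally never needs to be rolled back.
6. How the worker consumes SchedulerOutput
The worker/model runner first applies SchedulerOutput to its own cached state, then builds this iteration's GPU inputs from it.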
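6.1 Cleaning up finished requests
finished_req_ids is the set of requests that finished between the previous schedule() call and this one. The current schedule() carries it to the worker, which removes those requests from its cached state and from the persistent batch.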
```python
def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
    # Remove finished requests from the cached states.
    for req_id in scheduler_output.finished_req_ids:
        self.requests.pop(req_id, None)
        self.num_prompt_logprobs.pop(req_id, None)
    self.late_interaction_runner.on_requests_finished(
        scheduler_output.finished_req_ids
    )
    ...
    for req_id in scheduler_output.finished_req_ids:
        self.input_batch.remove_request(req_id)
    ...
```
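This explains why a SchedulerOutput can still be meaningful when total_num_scheduled_tokens == 0: it may be a cleanup-only output whose only job is to tell the worker to delete the cached state of finished requests.
6.2 Adding new requests
New requests are converted into worker-side CachedRequestState. This state then stays inside the worker, so the scheduler does not have to resend the full prompt every iteration.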
```python
def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
    # Add new requests to the cached states.
    for new_req_data in scheduler_output.scheduled_new_reqs:
        req_id = new_req_data.req_id
        ...
        req_state = CachedRequestState(
            req_id=req_id,
            prompt_token_ids=new_req_data.prompt_token_ids,
            prompt_embeds=new_req_data.prompt_embeds,
            mm_features=new_req_data.mm_features,
            sampling_params=sampling_params,
            pooling_params=pooling_params,
            generator=generator,
            block_ids=new_req_data.block_ids,
            num_computed_tokens=new_req_data.num_computed_tokens,
            output_token_ids=[],
            lora_request=new_req_data.lora_request,
        )
        self.requests[req_id] = req_state
        ...
        reqs_to_add.append(req_state)
    ...
```
The basic fields of CachedRequestState are:
```python
@dataclass
class CachedRequestState:
    req_id: str
    prompt_token_ids: list[int] | None
    mm_features: list[MultiModalFeatureSpec]
    sampling_params: SamplingParams | None
    generator: torch.Generator | None

    block_ids: tuple[list[int], ...]
    num_computed_tokens: int
    output_token_ids: list[int]
    ...
```
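6.3 Updating cached requests
Previously seen requests do not resend the full prompt; the worker only updates the starting position, the block table, and whatever token-alignment information is needed.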
```python
def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
    req_data = scheduler_output.scheduled_cached_reqs
    scheduled_spec_tokens = scheduler_output.scheduled_spec_decode_tokens
    ...
    for i, req_id in enumerate(req_data.req_ids):
        req_state = self.requests[req_id]
        num_computed_tokens = req_data.num_computed_tokens[i]
        new_block_ids = req_data.new_block_ids[i]
        resumed_from_preemption = req_id in req_data.resumed_req_ids
        num_output_tokens = req_data.num_output_tokens[i]
        req_index = self.input_batch.req_id_to_index.get(req_id)
        ...
        # Update the cached states.
        req_state.num_computed_tokens = num_computed_tokens
        ...
        if not resumed_from_preemption:
            if new_block_ids is not None:
                # Append the new blocks to the existing block IDs.
                for block_ids, new_ids in zip(req_state.block_ids, new_block_ids):
                    block_ids.extend(new_ids)
        else:
            assert req_index is None
            assert new_block_ids is not None
            # The request is resumed from preemption.
            # Replace the existing block IDs with the new ones.
            req_state.block_ids = new_block_ids
        ...
```
If the request is already in the persistent batch, the worker also syncs num_computed_tokens and the block table inside the batch:
```python
def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
        # Update the persistent batch.
        self.input_batch.num_computed_tokens_cpu[req_index] = num_computed_tokens
        if new_block_ids is not None:
            self.input_batch.block_table.append_row(new_block_ids, req_index)
        ...
```
Finally, new requests, and requests that had been removed from the persistent batch and are now scheduled again, are added to InputBatch:
```python
def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
    # Add the new or resumed requests to the persistent batch.
    # The smaller empty indices are filled first.
    for request in reqs_to_add:
        self.input_batch.add_request(request)
        self.input_batch.update_req_spec_token_ids(request, scheduled_spec_tokens)

    # Condense the batched states if there are gaps left by removed requests
    self.input_batch.condense()
    # Allow attention backend to reorder the batch, potentially
    self._may_reorder_batch(scheduler_output)
    # Refresh batch metadata with any pending updates.
    self.input_batch.refresh_metadata()
    ...
```
InputBatch.add_request() copies the prompt tokens, any existing output tokens, the starting computed position, and the block table into the worker's batch cache:
```python
def add_request(
    self,
    request: "CachedRequestState",
) -> int:
    ...
    # Copy the prompt token ids and output token ids.
    num_prompt_tokens = length_from_prompt_token_ids_or_embeds(
        request.prompt_token_ids, request.prompt_embeds
    )
    self.num_prompt_tokens[req_index] = num_prompt_tokens
    start_idx = num_prompt_tokens
    end_idx = start_idx + len(request.output_token_ids)
    if request.prompt_token_ids is not None:
        self.token_ids_cpu[req_index, :num_prompt_tokens] = request.prompt_token_ids
        self.is_token_ids[req_index, :num_prompt_tokens] = True
    else:
        self.is_token_ids[req_index, :num_prompt_tokens] = False
        ...
    self.token_ids_cpu[req_index, start_idx:end_idx] = request.output_token_ids
    self.is_token_ids[req_index, start_idx:end_idx] = True
    # Number of tokens without spec decode tokens.
    self.num_tokens_no_spec[req_index] = request.num_tokens

    self.num_computed_tokens_cpu[req_index] = request.num_computed_tokens
    self.block_table.add_row(request.block_ids, req_index)
    ...
```
6.4 Preparing this iteration's input_ids and positions
SchedulerOutput.num_scheduled_tokens determines how many tokens each request takes this iteration; CachedRequestState/InputBatch.num_computed_tokens determines the position it starts from.
```python
def _prepare_inputs(
    self,
    scheduler_output: "SchedulerOutput",
    num_scheduled_tokens: np.ndarray,
) -> tuple[
    torch.Tensor,
    SpecDecodeMetadata | None,
]:
    ...
    total_num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens
    assert total_num_scheduled_tokens > 0
    num_reqs = self.input_batch.num_reqs
    assert num_reqs > 0
    ...
    # Get request indices.
    # E.g., [2, 5, 3] -> [0, 0, 1, 1, 1, 1, 1, 2, 2, 2]
    req_indices = np.repeat(self.arange_np[:num_reqs], num_scheduled_tokens)

    # cu_num_tokens: [2, 5, 3] -> [2, 7, 10]
    # self.query_pos.np[:10]: [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
    cu_num_tokens = self._get_cumsum_and_arange(
        num_scheduled_tokens, self.query_pos.np
    )

    # Get positions.
    positions_np = (
        self.input_batch.num_computed_tokens_cpu[req_indices]
        + self.query_pos.np[: cu_num_tokens[-1]]
    )
    ...
```
These positions are then used to gather the tokens actually fed to the model from the worker-side token cache:
```python
def _prepare_inputs(
    self,
    scheduler_output: "SchedulerOutput",
    num_scheduled_tokens: np.ndarray,
) -> tuple[
    torch.Tensor,
    SpecDecodeMetadata | None,
]:
    ...
    token_indices = (
        positions_np + req_indices * self.input_batch.token_ids_cpu.shape[1]
    )
    token_indices_tensor = torch.from_numpy(token_indices)

    # NOTE(woosuk): We use torch.index_select instead of np.take here
    # because torch.index_select is much faster than np.take for large
    # tensors.
    torch.index_select(
        self.input_batch.token_ids_cpu_tensor.flatten(),
        0,
        token_indices_tensor,
        out=self.input_ids.cpu[:total_num_scheduled_tokens],
    )
    ...
```
This code ties several SchedulerOutput fields together:
```text
num_scheduled_tokens[req]  -> how many tokens to take this iteration
num_computed_tokens[req]   -> which token position to start from
block_ids/new_block_ids    -> which blocks these tokens' KV is written to
```
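The index arithmetic in _prepare_inputs() can be checked in isolation. This NumPy-only sketch (gather_inputs is a hypothetical helper) recomputes req_indices, the per-request query offsets, the absolute positions, and the flattened token indices with np.repeat/np.cumsum, instead of vLLM's preallocated buffers and torch.index_select. The toy token cache stores 1000 * row + position, so each gathered id reveals which request and position it came from.

```python
import numpy as np


def gather_inputs(
    num_scheduled_tokens: np.ndarray,
    num_computed_tokens: np.ndarray,
    token_ids_cpu: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
    """Hypothetical NumPy-only version of the index math in _prepare_inputs()."""
    num_reqs = len(num_scheduled_tokens)
    # [2, 5, 3] -> [0, 0, 1, 1, 1, 1, 1, 2, 2, 2]
    req_indices = np.repeat(np.arange(num_reqs), num_scheduled_tokens)
    # [2, 5, 3] -> [2, 7, 10]
    cu_num_tokens = np.cumsum(num_scheduled_tokens)
    # Offset of each token inside its request: [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
    starts = np.repeat(cu_num_tokens - num_scheduled_tokens, num_scheduled_tokens)
    query_pos = np.arange(cu_num_tokens[-1]) - starts
    # Absolute position = where the request left off + offset in this step.
    positions = num_computed_tokens[req_indices] + query_pos
    # Flattened index into the (num_reqs, max_model_len) token cache.
    token_indices = positions + req_indices * token_ids_cpu.shape[1]
    input_ids = token_ids_cpu.reshape(-1)[token_indices]
    return positions, input_ids


max_model_len = 16
# Toy token cache: the token at (row r, position p) is 1000 * r + p.
token_ids_cpu = np.arange(max_model_len) + 1000 * np.arange(3)[:, None]
positions, input_ids = gather_inputs(
    np.array([2, 5, 3]), np.array([0, 10, 4]), token_ids_cpu
)
print(positions.tolist())  # [0, 1, 10, 11, 12, 13, 14, 4, 5, 6]
print(input_ids.tolist())  # [0, 1, 1010, 1011, 1012, 1013, 1014, 2004, 2005, 2006]
```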
7. How model output is written back
After sampling, the worker builds a ModelRunnerOutput. On the basic text-only path the relevant fields are req_ids, req_id_to_index, and sampled_token_ids.
```python
@dataclass
class ModelRunnerOutput:
    # [num_reqs]
    req_ids: list[str]
    # req_id -> index
    req_id_to_index: dict[str, int]

    # num_reqs x num_generated_tokens
    # num_generated_tokens is the number of tokens
    # generated in the current step. It can be different for
    # each request due to speculative/jump decoding.
    sampled_token_ids: list[list[int]] = field(default_factory=list)
    ...
```
Sampled tokens are first cached on the worker side. That way, when the next decode step prepares input_ids, the scheduler does not need to send the freshly sampled token back to the worker.
```python
def _bookkeeping_sync(
    self,
    scheduler_output: "SchedulerOutput",
    sampler_output: SamplerOutput,
    logits: torch.Tensor | None,
    hidden_states: torch.Tensor,
    num_scheduled_tokens: int,
    spec_decode_metadata: SpecDecodeMetadata | None,
) -> tuple[
    dict[str, int],
    LogprobsLists | None,
    list[list[int]],
    dict[str, LogprobsTensors | None],
    list[str],
    dict[str, int],
    list[int],
]:
    ...
    # Cache the sampled tokens in the model runner, so that the scheduler
    # doesn't need to send them back.
    # NOTE(woosuk): As an exception, when using PP, the scheduler sends
    # the sampled tokens back, because there's no direct communication
    # between the first-stage worker and the last-stage worker.
    req_ids = self.input_batch.req_ids
    for req_idx in range(num_sampled_tokens):
        ...
        self.input_batch.token_ids_cpu[req_idx, start_idx:end_idx] = sampled_ids
        self.input_batch.is_token_ids[req_idx, start_idx:end_idx] = True
        self.input_batch.num_tokens_no_spec[req_idx] = end_idx

        req_id = req_ids[req_idx]
        req_state = self.requests[req_id]
        req_state.output_token_ids.extend(sampled_ids)
    ...
```
The scheduler then uses the same SchedulerOutput as context to interpret the ModelRunnerOutput:
```python
def update_from_output(
    self,
    scheduler_output: SchedulerOutput,
    model_runner_output: ModelRunnerOutput,
) -> dict[int, EngineCoreOutputs]:
    ...
    sampled_token_ids = model_runner_output.sampled_token_ids
    ...
    num_scheduled_tokens = scheduler_output.num_scheduled_tokens
    ...
    for req_id, num_tokens_scheduled in num_scheduled_tokens.items():
        assert num_tokens_scheduled > 0
        ...
        request = self.requests.get(req_id)
        if request is None or request.is_finished():
            ...
            continue

        req_index = model_runner_output.req_id_to_index[req_id]
        generated_token_ids = (
            sampled_token_ids[req_index] if sampled_token_ids else []
        )
        ...
```
If this iteration produced new tokens, the scheduler appends them to the Request and checks whether it should stop:
```python
def update_from_output(
    self,
    scheduler_output: SchedulerOutput,
    model_runner_output: ModelRunnerOutput,
) -> dict[int, EngineCoreOutputs]:
    ...
        # Check for stop and update request status.
        if new_token_ids:
            new_token_ids, stopped = self._update_request_with_output(
                request, new_token_ids
            )
        ...

def _update_request_with_output(
    self, request: Request, new_token_ids: list[int]
) -> tuple[list[int], bool]:
    # Append generated tokens and check for stop. Note that if
    # a request is still being prefilled, we expect the model runner
    # to return empty token ids for the request.
    stopped = False
    for num_new, output_token_id in enumerate(new_token_ids, 1):
        request.append_output_token_ids(output_token_id)

        # Check for stop and update request state.
        # This must be called before we make the EngineCoreOutput.
        stopped = check_stop(request, self.max_model_len)
        if stopped:
            del new_token_ids[num_new:]  # Trim new tokens if needed.
            break
    return new_token_ids, stopped
```
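The "append one token, check, trim the rest" loop matters once a step can return several tokens. Below is a hypothetical simplification: toy_update_with_output stops only on an EOS id or max_tokens, whereas vLLM's check_stop() covers more cases (stop token ids, max_model_len, and others).

```python
def toy_update_with_output(
    output_token_ids: list[int],
    new_token_ids: list[int],
    eos_token_id: int,
    max_tokens: int,
) -> tuple[list[int], bool]:
    """Hypothetical simplification of _update_request_with_output()."""
    stopped = False
    for num_new, token_id in enumerate(new_token_ids, 1):
        output_token_ids.append(token_id)
        # Simplified stop check: EOS or length limit only.
        stopped = token_id == eos_token_id or len(output_token_ids) >= max_tokens
        if stopped:
            del new_token_ids[num_new:]  # drop tokens after the stop point
            break
    return new_token_ids, stopped


out: list[int] = []
print(toy_update_with_output(out, [5, 2, 7], eos_token_id=2, max_tokens=100))  # ([5, 2], True)
print(out)  # [5, 2]
```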
When a request finishes, the scheduler records it in finished_req_ids, frees its scheduler-side KV blocks, and in the next schedule() call notifies the worker to clean up the worker-side state.
```python
def _free_request(
    self, request: Request, delay_free_blocks: bool = False
) -> dict[str, Any] | None:
    assert request.is_finished()

    connector_delay_free_blocks, kv_xfer_params = self._connector_finished(request)
    self.encoder_cache_manager.free(request)
    request_id = request.request_id
    self.finished_req_ids.add(request_id)
    if self.finished_req_ids_dict is not None:
        self.finished_req_ids_dict[request.client_index].add(request_id)

    delay_free_blocks |= connector_delay_free_blocks
    if not delay_free_blocks:
        self._free_blocks(request)

    return kv_xfer_params
...
```
8. Single-request text-only timeline
Assume a request r1 with a prompt of length 5, no prefix cache hit, and one token decoded per iteration.
```text
Initial:
  Request._all_token_ids      = [p0 p1 p2 p3 p4]
  Request._output_token_ids   = []
  Request.num_computed_tokens = 0
  status = WAITING

Step 0: prefill
  schedule:
    num_computed_tokens snapshot = 0
    num_scheduled_tokens[r1] = 5
    scheduled_new_reqs = [NewRequestData(r1, prompt, block_ids, 0)]
    scheduler-internal num_computed_tokens += 5 -> 5
  worker:
    add CachedRequestState(r1)
    input positions = [0,1,2,3,4]
    input_ids = [p0,p1,p2,p3,p4]
    forward, then sample t0
    append t0 to the worker token cache
  update_from_output:
    Request.append_output_token_ids(t0)
    _all_token_ids    = [p0 p1 p2 p3 p4 t0]
    _output_token_ids = [t0]

Step 1: decode t0
  schedule:
    num_tokens = 6
    num_computed_tokens = 5
    num_scheduled_tokens[r1] = 1
    scheduled_cached_reqs.req_ids = [r1]
    scheduled_cached_reqs.num_computed_tokens = [5]
    scheduler-internal num_computed_tokens += 1 -> 6
  worker:
    input position = [5]
    input_ids = [t0]
    forward, then sample t1
    append t1 to the worker token cache
  update_from_output:
    Request.append_output_token_ids(t1)

Step 2:
  Same pattern: input t1, sample t2.
```
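This timeline shows what num_computed_tokens means: it points to the next token position to be computed as input. A sampled token first enters the token history; it is scheduled, and its KV actually computed, in the following iteration.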
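The timeline can be replayed with a toy scheduler/worker pair. Everything here is hypothetical: plans are plain dicts rather than SchedulerOutput, there is no KV cache, no token budget, and the "model" simply returns the last input token plus one. What the sketch does preserve is the split described above: the scheduler sends full data once and then only the starting position, advances num_computed_tokens before the forward pass, and the worker caches sampled tokens itself.

```python
class ToyScheduler:
    """Hypothetical scheduler tracking only all_token_ids and num_computed_tokens."""

    def __init__(self) -> None:
        self.all_token_ids: dict[str, list[int]] = {}
        self.num_computed_tokens: dict[str, int] = {}
        self.sent_to_worker: set[str] = set()

    def add_request(self, req_id: str, prompt: list[int]) -> None:
        self.all_token_ids[req_id] = list(prompt)
        self.num_computed_tokens[req_id] = 0

    def schedule(self) -> dict:
        new_reqs, cached_reqs, num_scheduled = [], [], {}
        for req_id, tokens in self.all_token_ids.items():
            num_new = len(tokens) - self.num_computed_tokens[req_id]
            if num_new <= 0:
                continue
            num_scheduled[req_id] = num_new
            start = self.num_computed_tokens[req_id]  # snapshot sent to the worker
            if req_id in self.sent_to_worker:
                cached_reqs.append((req_id, start))  # delta only
            else:
                new_reqs.append((req_id, list(tokens), start))  # full data
                self.sent_to_worker.add(req_id)
        # Like _update_after_schedule(): advance optimistically before the forward.
        for req_id, num_new in num_scheduled.items():
            self.num_computed_tokens[req_id] += num_new
        return {"new": new_reqs, "cached": cached_reqs, "num_scheduled_tokens": num_scheduled}

    def update_from_output(self, sampled: dict[str, int]) -> None:
        for req_id, token_id in sampled.items():
            self.all_token_ids[req_id].append(token_id)


class ToyWorker:
    """Hypothetical worker with its own token cache; the 'model' is fake."""

    def __init__(self) -> None:
        self.token_cache: dict[str, list[int]] = {}
        self.num_computed_tokens: dict[str, int] = {}

    def execute(self, plan: dict) -> tuple[dict[str, int], dict]:
        for req_id, prompt, start in plan["new"]:
            self.token_cache[req_id] = list(prompt)
            self.num_computed_tokens[req_id] = start
        for req_id, start in plan["cached"]:
            self.num_computed_tokens[req_id] = start
        sampled, trace = {}, {}
        for req_id, num_new in plan["num_scheduled_tokens"].items():
            start = self.num_computed_tokens[req_id]
            positions = list(range(start, start + num_new))
            input_ids = [self.token_cache[req_id][p] for p in positions]
            trace[req_id] = (positions, input_ids)
            token_id = input_ids[-1] + 1  # fake model: next token = last input + 1
            self.token_cache[req_id].append(token_id)  # cached, so nobody resends it
            sampled[req_id] = token_id
        return sampled, trace


scheduler, worker = ToyScheduler(), ToyWorker()
scheduler.add_request("r1", [10, 11, 12, 13, 14])
traces = []
for step in range(3):
    plan = scheduler.schedule()
    sampled, trace = worker.execute(plan)
    scheduler.update_from_output(sampled)
    traces.append(trace["r1"])
    print(step, trace["r1"], sampled["r1"])
# 0 ([0, 1, 2, 3, 4], [10, 11, 12, 13, 14]) 15
# 1 ([5], [15]) 16
# 2 ([6], [16]) 17
```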
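9. Quick reference: field source, purpose, and update timing
| Field | Source | Use in worker/model runner | When updated |
|---|---|---|---|
| scheduled_new_reqs | Converted from WAITING requests in schedule() | Create the worker-side CachedRequestState; initialize prompt, sampling params, block table | Rebuilt on every schedule; a request appears only on its first scheduling |
| scheduled_cached_reqs.req_ids | RUNNING/resumed requests scheduled in this iteration | Locate requests the worker has already cached | Rebuilt on every schedule |
| scheduled_cached_reqs.new_block_ids | KVCacheManager.allocate_slots() / get_blocks() | Append to or replace the worker block table | Produced when KV slots are allocated during scheduling |
| scheduled_cached_reqs.num_computed_tokens | Snapshot of the scheduler-side Request.num_computed_tokens at the start of this iteration | Set the starting input position | Captured before SchedulerOutput is built; the scheduler then advances its internal value |
| scheduled_cached_reqs.num_output_tokens | Scheduler-side output token count (plus async placeholders) | Align the worker-side output token cache | Captured when the cached request data is built |
| num_scheduled_tokens | Computed by the scheduler from num_tokens - num_computed_tokens and the token budget | Decide each request's forward token count this iteration | Generated within the current schedule() call |
| total_num_scheduled_tokens | sum(num_scheduled_tokens.values()) | Decide whether to run forward; size the total input buffer | Generated within the current schedule() call |
| finished_req_ids | Written into the scheduler by _free_request() or external abort/finish flows | Worker deletes the cached state and persistent batch entries | Sent with the output of the next schedule(), after which the scheduler resets it to an empty set |
| new_block_ids_to_zero | Blocks newly allocated from the KV block pool this iteration | Worker zeroes the corresponding GPU KV memory before use | Non-empty only when KV cache zeroing is required |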
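10. Summary of the mechanism
SchedulerOutput solves the problem of dividing state between the scheduler and the model runner:
- The scheduler owns queues, budgets, KV block allocation, request completion, and stop conditions.
- The worker caches request state as an efficient GPU input batch, including the token cache, block table, sampling parameters, and persistent batch.
SchedulerOutput is the minimal per-iteration plan that keeps the two in sync: full data for new requests, deltas for existing ones, plus how many tokens each request runs this iteration.
The easiest thing to confuse is the relationship between num_computed_tokens and generated tokens. A generated token is sampled in the current iteration, but its KV has not been computed yet; in the next iteration the scheduler sees len(prompt + output) grow and schedules the new token as input. This rhythm of "sampled tokens enter the history first and are computed in the next iteration" is the core data flow of text-only autoregressive decoding as expressed in SchedulerOutput.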
