vLLM框架代码走读04(KV Cache Slot Mapping 机制)
本文梳理了 vLLM V1 中 KV Cache 的 slot mapping 全流程:调度器如何拿到物理 KV block,worker 如何把 block table 转成 slot mapping,attention backend 又如何用它把当前 token 的 K/V 写入 paged KV cache,并在后续 attention 中读回历史上下文。
1. 三个核心坐标
KV cache 的寻址可以拆成三层坐标:
逻辑 token 位置 position
|
| position // block_size
v
请求内第几个 KV block
|
| block_table[request_row, block_index]
v
物理 block_id
|
| block_id * block_size + position % block_size
v
物理 slot_id
slot_mapping 就是“本轮 batch 中每个输入 token 要写入哪个物理 KV slot”的一维张量。它的典型公式是:
slot_id = physical_block_id * block_size + block_offset
其中 block_table 是按请求维护的二维表,记录“请求的第 N 个逻辑 block 对应哪个物理 block_id”;slot_mapping 是按本轮扁平化 token 序列维护的一维表,记录“第 i 个 input token 写到哪个物理 slot_id”。
2. 全链路总览
Request
| prompt/output token ids
| block_hashes
v
Scheduler
| get_computed_blocks()
| allocate_slots()
v
KVCacheManager / KVCacheCoordinator / SingleTypeKVCacheManager
| req_id -> [KVCacheBlock(block_id=...)]
v
SchedulerOutput
| NewRequestData.block_ids
| CachedRequestData.new_block_ids
v
GpuModelRunner / InputBatch
| MultiGroupBlockTable.add_row()/append_row()
| commit_block_table()
v
BlockTable on GPU
| compute_slot_mapping(query_start_loc, positions)
v
slot_mapping
| CommonAttentionMetadata.slot_mapping
| ForwardContext.slot_mapping[layer_name]
v
Attention layer
| unified_kv_cache_update()
| reshape_and_cache(_flash)
v
Paged KV cache memory
一次模型执行中,block_table 和 slot_mapping 分工明确:
block_table给 attention kernel 读历史 KV,用于从 paged KV cache 中按 sequence 找页。slot_mapping给 KV cache update kernel 写当前这批 token 的 K/V,用于把新算出的 key/value 放进正确 page offset。- 在 V1 中,多个 KV cache group 可以有不同的 block table 和 slot mapping,最后按 layer name 分发给各层。
3. 调度器先分配 block,不直接生成 slot
slot mapping 不是调度器直接下发的。调度器只负责决定每个请求需要哪些 KV cache block,并把这些 block_id 带到 worker。
SchedulerOutput 中的新请求和老请求都只携带 block ids:
@dataclass
class NewRequestData:
...
block_ids: tuple[list[int], ...]
num_computed_tokens: int
...
@dataclass
class CachedRequestData:
...
new_block_ids: list[tuple[list[int], ...] | None]
num_computed_tokens: list[int]
...
这里的 tuple[list[int], ...] 外层按 KV cache group 划分,内层是该 group 下请求持有的物理 block id 列表。
等待队列中新请求第一次进入调度时,会先查 prefix cache,再为“cache miss 后需要计算的 token”分配新 block:
def schedule(self) -> SchedulerOutput:
...
if request.num_computed_tokens == 0:
# Get locally-cached tokens.
new_computed_blocks, num_new_local_computed_tokens = (
self.kv_cache_manager.get_computed_blocks(request)
)
...
num_computed_tokens = (
num_new_local_computed_tokens + num_external_computed_tokens
)
assert num_computed_tokens <= request.num_tokens
...
new_blocks = self.kv_cache_manager.allocate_slots(
request,
num_new_tokens,
num_new_computed_tokens=num_new_local_computed_tokens,
new_computed_blocks=new_computed_blocks,
num_lookahead_tokens=effective_lookahead_tokens,
num_external_computed_tokens=num_external_computed_tokens,
delay_cache_blocks=load_kv_async,
num_encoder_tokens=num_encoder_tokens,
)
...
req_to_new_blocks[request_id] = self.kv_cache_manager.get_blocks(
request_id
)
num_scheduled_tokens[request_id] = num_new_tokens
...
已经在 running 队列中的请求继续 decode 或 chunked prefill 时,也只追加新 block:
def schedule(self) -> SchedulerOutput:
...
while True:
new_blocks = self.kv_cache_manager.allocate_slots(
request,
num_new_tokens,
num_lookahead_tokens=self.num_lookahead_tokens,
)
if new_blocks is not None:
# The request can be scheduled.
break
...
req_to_new_blocks[request_id] = new_blocks
num_scheduled_tokens[request_id] = num_new_tokens
...
调度结束时,SchedulerOutput 被构造出来,worker 只会看到 block id,而不会看到 slot id:
def schedule(self) -> SchedulerOutput:
...
new_reqs_data = [
NewRequestData.from_request(
req, req_to_new_blocks[req.request_id].get_block_ids()
)
for req in scheduled_new_reqs
]
...
cached_reqs_data = self._make_cached_request_data(
scheduled_running_reqs,
scheduled_resumed_reqs,
num_scheduled_tokens,
scheduled_spec_decode_tokens,
req_to_new_blocks,
)
...
scheduler_output = SchedulerOutput(
scheduled_new_reqs=new_reqs_data,
scheduled_cached_reqs=cached_reqs_data,
num_scheduled_tokens=num_scheduled_tokens,
total_num_scheduled_tokens=total_num_scheduled_tokens,
...
)
...
4. KVCacheManager:prefix hit、分配、缓存提交
KVCacheManager 是调度器与底层 block pool 之间的门面。它隐藏了 full attention、sliding window、chunked local attention、mamba、cross attention 等不同 KV cache type 的差异。
4.1 prefix cache 命中只命中完整 block
prefix cache 查询发生在请求还没有本地 computed token 时。命中长度最多到 request.num_tokens - 1,因为即使命中整个 prompt,最后一个 token 也要重算以产生 logits。
def get_computed_blocks(self, request: Request) -> tuple[KVCacheBlocks, int]:
...
if not self.enable_caching or request.skip_reading_prefix_cache:
return self.empty_kv_cache_blocks, 0
# NOTE: When all tokens hit the cache, we must recompute the last token
# to obtain logits. Thus, set max_cache_hit_length to prompt_length - 1.
max_cache_hit_length = request.num_tokens - 1
computed_blocks, num_new_computed_tokens = (
self.coordinator.find_longest_cache_hit(
request.block_hashes, max_cache_hit_length
)
)
...
return self.create_kv_cache_blocks(computed_blocks), num_new_computed_tokens
...
prefix cache 的索引来自 Request.block_hashes。请求创建和追加输出 token 时都会更新 full block 的 hash:
class Request:
...
def append_output_token_ids(
self,
token_ids: int | list[int],
) -> None:
if isinstance(token_ids, int):
self._output_token_ids.append(token_ids)
self._all_token_ids.append(token_ids)
else:
self._output_token_ids.extend(token_ids)
self._all_token_ids.extend(token_ids)
self.update_block_hashes()
def update_block_hashes(self) -> None:
"""Compute block hashes for any new full blocks and append them."""
if self._block_hasher is not None:
self.block_hashes.extend(self._block_hasher(self))
...
4.2 allocate_slots 的三个动作
allocate_slots() 做三件事:
- 计算本轮之后需要有 slot 的 token 数,包括新 token 和 speculative lookahead token。
- 对 sliding window / local attention 等会跳过左侧 token 的场景,释放不再需要的旧 block。
- 分配新 block,并把已经完整的 block 写入 prefix cache 索引。
def allocate_slots(
self,
request: Request,
num_new_tokens: int,
num_new_computed_tokens: int = 0,
new_computed_blocks: KVCacheBlocks | None = None,
num_lookahead_tokens: int = 0,
num_external_computed_tokens: int = 0,
delay_cache_blocks: bool = False,
num_encoder_tokens: int = 0,
) -> KVCacheBlocks | None:
...
num_local_computed_tokens = (
request.num_computed_tokens + num_new_computed_tokens
)
total_computed_tokens = min(
num_local_computed_tokens + num_external_computed_tokens,
self.max_model_len,
)
num_tokens_main_model = total_computed_tokens + num_new_tokens
num_tokens_need_slot = min(
num_tokens_main_model + num_lookahead_tokens,
self.max_model_len,
)
self.coordinator.remove_skipped_blocks(
request.request_id, total_computed_tokens
)
num_blocks_to_allocate = self.coordinator.get_num_blocks_to_allocate(
request_id=request.request_id,
num_tokens=num_tokens_need_slot,
new_computed_blocks=new_computed_block_list,
num_encoder_tokens=num_encoder_tokens,
total_computed_tokens=num_local_computed_tokens
+ num_external_computed_tokens,
num_tokens_main_model=num_tokens_main_model,
)
if num_blocks_to_allocate > self.block_pool.get_num_free_blocks():
return None
if (
new_computed_block_list is not self.empty_kv_cache_blocks.blocks
or num_external_computed_tokens > 0
):
self.coordinator.allocate_new_computed_blocks(
request_id=request.request_id,
new_computed_blocks=new_computed_block_list,
num_local_computed_tokens=num_local_computed_tokens,
num_external_computed_tokens=num_external_computed_tokens,
)
new_blocks = self.coordinator.allocate_new_blocks(
request.request_id,
num_tokens_need_slot,
num_tokens_main_model,
num_encoder_tokens,
)
...
num_tokens_to_cache = min(
total_computed_tokens + num_new_tokens,
request.num_tokens,
)
self.coordinator.cache_blocks(request, num_tokens_to_cache)
return self.create_kv_cache_blocks(new_blocks)
...
这段代码里的 num_tokens_need_slot 是 slot mapping 后续能成立的前提:只要某个 position 会被本轮输入、后续 draft、或外部 KV load 使用,它对应的 block 就必须已经存在于 req_to_blocks 和 worker 的 block table 中。
4.3 SingleTypeKVCacheManager 维护 req_id 到 blocks 的映射
真正的“请求持有哪些物理 block”由 single-type manager 维护:
class SingleTypeKVCacheManager(ABC):
...
def allocate_new_blocks(
self, request_id: str, num_tokens: int, num_tokens_main_model: int
) -> list[KVCacheBlock]:
...
req_blocks = self.req_to_blocks[request_id]
num_required_blocks = cdiv(num_tokens, self.block_size)
num_new_blocks = num_required_blocks - len(req_blocks)
if num_new_blocks <= 0:
return []
else:
new_blocks = self.block_pool.get_new_blocks(num_new_blocks)
req_blocks.extend(new_blocks)
if type(self.kv_cache_spec) is FullAttentionSpec:
self.new_block_ids.extend(b.block_id for b in new_blocks)
return new_blocks
...
prefix cache hit 的 block 会被 touch() 增加引用计数,防止被当作可驱逐 block 重新分配:
def allocate_new_computed_blocks(
self,
request_id: str,
new_computed_blocks: Sequence[KVCacheBlock],
num_local_computed_tokens: int,
num_external_computed_tokens: int,
) -> None:
...
if self.enable_caching:
self.block_pool.touch(new_computed_blocks)
...
req_blocks.extend([self._null_block] * num_skipped_blocks)
req_blocks.extend(new_computed_blocks)
self.num_cached_block[request_id] = len(req_blocks)
...
被完整计算过的 block 会被写入 prefix cache 的 hash 表。注意这里缓存的是 block 元数据映射,不是重新拷贝 KV tensor:
def cache_full_blocks(
self,
request: Request,
blocks: list[KVCacheBlock],
num_cached_blocks: int,
num_full_blocks: int,
block_size: int,
kv_cache_group_id: int,
) -> None:
...
new_full_blocks = blocks[num_cached_blocks:num_full_blocks]
...
for i, blk in enumerate(new_full_blocks):
if blk.is_null:
continue
assert blk.block_hash is None
block_hash = new_block_hashes[i]
block_hash_with_group_id = make_block_hash_with_group_id(
block_hash, kv_cache_group_id
)
blk.block_hash = block_hash_with_group_id
self.cached_block_hash_to_block.insert(block_hash_with_group_id, blk)
...
KVCacheBlock.block_id 是后续 worker 计算 slot id 的基础,而 block_hash 只服务 prefix cache 查找与复用。
5. Worker 接收 block_ids,维护 block table
worker 侧有一个持久的 InputBatch。每个活跃请求占一行,行号是 worker 内部的 req_index。调度器下发的 block ids 被写入这一行。
新请求进入 batch 时直接 add_row():
def add_request(
self,
request: "CachedRequestState",
) -> int:
...
self.num_computed_tokens_cpu[req_index] = request.num_computed_tokens
self.block_table.add_row(request.block_ids, req_index)
...
return req_index
...
已存在请求收到新增 block 时 append_row():
def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
...
if not resumed_from_preemption:
if new_block_ids is not None:
# Append the new blocks to the existing block IDs.
for block_ids, new_ids in zip(req_state.block_ids, new_block_ids):
block_ids.extend(new_ids)
...
self.input_batch.num_computed_tokens_cpu[req_index] = num_computed_tokens
if new_block_ids is not None:
self.input_batch.block_table.append_row(new_block_ids, req_index)
...
BlockTable 的 CPU buffer 是 staging 区,真正模型执行前会拷到 GPU:
class BlockTable:
...
def append_row(
self,
block_ids: list[int],
row_idx: int,
) -> None:
if not block_ids:
return
...
num_blocks = len(block_ids)
start = self.num_blocks_per_row[row_idx]
self.num_blocks_per_row[row_idx] += num_blocks
self.block_table.np[row_idx, start : start + num_blocks] = block_ids
def add_row(self, block_ids: list[int], row_idx: int) -> None:
self.num_blocks_per_row[row_idx] = 0
self.append_row(block_ids, row_idx)
...
def commit_block_table(self, num_reqs: int) -> None:
self.block_table.copy_to_gpu(num_reqs)
...
多个 KV cache group 时,MultiGroupBlockTable 对每个 group 各维护一张 BlockTable:
class MultiGroupBlockTable:
"""The BlockTables for each KV cache group."""
...
def append_row(self, block_ids: tuple[list[int], ...], row_idx: int) -> None:
for i, block_table in enumerate(self.block_tables):
block_table.append_row(block_ids[i], row_idx)
def add_row(self, block_ids: tuple[list[int], ...], row_idx: int) -> None:
for i, block_table in enumerate(self.block_tables):
block_table.add_row(block_ids[i], row_idx)
...
def compute_slot_mapping(
self,
num_reqs: int,
query_start_loc: torch.Tensor,
positions: torch.Tensor,
) -> None:
for block_table in self.block_tables:
block_table.compute_slot_mapping(num_reqs, query_start_loc, positions)
...
6. 从 positions 到 slot_mapping
GpuModelRunner._prepare_inputs() 会把本轮调度结果展开成扁平 token batch。关键中间量有两个:
query_start_loc:每个请求在扁平 token batch 中的起止位置。positions:每个扁平 token 在原请求序列中的逻辑 position。
例如本轮三个请求分别调度 [2, 5, 3] 个 token:
query_start_loc = [0, 2, 7, 10]
query_pos = [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
positions = num_computed_tokens[req] + query_pos
对应代码:
def _prepare_inputs(
self,
scheduler_output: "SchedulerOutput",
num_scheduled_tokens: np.ndarray,
) -> tuple[
torch.Tensor,
SpecDecodeMetadata | None,
]:
...
self.input_batch.block_table.commit_block_table(num_reqs)
# Get request indices.
# E.g., [2, 5, 3] -> [0, 0, 1, 1, 1, 1, 1, 2, 2, 2]
req_indices = np.repeat(self.arange_np[:num_reqs], num_scheduled_tokens)
# cu_num_tokens: [2, 5, 3] -> [2, 7, 10]
# self.query_pos.np[:10]: [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
cu_num_tokens = self._get_cumsum_and_arange(
num_scheduled_tokens, self.query_pos.np
)
# Get positions.
positions_np = (
self.input_batch.num_computed_tokens_cpu[req_indices]
+ self.query_pos.np[: cu_num_tokens[-1]]
)
...
self.query_start_loc.np[0] = 0
self.query_start_loc.np[1 : num_reqs + 1] = cu_num_tokens
self.query_start_loc.copy_to_gpu()
...
self.positions[:total_num_scheduled_tokens] = (
self.num_computed_tokens[req_indices_gpu].to(torch.int64)
+ self.query_pos.gpu[:total_num_scheduled_tokens]
)
...
self.input_batch.block_table.compute_slot_mapping(
num_reqs,
self.query_start_loc.gpu[: num_reqs + 1],
self.positions[:total_num_scheduled_tokens],
)
...
slot mapping 的 Triton kernel 逐请求处理。对于请求内每个 token,先用 position // block_size 找逻辑 block index,再从 block table 取物理 block id,最后组合成物理 slot id:
@triton.jit
def _compute_slot_mapping_kernel(
num_tokens,
max_num_tokens,
query_start_loc_ptr, # [num_reqs + 1], int32
positions_ptr, # [num_tokens], int64
block_table_ptr, # [max_num_reqs, max_num_blocks_per_req], int32 (flat)
block_table_stride, # max_num_blocks_per_req
block_size,
slot_mapping_ptr, # [max_num_tokens], int64
TOTAL_CP_WORLD_SIZE: tl.constexpr,
TOTAL_CP_RANK: tl.constexpr,
CP_KV_CACHE_INTERLEAVE_SIZE: tl.constexpr,
PAD_ID: tl.constexpr,
BLOCK_SIZE: tl.constexpr,
):
...
start_idx = tl.load(query_start_loc_ptr + req_idx).to(tl.int64)
end_idx = tl.load(query_start_loc_ptr + req_idx + 1).to(tl.int64)
virtual_block_size = block_size * TOTAL_CP_WORLD_SIZE
row_offset = req_idx * block_table_stride
for i in range(start_idx, end_idx, BLOCK_SIZE):
offsets = i + tl.arange(0, BLOCK_SIZE)
mask = offsets < end_idx
pos = tl.load(positions_ptr + offsets, mask=mask, other=0)
block_indices = pos // virtual_block_size
block_numbers = tl.load(block_table_ptr + row_offset + block_indices).to(
tl.int64
)
...
slot_ids = block_numbers * block_size + local_block_offsets
slot_ids = tl.where(is_local, slot_ids, PAD_ID)
tl.store(slot_mapping_ptr + offsets, slot_ids, mask=mask)
...
在没有 context parallelism 时,这段可以简化理解为:
block_index = position // block_size
block_offset = position % block_size
block_id = block_table[req_idx, block_index]
slot_id = block_id * block_size + block_offset
context parallelism 存在时,一个全局 sequence 的 token 按 CP_KV_CACHE_INTERLEAVE_SIZE 在多个 rank 间交错分片。非本 rank 的 token 会被写成 PAD_SLOT_ID,后续 cache update kernel 看到负 slot 会跳过。
新版 GPU worker 路径中同样有 fused multi-group 版本。它一次生成 [num_kv_cache_groups, num_tokens] 的 slot mappings:
def compute_slot_mappings(
self,
idx_mapping: torch.Tensor,
query_start_loc: torch.Tensor,
positions: torch.Tensor,
num_tokens_padded: int,
) -> torch.Tensor:
...
_compute_slot_mappings_kernel[(num_groups, num_reqs + 1)](
self.max_num_batched_tokens,
idx_mapping,
query_start_loc,
positions,
self.block_table_ptrs,
self.block_table_strides,
self.block_sizes_tensor,
self.slot_mappings,
self.slot_mappings.stride(0),
self.cp_rank,
CP_SIZE=self.cp_size,
CP_INTERLEAVE=self.cp_interleave,
PAD_ID=PAD_SLOT_ID,
TRITON_BLOCK_SIZE=1024,
)
return self.slot_mappings[:, :num_tokens_padded]
...
7. slot_mapping 如何进入 attention
slot mapping 会走两条路:
- 放进
CommonAttentionMetadata,给 attention backend 构造读 KV cache 的 metadata。 - 放进
ForwardContext.slot_mapping,给每一层的 KV cache update op 写 KV cache。
CommonAttentionMetadata 中直接持有 block_table_tensor 和 slot_mapping:
@dataclass
class CommonAttentionMetadata:
...
block_table_tensor: torch.Tensor
slot_mapping: torch.Tensor
...
_build_attention_metadata() 会按 KV cache group 取对应的 block table 和 slot mapping,并在每个 group 内把 metadata 分发给 layer:
def _build_attention_metadata(
self,
num_tokens: int,
num_reqs: int,
max_query_len: int,
...
slot_mappings: dict[int, torch.Tensor] | None = None,
) -> tuple[PerLayerAttnMetadata, CommonAttentionMetadata | None]:
...
assert slot_mappings is not None
block_table_gid_0 = _get_block_table(0)
slot_mapping_gid_0 = slot_mappings[0]
...
cm_base = CommonAttentionMetadata(
query_start_loc=self.query_start_loc.gpu[: num_reqs_padded + 1],
query_start_loc_cpu=self.query_start_loc.cpu[: num_reqs_padded + 1],
seq_lens=self.seq_lens[:num_reqs_padded],
...
block_table_tensor=block_table_gid_0,
slot_mapping=slot_mapping_gid_0,
causal=True,
is_prefilling=is_prefilling,
)
...
for kv_cache_gid, kv_cache_group in enumerate(kv_cache_groups):
cm = copy(cm_base) # shallow copy
...
if kv_cache_gid > 0:
cm.block_table_tensor = _get_block_table(kv_cache_gid)
cm.slot_mapping = slot_mappings[kv_cache_gid]
...
同时,worker 会构造 layer name 到 slot mapping 的映射,放入 forward context:
def build_slot_mappings_by_layer(
slot_mappings: torch.Tensor, kv_cache_config: KVCacheConfig
) -> dict[str, torch.Tensor]:
slot_mappings_by_layer: dict[str, torch.Tensor] = {}
kv_cache_groups = kv_cache_config.kv_cache_groups
for slot_mapping, kv_cache_group in zip(slot_mappings, kv_cache_groups):
for layer_name in kv_cache_group.layer_names:
slot_mappings_by_layer[layer_name] = slot_mapping
return slot_mappings_by_layer
...
模型 forward 前,slot_mapping=slot_mappings_by_layer 被挂到 forward context:
with set_forward_context(
attn_metadata,
self.vllm_config,
num_tokens=input_batch.num_tokens_after_padding,
cudagraph_runtime_mode=batch_desc.cg_mode,
num_tokens_across_dp=num_tokens_across_dp,
batch_descriptor=batch_descriptor,
slot_mapping=slot_mappings_by_layer,
skip_compiled=skip_compiled,
):
self.kv_connector.pre_forward(scheduler_output)
model_output = self.model(**model_inputs)
...
8. Attention 层用 slot_mapping 写当前 K/V
以 Llama attention 为例,模型层先算 QKV,再调用通用 Attention:
def forward(
self,
positions: torch.Tensor,
hidden_states: torch.Tensor,
) -> torch.Tensor:
qkv, _ = self.qkv_proj(hidden_states)
q, k, v = qkv.split([self.q_size, self.kv_size, self.kv_size], dim=-1)
q, k = self.rotary_emb(positions, q, k)
attn_output = self.attn(q, k, v)
output, _ = self.o_proj(attn_output)
return output
...
Attention.forward() 中,若 backend 的 forward 不自带 KV cache update,则先显式调用 unified_kv_cache_update(),再调用 attention:
def forward(
self,
query: torch.Tensor,
key: torch.Tensor,
value: torch.Tensor,
...
) -> torch.Tensor:
...
if (
not self.attn_backend.forward_includes_kv_cache_update
and self.kv_sharing_target_layer_name is None
and key is not None
and value is not None
):
kv_cache_dummy_dep = unified_kv_cache_update(
key, value, self.layer_name
)
unified_attention_with_output(
query,
key,
value,
output,
self.layer_name,
kv_cache_dummy_dep=kv_cache_dummy_dep,
)
...
unified_kv_cache_update() 从 forward context 中取当前 layer 的 slot mapping:
def unified_kv_cache_update(
key: torch.Tensor,
value: torch.Tensor,
layer_name: str,
) -> torch.Tensor:
...
_, attn_layer, kv_cache, layer_slot_mapping = get_attention_context(layer_name)
if layer_slot_mapping is not None:
assert hasattr(attn_layer.impl, "do_kv_cache_update"), (
f"{attn_layer.impl.__class__.__name__} does not support kv cache update"
)
attn_layer.impl.do_kv_cache_update(
attn_layer,
key,
value,
kv_cache,
layer_slot_mapping,
)
return torch.empty(0, device=kv_cache.device, dtype=kv_cache.dtype)
...
FlashAttention backend 的 update 逻辑将 kv_cache 拆成 key cache 和 value cache,然后调用 reshape_and_cache_flash():
def do_kv_cache_update(
self,
layer: torch.nn.Module,
key: torch.Tensor,
value: torch.Tensor,
kv_cache: torch.Tensor,
slot_mapping: torch.Tensor,
) -> None:
if self.attn_type in (AttentionType.ENCODER_ONLY, AttentionType.ENCODER):
return
key_cache, value_cache = kv_cache.unbind(0)
# Reshape the input keys and values and store them in the cache.
# Skip this if sharing KV cache with an earlier attention layer.
reshape_and_cache_flash(
key,
value,
key_cache,
value_cache,
slot_mapping,
self.kv_cache_dtype,
layer._k_scale,
layer._v_scale,
)
...
底层 CUDA kernel 对每个 token 读取 slot id,并拆成 block index 与 block 内 offset:
template <typename scalar_t, typename cache_t, Fp8KVCacheDataType kv_dt>
__global__ void reshape_and_cache_flash_kernel(
const scalar_t* __restrict__ key, // [num_tokens, num_heads, head_size]
const scalar_t* __restrict__ value, // [num_tokens, num_heads, head_size]
cache_t* __restrict__ key_cache, // NHD or HND, shape see comments below
cache_t* __restrict__ value_cache, // same above
const int64_t* __restrict__ slot_mapping, // [num_tokens]
...
) {
const int64_t token_idx = blockIdx.x;
const int64_t slot_idx = slot_mapping[token_idx];
// NOTE: slot_idx can be -1 if the token is padded
if (slot_idx < 0) {
return;
}
const int64_t block_idx = slot_idx / block_size;
const int64_t block_offset = slot_idx % block_size;
...
cache_t* __restrict__ key_dst =
key_cache + block_idx * block_stride + block_offset * page_stride;
cache_t* __restrict__ value_dst =
value_cache + block_idx * block_stride + block_offset * page_stride;
...
}
...
这里的 slot_idx < 0 与前面 slot mapping kernel 填充的 PAD_SLOT_ID 对应。padding token、非本 context-parallel rank 的 token,都不会写入 KV cache。
9. block_table 如何参与读历史 KV
写当前 K/V 依赖 slot_mapping,读历史 K/V 依赖 block_table。FlashAttention metadata builder 把两者一起放入 backend metadata:
def build(
self,
common_prefix_len: int,
common_attn_metadata: CommonAttentionMetadata,
fast_build: bool = False,
) -> FlashAttentionMetadata:
...
block_table_tensor = common_attn_metadata.block_table_tensor
slot_mapping = common_attn_metadata.slot_mapping
...
attn_metadata = FlashAttentionMetadata(
num_actual_tokens=num_actual_tokens,
max_query_len=max_query_len,
query_start_loc=query_start_loc,
max_seq_len=max_seq_len,
seq_lens=seq_lens,
block_table=block_table_tensor,
slot_mapping=slot_mapping,
...
)
return attn_metadata
...
后续 attention forward 会把 attn_metadata.block_table 传给 paged attention / flash attention kernel。这样每个请求的历史 KV 不需要连续存储,只要 block table 能把逻辑 block 序号翻译成物理 page 即可。
因此完整读写模型是:
当前 token 的 K/V 写入:
token_idx -> slot_mapping[token_idx]
-> block_idx + block_offset
-> key_cache/value_cache[block_idx, block_offset, ...]
历史上下文 K/V 读取:
req_idx + logical kv position
-> block_table[req_idx, logical_block_idx]
-> physical block/page
-> key_cache/value_cache[block, offset, ...]
10. prefix cache、滑窗与 null block
Full attention 的 prefix hit 是从左到右连续匹配 block hash:
class FullAttentionManager(SingleTypeKVCacheManager):
@classmethod
def find_longest_cache_hit(
cls,
block_hashes: BlockHashList,
max_length: int,
kv_cache_group_ids: list[int],
block_pool: BlockPool,
kv_cache_spec: KVCacheSpec,
use_eagle: bool,
alignment_tokens: int,
...
) -> tuple[list[KVCacheBlock], ...]:
...
max_num_blocks = max_length // block_size
for block_hash in itertools.islice(block_hashes, max_num_blocks):
if cached_block := block_pool.get_cached_block(
block_hash, kv_cache_group_ids
):
for computed, cached in zip(computed_blocks, cached_block):
computed.append(cached)
else:
break
if use_eagle and computed_blocks[0]:
for computed in computed_blocks:
computed.pop()
...
return computed_blocks
...
sliding window 不会永远保留最左侧 block。它会根据当前 computed token 数计算被窗口跳过的 token,并把对应 block 释放、替换成 null_block:
def remove_skipped_blocks(
self, request_id: str, total_computed_tokens: int
) -> None:
...
num_skipped_tokens = self.get_num_skipped_tokens(total_computed_tokens)
if num_skipped_tokens <= 0:
return
blocks = self.req_to_blocks[request_id]
num_skipped_blocks = num_skipped_tokens // self.block_size
num_skipped_blocks = min(num_skipped_blocks, len(blocks))
removed_blocks: list[KVCacheBlock] = []
for i in range(num_skipped_blocks - 1, -1, -1):
if blocks[i] == self._null_block:
break
removed_blocks.append(blocks[i])
blocks[i] = self._null_block
self.block_pool.free_blocks(removed_blocks)
...
这解释了为什么 block table 中可能出现 block id 为 0 的 null block:它是一个占位,保持“请求逻辑 block 序号”稳定,但不代表需要真实参与 attention 的历史内容。对于 sliding/local attention,左侧被窗口跳过的 token 不再需要真实 KV。
11. 一个小例子
假设 block_size = 4,某请求已被分配物理 blocks:
逻辑 block index: 0 1 2
block_table row: [7, 3, 9]
逻辑 token position: 0 1 2 3 | 4 5 6 7 | 8 9 ...
物理 block_id: 7 7 7 7 | 3 3 3 3 | 9 9 ...
block_offset: 0 1 2 3 | 0 1 2 3 | 0 1 ...
slot_id: 28 29 30 31 | 12 13 14 15 | 36 37 ...
本轮如果只调度 position [6, 7, 8],则:
position 6 -> block_index 1 -> block_id 3 -> offset 2 -> slot 14
position 7 -> block_index 1 -> block_id 3 -> offset 3 -> slot 15
position 8 -> block_index 2 -> block_id 9 -> offset 0 -> slot 36
slot_mapping = [14, 15, 36]
这个 slot_mapping 会跟本轮算出的 key[0:3]、value[0:3] 对齐,cache update kernel 会把三行 K/V 分别写到物理 slot 14、15、36。
12. 时序图
@startuml
participant Scheduler
participant KVCacheManager
participant BlockPool
participant Worker
participant BlockTable
participant Attention
participant KVCache
Scheduler -> KVCacheManager: get_computed_blocks(request)
KVCacheManager -> BlockPool: lookup block_hash
BlockPool --> KVCacheManager: cached KVCacheBlock*
Scheduler -> KVCacheManager: allocate_slots(request, num_new_tokens)
KVCacheManager -> BlockPool: get_new_blocks()
BlockPool --> KVCacheManager: KVCacheBlock(block_id)
KVCacheManager --> Scheduler: KVCacheBlocks
Scheduler --> Worker: SchedulerOutput(block_ids)
Worker -> BlockTable: add_row()/append_row(block_ids)
Worker -> BlockTable: commit_block_table()
Worker -> Worker: build query_start_loc + positions
Worker -> BlockTable: compute_slot_mapping()
BlockTable --> Worker: slot_mapping
Worker -> Attention: set_forward_context(attn_metadata, slot_mapping)
Attention -> KVCache: reshape_and_cache(key, value, slot_mapping)
Attention -> KVCache: paged attention reads via block_table
@enduml
13. 总结
vLLM 的 KV cache slot mapping 机制可以概括为一句话:
调度器只分配和复用物理 block;worker 在每次 forward 前,基于当前请求行、逻辑 positions 和 block table,即时生成本轮 token 到物理 KV slot 的映射。
这种设计让调度器保持在“块管理”层面,不需要理解 batch 内 token 展开细节;worker 则在最接近 GPU 执行的位置生成 slot mapping,天然能处理 chunked prefill、decode、spec decode padding、context parallelism、hybrid KV cache group 等运行时变化。block_table 负责历史 KV 的分页读取,slot_mapping 负责当前 K/V 的精确写入,两者共同完成 paged KV cache 的寻址闭环。
- 感谢你赐予我前进的力量
