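This article walks through the full slot mapping flow for the KV cache in vLLM V1: how the scheduler obtains physical KV blocks, how the worker converts the block table into a slot mapping, and how the attention backend uses it to write the current tokens' K/V into the paged KV cache and to read back the historical context in later attention.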

1. Three core coordinates

KV cache addressing can be broken down into three layers of coordinates:

logical token position
    |
    |  position // block_size
    v
index of the KV block within the request
    |
    |  block_table[request_row, block_index]
    v
physical block_id
    |
    |  block_id * block_size + position % block_size
    v
physical slot_id

slot_mapping is a one-dimensional tensor that records, for each input token in the current batch, which physical KV slot it is written to. Its typical formula is:

slot_id = physical_block_id * block_size + block_offset

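Here, block_table is a two-dimensional table maintained per request; it records which physical block_id backs the request's N-th logical block. slot_mapping is a one-dimensional table over the current step's flattened token sequence; it records which physical slot_id the i-th input token is written to.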
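The three coordinate layers above can be sketched for a single request as follows. This is a minimal illustration, not vLLM code: the function name slot_for_position and the sample block ids are hypothetical.

```python
def slot_for_position(position: int, block_table_row: list[int], block_size: int) -> int:
    """Map one logical token position of a request to a physical slot id."""
    block_index = position // block_size      # which logical block of the request
    block_offset = position % block_size      # offset inside that block
    block_id = block_table_row[block_index]   # logical block -> physical block
    return block_id * block_size + block_offset


# Hypothetical request that holds physical blocks 5 and 2, with block_size = 16.
row = [5, 2]
slots = [slot_for_position(p, row, 16) for p in (0, 15, 16, 20)]
print(slots)
```

Positions 0 and 15 land in physical block 5, while positions 16 and 20 land in physical block 2, even though the positions are logically consecutive.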

2. End-to-end overview

Request
  | prompt/output token ids
  | block_hashes
  v
Scheduler
  | get_computed_blocks()
  | allocate_slots()
  v
KVCacheManager / KVCacheCoordinator / SingleTypeKVCacheManager
  | req_id -> [KVCacheBlock(block_id=...)]
  v
SchedulerOutput
  | NewRequestData.block_ids
  | CachedRequestData.new_block_ids
  v
GpuModelRunner / InputBatch
  | MultiGroupBlockTable.add_row()/append_row()
  | commit_block_table()
  v
BlockTable on GPU
  | compute_slot_mapping(query_start_loc, positions)
  v
slot_mapping
  | CommonAttentionMetadata.slot_mapping
  | ForwardContext.slot_mapping[layer_name]
  v
Attention layer
  | unified_kv_cache_update()
  | reshape_and_cache(_flash)
  v
Paged KV cache memory

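Within one model execution, block_table and slot_mapping have distinct roles:

  • block_table is for the attention kernel to read historical KV; it locates pages in the paged KV cache per sequence.
  • slot_mapping is for the KV cache update kernel to write the K/V of the current batch of tokens; it places the newly computed key/value at the correct page offset.
  • In V1, different KV cache groups can have different block tables and slot mappings, which are finally dispatched to each layer by layer name.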

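3. The scheduler allocates blocks first and does not generate slots directly

The slot mapping is not sent down by the scheduler. The scheduler only decides which KV cache blocks each request needs and carries those block_ids to the worker.

In SchedulerOutput, both new requests and already-known requests carry only block ids: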

@dataclass
class NewRequestData:
    ...
    block_ids: tuple[list[int], ...]
    num_computed_tokens: int
    ...


@dataclass
class CachedRequestData:
    ...
    new_block_ids: list[tuple[list[int], ...] | None]
    num_computed_tokens: list[int]
    ...

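In tuple[list[int], ...], the outer tuple is indexed by KV cache group, and each inner list holds the physical block ids the request owns in that group.

When a new request from the waiting queue is scheduled for the first time, the scheduler first queries the prefix cache and then allocates new blocks for the tokens that still need to be computed after the cache miss: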

def schedule(self) -> SchedulerOutput:
    ...
    if request.num_computed_tokens == 0:
        # Get locally-cached tokens.
        new_computed_blocks, num_new_local_computed_tokens = (
            self.kv_cache_manager.get_computed_blocks(request)
        )
        ...
        num_computed_tokens = (
            num_new_local_computed_tokens + num_external_computed_tokens
        )
        assert num_computed_tokens <= request.num_tokens
    ...
    new_blocks = self.kv_cache_manager.allocate_slots(
        request,
        num_new_tokens,
        num_new_computed_tokens=num_new_local_computed_tokens,
        new_computed_blocks=new_computed_blocks,
        num_lookahead_tokens=effective_lookahead_tokens,
        num_external_computed_tokens=num_external_computed_tokens,
        delay_cache_blocks=load_kv_async,
        num_encoder_tokens=num_encoder_tokens,
    )
    ...
    req_to_new_blocks[request_id] = self.kv_cache_manager.get_blocks(
        request_id
    )
    num_scheduled_tokens[request_id] = num_new_tokens
    ...

Requests already in the running queue that continue with decode or chunked prefill only have new blocks appended:

def schedule(self) -> SchedulerOutput:
    ...
    while True:
        new_blocks = self.kv_cache_manager.allocate_slots(
            request,
            num_new_tokens,
            num_lookahead_tokens=self.num_lookahead_tokens,
        )

        if new_blocks is not None:
            # The request can be scheduled.
            break
        ...
    req_to_new_blocks[request_id] = new_blocks
    num_scheduled_tokens[request_id] = num_new_tokens
    ...

When scheduling ends, the SchedulerOutput is constructed. The worker sees only block ids, never slot ids:

def schedule(self) -> SchedulerOutput:
    ...
    new_reqs_data = [
        NewRequestData.from_request(
            req, req_to_new_blocks[req.request_id].get_block_ids()
        )
        for req in scheduled_new_reqs
    ]
    ...
    cached_reqs_data = self._make_cached_request_data(
        scheduled_running_reqs,
        scheduled_resumed_reqs,
        num_scheduled_tokens,
        scheduled_spec_decode_tokens,
        req_to_new_blocks,
    )
    ...
    scheduler_output = SchedulerOutput(
        scheduled_new_reqs=new_reqs_data,
        scheduled_cached_reqs=cached_reqs_data,
        num_scheduled_tokens=num_scheduled_tokens,
        total_num_scheduled_tokens=total_num_scheduled_tokens,
        ...
    )
    ...

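4. KVCacheManager: prefix hits, allocation, and cache commit

KVCacheManager is the facade between the scheduler and the underlying block pool. It hides the differences between KV cache types such as full attention, sliding window, chunked local attention, mamba, and cross attention.

4.1 Prefix cache hits cover only full blocks

The prefix cache lookup happens while the request has no locally computed tokens yet. The hit length is capped at request.num_tokens - 1 because, even when the entire prompt hits, the last token must be recomputed to produce logits.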

def get_computed_blocks(self, request: Request) -> tuple[KVCacheBlocks, int]:
    ...
    if not self.enable_caching or request.skip_reading_prefix_cache:
        return self.empty_kv_cache_blocks, 0

    # NOTE: When all tokens hit the cache, we must recompute the last token
    # to obtain logits. Thus, set max_cache_hit_length to prompt_length - 1.
    max_cache_hit_length = request.num_tokens - 1
    computed_blocks, num_new_computed_tokens = (
        self.coordinator.find_longest_cache_hit(
            request.block_hashes, max_cache_hit_length
        )
    )
    ...
    return self.create_kv_cache_blocks(computed_blocks), num_new_computed_tokens
    ...
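The effect of the request.num_tokens - 1 cap combined with full-block matching can be sketched as below. This is a simplified illustration under stated assumptions: max_prefix_hit_tokens and num_matching_blocks (the number of leading block hashes found in the cache) are hypothetical names, and details such as eagle or alignment handling are ignored.

```python
def max_prefix_hit_tokens(num_tokens: int, block_size: int, num_matching_blocks: int) -> int:
    """Number of tokens that can be reused from the prefix cache."""
    # The last token is always recomputed so that logits can be produced.
    max_cache_hit_length = num_tokens - 1
    # Only whole blocks can hit, so round the cap down to a block boundary.
    max_num_blocks = max_cache_hit_length // block_size
    return min(num_matching_blocks, max_num_blocks) * block_size


# A 32-token prompt whose two blocks are both cached: only one block is reused.
print(max_prefix_hit_tokens(num_tokens=32, block_size=16, num_matching_blocks=2))
# A 33-token prompt with the same two cached blocks: both blocks are reused.
print(max_prefix_hit_tokens(num_tokens=33, block_size=16, num_matching_blocks=2))
```

When the prompt length is an exact multiple of block_size, the cap costs a whole block rather than a single token, because a partially reused block cannot hit.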

The prefix cache index comes from Request.block_hashes. The hashes of full blocks are updated both when the request is created and when output tokens are appended:

class Request:
    ...
    def append_output_token_ids(
        self,
        token_ids: int | list[int],
    ) -> None:
        if isinstance(token_ids, int):
            self._output_token_ids.append(token_ids)
            self._all_token_ids.append(token_ids)
        else:
            self._output_token_ids.extend(token_ids)
            self._all_token_ids.extend(token_ids)

        self.update_block_hashes()

    def update_block_hashes(self) -> None:
        """Compute block hashes for any new full blocks and append them."""
        if self._block_hasher is not None:
            self.block_hashes.extend(self._block_hasher(self))
    ...
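The idea of hashing only newly completed full blocks can be sketched as follows. This is not the vLLM hasher: new_full_block_hashes is a hypothetical helper, SHA-256 is chosen only for illustration, and chaining each hash to its parent block's hash is an assumption of this sketch so that a block hash identifies the whole prefix up to that block.

```python
import hashlib


def new_full_block_hashes(token_ids: list[int], block_size: int,
                          existing: list[bytes]) -> list[bytes]:
    """Return hashes for full blocks that do not have a hash yet."""
    new_hashes: list[bytes] = []
    # Assumption: each block hash depends on the previous block's hash.
    parent = existing[-1] if existing else b""
    start = len(existing) * block_size
    while start + block_size <= len(token_ids):
        block = token_ids[start:start + block_size]
        payload = parent + b"|" + ",".join(map(str, block)).encode()
        parent = hashlib.sha256(payload).digest()
        new_hashes.append(parent)
        start += block_size
    return new_hashes


tokens = list(range(10))
block_hashes: list[bytes] = []
block_hashes.extend(new_full_block_hashes(tokens, 4, block_hashes))  # two full blocks
tokens.extend([10, 11])                                              # third block fills up
block_hashes.extend(new_full_block_hashes(tokens, 4, block_hashes))
print(len(block_hashes))
```

The trailing partial block never gets a hash, which is consistent with prefix hits covering only full blocks.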

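4.2 The three actions of allocate_slots

allocate_slots() does three things:

  1. Compute the number of tokens that must have a slot after this step, including new tokens and speculative lookahead tokens.
  2. For cases that skip tokens on the left, such as sliding window / local attention, free old blocks that are no longer needed.
  3. Allocate new blocks and write the blocks that are now full into the prefix cache index.

def allocate_slots(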
    self,
    request: Request,
    num_new_tokens: int,
    num_new_computed_tokens: int = 0,
    new_computed_blocks: KVCacheBlocks | None = None,
    num_lookahead_tokens: int = 0,
    num_external_computed_tokens: int = 0,
    delay_cache_blocks: bool = False,
    num_encoder_tokens: int = 0,
) -> KVCacheBlocks | None:
    ...
    num_local_computed_tokens = (
        request.num_computed_tokens + num_new_computed_tokens
    )
    total_computed_tokens = min(
        num_local_computed_tokens + num_external_computed_tokens,
        self.max_model_len,
    )
    num_tokens_main_model = total_computed_tokens + num_new_tokens
    num_tokens_need_slot = min(
        num_tokens_main_model + num_lookahead_tokens,
        self.max_model_len,
    )

    self.coordinator.remove_skipped_blocks(
        request.request_id, total_computed_tokens
    )

    num_blocks_to_allocate = self.coordinator.get_num_blocks_to_allocate(
        request_id=request.request_id,
        num_tokens=num_tokens_need_slot,
        new_computed_blocks=new_computed_block_list,
        num_encoder_tokens=num_encoder_tokens,
        total_computed_tokens=num_local_computed_tokens
        + num_external_computed_tokens,
        num_tokens_main_model=num_tokens_main_model,
    )

    if num_blocks_to_allocate > self.block_pool.get_num_free_blocks():
        return None

    if (
        new_computed_block_list is not self.empty_kv_cache_blocks.blocks
        or num_external_computed_tokens > 0
    ):
        self.coordinator.allocate_new_computed_blocks(
            request_id=request.request_id,
            new_computed_blocks=new_computed_block_list,
            num_local_computed_tokens=num_local_computed_tokens,
            num_external_computed_tokens=num_external_computed_tokens,
        )

    new_blocks = self.coordinator.allocate_new_blocks(
        request.request_id,
        num_tokens_need_slot,
        num_tokens_main_model,
        num_encoder_tokens,
    )
    ...
    num_tokens_to_cache = min(
        total_computed_tokens + num_new_tokens,
        request.num_tokens,
    )
    self.coordinator.cache_blocks(request, num_tokens_to_cache)

    return self.create_kv_cache_blocks(new_blocks)
    ...

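num_tokens_need_slot in this code is the precondition for the later slot mapping to hold: as long as a position will be used by this step's input, by subsequent draft tokens, or by an external KV load, its block must already exist in req_to_blocks and in the worker's block table.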
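The arithmetic behind num_tokens_need_slot can be illustrated with a small sketch. It is deliberately simplified: num_new_blocks_needed is a hypothetical helper that ignores prefix-hit blocks, skipped blocks, and encoder tokens, and only shows how lookahead tokens and max_model_len affect the block count.

```python
def cdiv(a: int, b: int) -> int:
    """Ceiling division for non-negative integers."""
    return -(-a // b)


def num_new_blocks_needed(num_computed_tokens: int, num_new_tokens: int,
                          num_lookahead_tokens: int, max_model_len: int,
                          block_size: int, num_blocks_held: int) -> int:
    num_tokens_main_model = num_computed_tokens + num_new_tokens
    # Every position that may be written this step needs a backing block.
    num_tokens_need_slot = min(num_tokens_main_model + num_lookahead_tokens,
                               max_model_len)
    num_required_blocks = cdiv(num_tokens_need_slot, block_size)
    return max(num_required_blocks - num_blocks_held, 0)


# Decode step at 30 computed tokens, block_size 16, request already holds 2 blocks.
print(num_new_blocks_needed(30, 1, 0, 4096, 16, 2))  # no lookahead
print(num_new_blocks_needed(30, 1, 2, 4096, 16, 2))  # two lookahead tokens
```

With two lookahead tokens the request crosses a block boundary one step earlier, so a new block has to be allocated before any draft token is written.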

4.3 SingleTypeKVCacheManager maintains the req_id-to-blocks mapping

The actual record of which physical blocks a request holds is maintained by the single-type manager:

class SingleTypeKVCacheManager(ABC):
    ...
    def allocate_new_blocks(
        self, request_id: str, num_tokens: int, num_tokens_main_model: int
    ) -> list[KVCacheBlock]:
        ...
        req_blocks = self.req_to_blocks[request_id]
        num_required_blocks = cdiv(num_tokens, self.block_size)
        num_new_blocks = num_required_blocks - len(req_blocks)
        if num_new_blocks <= 0:
            return []
        else:
            new_blocks = self.block_pool.get_new_blocks(num_new_blocks)
            req_blocks.extend(new_blocks)
            if type(self.kv_cache_spec) is FullAttentionSpec:
                self.new_block_ids.extend(b.block_id for b in new_blocks)
            return new_blocks
    ...

Blocks hit in the prefix cache have their reference count incremented by touch(), which prevents them from being treated as evictable and reallocated:

def allocate_new_computed_blocks(
    self,
    request_id: str,
    new_computed_blocks: Sequence[KVCacheBlock],
    num_local_computed_tokens: int,
    num_external_computed_tokens: int,
) -> None:
    ...
    if self.enable_caching:
        self.block_pool.touch(new_computed_blocks)
    ...
    req_blocks.extend([self._null_block] * num_skipped_blocks)
    req_blocks.extend(new_computed_blocks)
    self.num_cached_block[request_id] = len(req_blocks)
    ...

Fully computed blocks are written into the prefix cache hash table. Note that what is cached here is the block metadata mapping; the KV tensors are not copied again:

def cache_full_blocks(
    self,
    request: Request,
    blocks: list[KVCacheBlock],
    num_cached_blocks: int,
    num_full_blocks: int,
    block_size: int,
    kv_cache_group_id: int,
) -> None:
    ...
    new_full_blocks = blocks[num_cached_blocks:num_full_blocks]
    ...
    for i, blk in enumerate(new_full_blocks):
        if blk.is_null:
            continue
        assert blk.block_hash is None
        block_hash = new_block_hashes[i]

        block_hash_with_group_id = make_block_hash_with_group_id(
            block_hash, kv_cache_group_id
        )
        blk.block_hash = block_hash_with_group_id
        self.cached_block_hash_to_block.insert(block_hash_with_group_id, blk)
    ...

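KVCacheBlock.block_id is the basis on which the worker later computes slot ids, whereas block_hash only serves prefix cache lookup and reuse.

5. The worker receives block_ids and maintains the block table

The worker side has a persistent InputBatch. Each active request occupies one row, whose index is the worker-internal req_index. The block ids sent by the scheduler are written into that row.

A new request entering the batch goes straight through add_row():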

def add_request(
    self,
    request: "CachedRequestState",
) -> int:
    ...
    self.num_computed_tokens_cpu[req_index] = request.num_computed_tokens
    self.block_table.add_row(request.block_ids, req_index)
    ...
    return req_index
    ...

When an existing request receives additional blocks, append_row() is used:

def _update_states(self, scheduler_output: "SchedulerOutput") -> Callable | None:
    ...
    if not resumed_from_preemption:
        if new_block_ids is not None:
            # Append the new blocks to the existing block IDs.
            for block_ids, new_ids in zip(req_state.block_ids, new_block_ids):
                block_ids.extend(new_ids)
    ...
    self.input_batch.num_computed_tokens_cpu[req_index] = num_computed_tokens
    if new_block_ids is not None:
        self.input_batch.block_table.append_row(new_block_ids, req_index)
    ...

The BlockTable's CPU buffer is a staging area; it is copied to the GPU right before the model actually executes:

class BlockTable:
    ...
    def append_row(
        self,
        block_ids: list[int],
        row_idx: int,
    ) -> None:
        if not block_ids:
            return
        ...
        num_blocks = len(block_ids)
        start = self.num_blocks_per_row[row_idx]
        self.num_blocks_per_row[row_idx] += num_blocks
        self.block_table.np[row_idx, start : start + num_blocks] = block_ids

    def add_row(self, block_ids: list[int], row_idx: int) -> None:
        self.num_blocks_per_row[row_idx] = 0
        self.append_row(block_ids, row_idx)
    ...
    def commit_block_table(self, num_reqs: int) -> None:
        self.block_table.copy_to_gpu(num_reqs)
    ...

With multiple KV cache groups, MultiGroupBlockTable maintains one BlockTable per group:

class MultiGroupBlockTable:
    """The BlockTables for each KV cache group."""
    ...
    def append_row(self, block_ids: tuple[list[int], ...], row_idx: int) -> None:
        for i, block_table in enumerate(self.block_tables):
            block_table.append_row(block_ids[i], row_idx)

    def add_row(self, block_ids: tuple[list[int], ...], row_idx: int) -> None:
        for i, block_table in enumerate(self.block_tables):
            block_table.add_row(block_ids[i], row_idx)
    ...
    def compute_slot_mapping(
        self,
        num_reqs: int,
        query_start_loc: torch.Tensor,
        positions: torch.Tensor,
    ) -> None:
        for block_table in self.block_tables:
            block_table.compute_slot_mapping(num_reqs, query_start_loc, positions)
    ...

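6. From positions to slot_mapping

GpuModelRunner._prepare_inputs() expands this step's scheduling result into a flat token batch. There are two key intermediate quantities:

  • query_start_loc: the start and end offsets of each request within the flat token batch.
  • positions: the logical position of each flat token within its original request sequence.

For example, if three requests are scheduled with [2, 5, 3] tokens in this step: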

query_start_loc = [0, 2, 7, 10]
query_pos       = [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
positions       = num_computed_tokens[req] + query_pos

The corresponding code:

def _prepare_inputs(
    self,
    scheduler_output: "SchedulerOutput",
    num_scheduled_tokens: np.ndarray,
) -> tuple[
    torch.Tensor,
    SpecDecodeMetadata | None,
]:
    ...
    self.input_batch.block_table.commit_block_table(num_reqs)

    # Get request indices.
    # E.g., [2, 5, 3] -> [0, 0, 1, 1, 1, 1, 1, 2, 2, 2]
    req_indices = np.repeat(self.arange_np[:num_reqs], num_scheduled_tokens)

    # cu_num_tokens: [2, 5, 3] -> [2, 7, 10]
    # self.query_pos.np[:10]: [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
    cu_num_tokens = self._get_cumsum_and_arange(
        num_scheduled_tokens, self.query_pos.np
    )

    # Get positions.
    positions_np = (
        self.input_batch.num_computed_tokens_cpu[req_indices]
        + self.query_pos.np[: cu_num_tokens[-1]]
    )
    ...
    self.query_start_loc.np[0] = 0
    self.query_start_loc.np[1 : num_reqs + 1] = cu_num_tokens
    self.query_start_loc.copy_to_gpu()
    ...
    self.positions[:total_num_scheduled_tokens] = (
        self.num_computed_tokens[req_indices_gpu].to(torch.int64)
        + self.query_pos.gpu[:total_num_scheduled_tokens]
    )
    ...
    self.input_batch.block_table.compute_slot_mapping(
        num_reqs,
        self.query_start_loc.gpu[: num_reqs + 1],
        self.positions[:total_num_scheduled_tokens],
    )
    ...
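The same expansion can be reproduced in plain NumPy, which makes the [2, 5, 3] example easy to check. This is a standalone sketch rather than the runner's code: the num_computed_tokens values are made up for illustration, and query_pos is derived with np.repeat instead of the runner's internal helper.

```python
import numpy as np

num_scheduled_tokens = np.array([2, 5, 3], dtype=np.int32)
num_computed_tokens = np.array([10, 0, 7], dtype=np.int32)  # hypothetical values

num_reqs = len(num_scheduled_tokens)
# Owning request of every flat token: [0, 0, 1, 1, 1, 1, 1, 2, 2, 2]
req_indices = np.repeat(np.arange(num_reqs), num_scheduled_tokens)

cu_num_tokens = np.cumsum(num_scheduled_tokens)        # [2, 7, 10]
query_start_loc = np.concatenate(([0], cu_num_tokens))  # [0, 2, 7, 10]

# Offset of every flat token inside its own request.
starts = np.repeat(cu_num_tokens - num_scheduled_tokens, num_scheduled_tokens)
query_pos = np.arange(cu_num_tokens[-1]) - starts

# Logical position = tokens already computed + offset within this step.
positions = num_computed_tokens[req_indices] + query_pos
print(positions.tolist())
```

A request in the middle of chunked prefill (request 0 here, with 10 tokens already computed) and a fresh prefill (request 1) go through exactly the same arithmetic.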

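The slot mapping Triton kernel processes one request at a time. For each token in the request, it first uses position // block_size to find the logical block index (the kernel divides by virtual_block_size, which equals block_size when context parallelism is off), then reads the physical block id from the block table, and finally combines them into the physical slot id: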

@triton.jit
def _compute_slot_mapping_kernel(
    num_tokens,
    max_num_tokens,
    query_start_loc_ptr,  # [num_reqs + 1], int32
    positions_ptr,  # [num_tokens], int64
    block_table_ptr,  # [max_num_reqs, max_num_blocks_per_req], int32 (flat)
    block_table_stride,  # max_num_blocks_per_req
    block_size,
    slot_mapping_ptr,  # [max_num_tokens], int64
    TOTAL_CP_WORLD_SIZE: tl.constexpr,
    TOTAL_CP_RANK: tl.constexpr,
    CP_KV_CACHE_INTERLEAVE_SIZE: tl.constexpr,
    PAD_ID: tl.constexpr,
    BLOCK_SIZE: tl.constexpr,
):
    ...
    start_idx = tl.load(query_start_loc_ptr + req_idx).to(tl.int64)
    end_idx = tl.load(query_start_loc_ptr + req_idx + 1).to(tl.int64)

    virtual_block_size = block_size * TOTAL_CP_WORLD_SIZE
    row_offset = req_idx * block_table_stride
    for i in range(start_idx, end_idx, BLOCK_SIZE):
        offsets = i + tl.arange(0, BLOCK_SIZE)
        mask = offsets < end_idx
        pos = tl.load(positions_ptr + offsets, mask=mask, other=0)
        block_indices = pos // virtual_block_size
        block_numbers = tl.load(block_table_ptr + row_offset + block_indices).to(
            tl.int64
        )
        ...
        slot_ids = block_numbers * block_size + local_block_offsets
        slot_ids = tl.where(is_local, slot_ids, PAD_ID)
        tl.store(slot_mapping_ptr + offsets, slot_ids, mask=mask)
    ...

Without context parallelism, this can be understood in simplified form as:

block_index  = position // block_size
block_offset = position %  block_size
block_id     = block_table[req_idx, block_index]
slot_id      = block_id * block_size + block_offset
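For a whole flat batch, the four lines above vectorize naturally. The following NumPy sketch assumes no context parallelism; compute_slot_mapping here is a standalone illustrative function, and the block table contents are hypothetical.

```python
import numpy as np


def compute_slot_mapping(block_table: np.ndarray, req_indices: np.ndarray,
                         positions: np.ndarray, block_size: int) -> np.ndarray:
    """Vectorized slot mapping for a flat token batch (no context parallelism)."""
    block_indices = positions // block_size
    block_offsets = positions % block_size
    # One lookup per token: row = owning request, column = logical block index.
    block_ids = block_table[req_indices, block_indices].astype(np.int64)
    return block_ids * block_size + block_offsets


# Two requests; unused trailing entries of a row are simply left as 0.
block_table = np.array([[4, 1, 0],
                        [6, 2, 8]], dtype=np.int32)
req_indices = np.array([0, 0, 1, 1, 1])
positions = np.array([3, 4, 7, 8, 9])
slot_mapping = compute_slot_mapping(block_table, req_indices, positions, block_size=4)
print(slot_mapping.tolist())
```

The result is one-dimensional and aligned with the flat token order, regardless of how many requests contributed tokens.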

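With context parallelism, the tokens of one global sequence are sharded across ranks in an interleaved fashion, in units of CP_KV_CACHE_INTERLEAVE_SIZE. Tokens that do not belong to the local rank are written as PAD_SLOT_ID, and the later cache update kernel skips any negative slot.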
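One way such interleaved sharding could work is sketched below. The ownership rule and local offset formula are assumptions made for illustration, since the corresponding kernel lines are elided in the excerpt above; cp_slot_id is a hypothetical name, and PAD_SLOT_ID = -1 is assumed from the "slot_idx can be -1" comment in the CUDA kernel.

```python
PAD_SLOT_ID = -1  # assumed value; any negative slot is skipped by the cache update


def cp_slot_id(position: int, block_table_row: list[int], block_size: int,
               cp_world_size: int, cp_rank: int, interleave: int) -> int:
    """Slot id on one context-parallel rank, or PAD_SLOT_ID if another rank owns it."""
    # A "virtual" block spans block_size tokens on every rank.
    virtual_block_size = block_size * cp_world_size
    block_index = position // virtual_block_size
    virtual_offset = position % virtual_block_size
    # Assumption: chunks of `interleave` tokens are dealt round-robin to ranks.
    chunk = virtual_offset // interleave
    if chunk % cp_world_size != cp_rank:
        return PAD_SLOT_ID
    local_offset = (chunk // cp_world_size) * interleave + virtual_offset % interleave
    return block_table_row[block_index] * block_size + local_offset


row = [3]  # hypothetical: one physical block per rank, block_size = 4
rank0 = [cp_slot_id(p, row, 4, 2, 0, 1) for p in range(8)]
rank1 = [cp_slot_id(p, row, 4, 2, 1, 1) for p in range(8)]
print(rank0)
print(rank1)
```

Under this scheme each rank stores every second token, so eight global positions fill one four-slot block on each of the two ranks.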

The newer GPU worker path also has a fused multi-group version. It produces slot mappings of shape [num_kv_cache_groups, num_tokens] in one pass:

def compute_slot_mappings(
    self,
    idx_mapping: torch.Tensor,
    query_start_loc: torch.Tensor,
    positions: torch.Tensor,
    num_tokens_padded: int,
) -> torch.Tensor:
    ...
    _compute_slot_mappings_kernel[(num_groups, num_reqs + 1)](
        self.max_num_batched_tokens,
        idx_mapping,
        query_start_loc,
        positions,
        self.block_table_ptrs,
        self.block_table_strides,
        self.block_sizes_tensor,
        self.slot_mappings,
        self.slot_mappings.stride(0),
        self.cp_rank,
        CP_SIZE=self.cp_size,
        CP_INTERLEAVE=self.cp_interleave,
        PAD_ID=PAD_SLOT_ID,
        TRITON_BLOCK_SIZE=1024,
    )
    return self.slot_mappings[:, :num_tokens_padded]
    ...

7. How slot_mapping reaches attention

The slot mapping takes two paths:

  1. It goes into CommonAttentionMetadata, from which the attention backend builds the metadata for reading the KV cache.
  2. It goes into ForwardContext.slot_mapping, which each layer's KV cache update op uses to write the KV cache.

CommonAttentionMetadata directly holds block_table_tensor and slot_mapping:

@dataclass
class CommonAttentionMetadata:
    ...
    block_table_tensor: torch.Tensor
    slot_mapping: torch.Tensor
    ...

_build_attention_metadata() picks the block table and slot mapping for each KV cache group and, within each group, distributes the metadata to its layers:

def _build_attention_metadata(
    self,
    num_tokens: int,
    num_reqs: int,
    max_query_len: int,
    ...
    slot_mappings: dict[int, torch.Tensor] | None = None,
) -> tuple[PerLayerAttnMetadata, CommonAttentionMetadata | None]:
    ...
    assert slot_mappings is not None
    block_table_gid_0 = _get_block_table(0)
    slot_mapping_gid_0 = slot_mappings[0]
    ...
    cm_base = CommonAttentionMetadata(
        query_start_loc=self.query_start_loc.gpu[: num_reqs_padded + 1],
        query_start_loc_cpu=self.query_start_loc.cpu[: num_reqs_padded + 1],
        seq_lens=self.seq_lens[:num_reqs_padded],
        ...
        block_table_tensor=block_table_gid_0,
        slot_mapping=slot_mapping_gid_0,
        causal=True,
        is_prefilling=is_prefilling,
    )
    ...
    for kv_cache_gid, kv_cache_group in enumerate(kv_cache_groups):
        cm = copy(cm_base)  # shallow copy
        ...
        if kv_cache_gid > 0:
            cm.block_table_tensor = _get_block_table(kv_cache_gid)
            cm.slot_mapping = slot_mappings[kv_cache_gid]
        ...

At the same time, the worker builds a mapping from layer name to slot mapping and places it in the forward context:

def build_slot_mappings_by_layer(
    slot_mappings: torch.Tensor, kv_cache_config: KVCacheConfig
) -> dict[str, torch.Tensor]:
    slot_mappings_by_layer: dict[str, torch.Tensor] = {}
    kv_cache_groups = kv_cache_config.kv_cache_groups
    for slot_mapping, kv_cache_group in zip(slot_mappings, kv_cache_groups):
        for layer_name in kv_cache_group.layer_names:
            slot_mappings_by_layer[layer_name] = slot_mapping
    return slot_mappings_by_layer
    ...

Before the model forward, slot_mapping=slot_mappings_by_layer is attached to the forward context:

with set_forward_context(
    attn_metadata,
    self.vllm_config,
    num_tokens=input_batch.num_tokens_after_padding,
    cudagraph_runtime_mode=batch_desc.cg_mode,
    num_tokens_across_dp=num_tokens_across_dp,
    batch_descriptor=batch_descriptor,
    slot_mapping=slot_mappings_by_layer,
    skip_compiled=skip_compiled,
):
    self.kv_connector.pre_forward(scheduler_output)
    model_output = self.model(**model_inputs)
...

8. The attention layer uses slot_mapping to write the current K/V

Taking Llama attention as an example, the model layer first computes QKV and then calls the generic Attention:

def forward(
    self,
    positions: torch.Tensor,
    hidden_states: torch.Tensor,
) -> torch.Tensor:
    qkv, _ = self.qkv_proj(hidden_states)
    q, k, v = qkv.split([self.q_size, self.kv_size, self.kv_size], dim=-1)
    q, k = self.rotary_emb(positions, q, k)
    attn_output = self.attn(q, k, v)
    output, _ = self.o_proj(attn_output)
    return output
    ...

In Attention.forward(), if the backend's forward does not include the KV cache update itself, unified_kv_cache_update() is called explicitly first, followed by attention:

def forward(
    self,
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    ...
) -> torch.Tensor:
    ...
    if (
        not self.attn_backend.forward_includes_kv_cache_update
        and self.kv_sharing_target_layer_name is None
        and key is not None
        and value is not None
    ):
        kv_cache_dummy_dep = unified_kv_cache_update(
            key, value, self.layer_name
        )
    unified_attention_with_output(
        query,
        key,
        value,
        output,
        self.layer_name,
        kv_cache_dummy_dep=kv_cache_dummy_dep,
    )
    ...

unified_kv_cache_update() fetches the current layer's slot mapping from the forward context:

def unified_kv_cache_update(
    key: torch.Tensor,
    value: torch.Tensor,
    layer_name: str,
) -> torch.Tensor:
    ...
    _, attn_layer, kv_cache, layer_slot_mapping = get_attention_context(layer_name)
    if layer_slot_mapping is not None:
        assert hasattr(attn_layer.impl, "do_kv_cache_update"), (
            f"{attn_layer.impl.__class__.__name__} does not support kv cache update"
        )
        attn_layer.impl.do_kv_cache_update(
            attn_layer,
            key,
            value,
            kv_cache,
            layer_slot_mapping,
        )

    return torch.empty(0, device=kv_cache.device, dtype=kv_cache.dtype)
    ...

The FlashAttention backend's update logic splits kv_cache into a key cache and a value cache and then calls reshape_and_cache_flash():

def do_kv_cache_update(
    self,
    layer: torch.nn.Module,
    key: torch.Tensor,
    value: torch.Tensor,
    kv_cache: torch.Tensor,
    slot_mapping: torch.Tensor,
) -> None:
    if self.attn_type in (AttentionType.ENCODER_ONLY, AttentionType.ENCODER):
        return

    key_cache, value_cache = kv_cache.unbind(0)

    # Reshape the input keys and values and store them in the cache.
    # Skip this if sharing KV cache with an earlier attention layer.
    reshape_and_cache_flash(
        key,
        value,
        key_cache,
        value_cache,
        slot_mapping,
        self.kv_cache_dtype,
        layer._k_scale,
        layer._v_scale,
    )
    ...

The underlying CUDA kernel reads the slot id of each token and splits it into a block index and an in-block offset:

template <typename scalar_t, typename cache_t, Fp8KVCacheDataType kv_dt>
__global__ void reshape_and_cache_flash_kernel(
    const scalar_t* __restrict__ key,    // [num_tokens, num_heads, head_size]
    const scalar_t* __restrict__ value,  // [num_tokens, num_heads, head_size]
    cache_t* __restrict__ key_cache,     // NHD or HND, shape see comments below
    cache_t* __restrict__ value_cache,   // same above
    const int64_t* __restrict__ slot_mapping,  // [num_tokens]
    ...
) {
  const int64_t token_idx = blockIdx.x;
  const int64_t slot_idx = slot_mapping[token_idx];
  // NOTE: slot_idx can be -1 if the token is padded
  if (slot_idx < 0) {
    return;
  }
  const int64_t block_idx = slot_idx / block_size;
  const int64_t block_offset = slot_idx % block_size;
  ...
  cache_t* __restrict__ key_dst =
      key_cache + block_idx * block_stride + block_offset * page_stride;
  cache_t* __restrict__ value_dst =
      value_cache + block_idx * block_stride + block_offset * page_stride;
  ...
}
...

The slot_idx < 0 check here corresponds to the PAD_SLOT_ID filled in by the slot mapping kernel earlier. Neither padding tokens nor tokens belonging to another context-parallel rank are written to the KV cache.
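The write path can be emulated on the CPU to make the indexing concrete. The sketch below is a simplification of what the kernel does: write_kv is a hypothetical function, the cache layout [num_blocks, block_size, head_dim] assumes a single head, and quantization scales are ignored.

```python
import numpy as np


def write_kv(key: np.ndarray, value: np.ndarray,
             key_cache: np.ndarray, value_cache: np.ndarray,
             slot_mapping: np.ndarray) -> None:
    """Scatter per-token K/V rows into a paged cache, skipping padded slots."""
    block_size = key_cache.shape[1]
    for token_idx, slot_idx in enumerate(slot_mapping):
        if slot_idx < 0:  # padded token, or a token owned by another rank
            continue
        block_idx, block_offset = divmod(int(slot_idx), block_size)
        key_cache[block_idx, block_offset] = key[token_idx]
        value_cache[block_idx, block_offset] = value[token_idx]


num_blocks, block_size, head_dim = 10, 4, 2
key_cache = np.zeros((num_blocks, block_size, head_dim))
value_cache = np.zeros_like(key_cache)
key = np.arange(6, dtype=np.float64).reshape(3, head_dim) + 1.0
value = -key
write_kv(key, value, key_cache, value_cache, np.array([14, -1, 36]))
print(key_cache[3, 2], key_cache[9, 0])
```

The middle token has slot -1, so its K/V row is dropped and only two cache rows change.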

9. How block_table takes part in reading historical KV

Writing the current K/V relies on slot_mapping; reading historical K/V relies on block_table. The FlashAttention metadata builder puts both into the backend metadata:

def build(
    self,
    common_prefix_len: int,
    common_attn_metadata: CommonAttentionMetadata,
    fast_build: bool = False,
) -> FlashAttentionMetadata:
    ...
    block_table_tensor = common_attn_metadata.block_table_tensor
    slot_mapping = common_attn_metadata.slot_mapping
    ...
    attn_metadata = FlashAttentionMetadata(
        num_actual_tokens=num_actual_tokens,
        max_query_len=max_query_len,
        query_start_loc=query_start_loc,
        max_seq_len=max_seq_len,
        seq_lens=seq_lens,
        block_table=block_table_tensor,
        slot_mapping=slot_mapping,
        ...
    )
    return attn_metadata
    ...

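The subsequent attention forward passes attn_metadata.block_table to the paged attention / flash attention kernel. A request's historical KV therefore does not need to be stored contiguously; it is enough that the block table can translate a logical block index into a physical page.

The complete read/write model is therefore:

Writing the current tokens' K/V: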
  token_idx -> slot_mapping[token_idx]
            -> block_idx + block_offset
            -> key_cache/value_cache[block_idx, block_offset, ...]

Reading historical context K/V:
  req_idx + logical kv position
            -> block_table[req_idx, logical_block_idx]
            -> physical block/page
            -> key_cache/value_cache[block, offset, ...]
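The read side can be emulated the same way: given one row of the block table, the scattered pages are gathered back into logical order. This is only a conceptual sketch, since a real attention kernel reads pages in place rather than materializing a contiguous copy; gather_history, the cache layout, and the block ids are hypothetical.

```python
import numpy as np


def gather_history(key_cache: np.ndarray, block_table_row: np.ndarray,
                   seq_len: int) -> np.ndarray:
    """Collect one request's cached keys in logical token order."""
    block_size = key_cache.shape[1]
    num_blocks = -(-seq_len // block_size)             # ceil(seq_len / block_size)
    pages = key_cache[block_table_row[:num_blocks]]    # [num_blocks, block_size, head_dim]
    return pages.reshape(-1, key_cache.shape[-1])[:seq_len]


num_blocks, block_size = 10, 4
# Fill every cache entry with its own physical slot id so the result is easy to read.
key_cache = np.arange(num_blocks * block_size, dtype=np.float64).reshape(
    num_blocks, block_size, 1)
row = np.array([7, 3, 9, 0])  # hypothetical row, padded with 0
history = gather_history(key_cache, row, seq_len=10)
print(history[:, 0].tolist())
```

The logical sequence of ten tokens is served from three non-adjacent physical pages, and the last page is only partially used.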

10. Prefix cache, sliding window, and the null block

A full attention prefix hit matches block hashes contiguously from left to right:

class FullAttentionManager(SingleTypeKVCacheManager):
    @classmethod
    def find_longest_cache_hit(
        cls,
        block_hashes: BlockHashList,
        max_length: int,
        kv_cache_group_ids: list[int],
        block_pool: BlockPool,
        kv_cache_spec: KVCacheSpec,
        use_eagle: bool,
        alignment_tokens: int,
        ...
    ) -> tuple[list[KVCacheBlock], ...]:
        ...
        max_num_blocks = max_length // block_size
        for block_hash in itertools.islice(block_hashes, max_num_blocks):
            if cached_block := block_pool.get_cached_block(
                block_hash, kv_cache_group_ids
            ):
                for computed, cached in zip(computed_blocks, cached_block):
                    computed.append(cached)
            else:
                break
        if use_eagle and computed_blocks[0]:
            for computed in computed_blocks:
                computed.pop()
        ...
        return computed_blocks
    ...

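Sliding window does not keep the leftmost blocks forever. Based on the current number of computed tokens, it works out which tokens the window has skipped, frees the corresponding blocks, and replaces them with null_block: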

def remove_skipped_blocks(
    self, request_id: str, total_computed_tokens: int
) -> None:
    ...
    num_skipped_tokens = self.get_num_skipped_tokens(total_computed_tokens)
    if num_skipped_tokens <= 0:
        return
    blocks = self.req_to_blocks[request_id]
    num_skipped_blocks = num_skipped_tokens // self.block_size
    num_skipped_blocks = min(num_skipped_blocks, len(blocks))
    removed_blocks: list[KVCacheBlock] = []
    for i in range(num_skipped_blocks - 1, -1, -1):
        if blocks[i] == self._null_block:
            break
        removed_blocks.append(blocks[i])
        blocks[i] = self._null_block
    self.block_pool.free_blocks(removed_blocks)
    ...

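This explains why a null block with block id 0 can appear in the block table: it is a placeholder that keeps the request's logical block indices stable, but it does not represent historical content that actually takes part in attention. For sliding/local attention, the tokens on the left that the window has skipped no longer need real KV.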
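The replacement loop can be tried out on plain block id lists. This sketch mirrors the control flow of the excerpt above but works on integers instead of KVCacheBlock objects; NULL_BLOCK_ID = 0 follows the text, and num_skipped_tokens is taken as an input rather than derived from a window size.

```python
NULL_BLOCK_ID = 0  # placeholder id for blocks that no longer hold real KV


def remove_skipped_blocks(block_ids: list[int], num_skipped_tokens: int,
                          block_size: int) -> list[int]:
    """Replace fully skipped blocks with the null block; return the freed ids."""
    if num_skipped_tokens <= 0:
        return []
    num_skipped_blocks = min(num_skipped_tokens // block_size, len(block_ids))
    freed: list[int] = []
    # Walk right to left and stop at the first block that is already null.
    for i in range(num_skipped_blocks - 1, -1, -1):
        if block_ids[i] == NULL_BLOCK_ID:
            break
        freed.append(block_ids[i])
        block_ids[i] = NULL_BLOCK_ID
    return freed


blocks = [7, 3, 9, 5]
first = remove_skipped_blocks(blocks, num_skipped_tokens=9, block_size=4)
second = remove_skipped_blocks(blocks, num_skipped_tokens=13, block_size=4)
print(first, second, blocks)
```

The list keeps its length, so logical block index 3 still maps to physical block 5 after the left side has been released.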

11. A small example

Assume block_size = 4 and a request that has been allocated these physical blocks:

logical block index:     0    1    2
block_table row:        [7,   3,   9]

logical token position:  0  1  2  3 |  4  5  6  7 |  8  9 ...
physical block_id:       7  7  7  7 |  3  3  3  3 |  9  9 ...
block_offset:            0  1  2  3 |  0  1  2  3 |  0  1 ...
slot_id:                28 29 30 31 | 12 13 14 15 | 36 37 ...

If this step schedules only positions [6, 7, 8], then:

position 6 -> block_index 1 -> block_id 3 -> offset 2 -> slot 14
position 7 -> block_index 1 -> block_id 3 -> offset 3 -> slot 15
position 8 -> block_index 2 -> block_id 9 -> offset 0 -> slot 36

slot_mapping = [14, 15, 36]

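This slot_mapping lines up with the key[0:3] and value[0:3] computed in this step, and the cache update kernel writes the three K/V rows to physical slots 14, 15, and 36 respectively.

12. Sequence diagram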

@startuml
participant Scheduler
participant KVCacheManager
participant BlockPool
participant Worker
participant BlockTable
participant Attention
participant KVCache

Scheduler -> KVCacheManager: get_computed_blocks(request)
KVCacheManager -> BlockPool: lookup block_hash
BlockPool --> KVCacheManager: cached KVCacheBlock*
Scheduler -> KVCacheManager: allocate_slots(request, num_new_tokens)
KVCacheManager -> BlockPool: get_new_blocks()
BlockPool --> KVCacheManager: KVCacheBlock(block_id)
KVCacheManager --> Scheduler: KVCacheBlocks
Scheduler --> Worker: SchedulerOutput(block_ids)

Worker -> BlockTable: add_row()/append_row(block_ids)
Worker -> BlockTable: commit_block_table()
Worker -> Worker: build query_start_loc + positions
Worker -> BlockTable: compute_slot_mapping()
BlockTable --> Worker: slot_mapping

Worker -> Attention: set_forward_context(attn_metadata, slot_mapping)
Attention -> KVCache: reshape_and_cache(key, value, slot_mapping)
Attention -> KVCache: paged attention reads via block_table
@enduml

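13. Summary

vLLM's KV cache slot mapping mechanism can be summed up in one sentence:

The scheduler only allocates and reuses physical blocks; before each forward, the worker generates the mapping from this step's tokens to physical KV slots on the fly, based on the current request rows, the logical positions, and the block table.

This design keeps the scheduler at the block-management level, so it does not need to understand how tokens are expanded within a batch. The worker generates the slot mapping at the point closest to GPU execution, which lets it handle runtime variation such as chunked prefill, decode, spec decode padding, context parallelism, and hybrid KV cache groups. block_table handles paged reads of historical KV and slot_mapping handles the exact write of the current K/V; together they close the addressing loop of the paged KV cache.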