This article follows the Client-side path behind LLM, the offline inference entry point of vLLM V1. It covers these core classes:

  • LLM
  • LLMEngine
  • SyncMPClient
  • InputProcessor
  • OutputProcessor
  • RequestState

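Here RequestState refers specifically to vllm.v1.engine.output_processor.RequestState. Core-side scheduling, KV cache, and model execution are treated as a black box. The focus is on how the Client builds a request, hands it to the Core over ZMQ, and then takes the returned EngineCoreOutputs and assembles them into the final RequestOutput.

The default path assumes VLLM_ENABLE_V1_MULTIPROCESSING=1, meaning LLMEngine connects to a background EngineCoreProc through SyncMPClient. If V1 multiprocessing is turned off, EngineCoreClient.make_client() returns an InprocClient instead: requests and outputs are passed by direct calls inside the same process and never touch the ZMQ transport layer this article concentrates on.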

0. Overview

0.1 Sequence diagram

@startuml
title vLLM Client-side offline inference flow

actor App
participant LLM
participant LLMEngine
participant InputProcessor
participant OutputProcessor
participant RequestState
participant SyncMPClient
participant "EngineCoreProc\n(Core black box)" as Core

== Construction ==
App -> LLM: LLM(model, ...)
LLM -> LLM: build EngineArgs
LLM -> LLMEngine: from_engine_args(...)
LLMEngine -> LLMEngine: create VllmConfig / Executor class
LLMEngine -> InputProcessor: new(vllm_config, renderer)
LLMEngine -> OutputProcessor: new(tokenizer, log_stats, stream_interval)
LLMEngine -> SyncMPClient: EngineCoreClient.make_client(...)
SyncMPClient -> Core: launch process + startup handshake
Core -> SyncMPClient: DEALER connects to input ROUTER
Core -> SyncMPClient: PUSH connects to output PULL
LLM -> SyncMPClient: get_supported_tasks() as UTILITY RPC
SyncMPClient --> LLM: supported tasks

== Adding a request ==
App -> LLM: generate(prompts, sampling_params)
LLM -> LLM: render prompt into EngineInput
LLM -> LLMEngine: add_request("0", EngineInput, params)
LLMEngine -> InputProcessor: process_inputs(...)
InputProcessor --> LLMEngine: EngineCoreRequest(request_id="0")
LLMEngine -> InputProcessor: assign_request_id(...)
note right
external_req_id = "0"
request_id = "0-<random8>"
end note
LLMEngine -> OutputProcessor: add_request(request, prompt_text)
OutputProcessor -> RequestState: from_new_request(...)
LLMEngine -> SyncMPClient: add_request(request)
SyncMPClient -> Core: ROUTER/DEALER send ADD + msgpack request

== Pulling outputs ==
loop until OutputProcessor has no unfinished requests
  LLM -> LLMEngine: step()
  LLMEngine -> SyncMPClient: get_output()
  Core -> SyncMPClient: PUSH/PULL EngineCoreOutputs
  SyncMPClient --> LLMEngine: EngineCoreOutputs
  LLMEngine -> OutputProcessor: process_outputs(outputs.outputs)
  OutputProcessor -> RequestState: detokenize / logprobs / make RequestOutput
  OutputProcessor --> LLMEngine: RequestOutput list + reqs_to_abort
  LLMEngine -> SyncMPClient: abort stop-string requests if needed
  LLMEngine --> LLM: step outputs
end

LLM --> App: sorted final RequestOutput list
@enduml

0.2 The Client/Core boundary

V1 offline inference can be split into three Client-side layers, with the Core as a black box underneath:

API facade
  LLM
    - user-facing methods such as generate/chat/embed/classify
    - renders prompts, assigns external request_ids, drives the synchronous step loop

Frontend engine
  LLMEngine
    - 持有 InputProcessor / OutputProcessor / EngineCoreClient
    - 把 EngineInput 变成 EngineCoreRequest
    - 把 EngineCoreOutput 变成 RequestOutput

Transport to Core
  SyncMPClient
    - synchronous ZMQ client living in the Client process
    - ADD/ABORT/UTILITY requests travel from the Client input ROUTER to the Core DEALER
    - EngineCoreOutputs travel from the Core PUSH to the Client PULL

Core black box
  EngineCoreProc
    - background process
    - receives EngineCoreRequests, produces EngineCoreOutputs

The ZMQ connection topology is as follows:

Client process                                      EngineCoreProc process
----------------                                    ----------------------

input_socket: ROUTER  bind(addr_in)  <----------->  DEALER connect(addr_in)
  send: [engine_identity, request_type, msgpack frames]

output_socket: PULL   bind(addr_out) <-------------  PUSH connect(addr_out)
  recv: [msgpack EngineCoreOutputs frames]

handshake_socket: ROUTER bind(addr_hs) <---------->  DEALER connect(addr_hs)
  startup only: HELLO -> EngineHandshakeMetadata -> READY

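1. Construction: how LLM brings up the whole Client system

1.1 LLM: the user entry point that builds EngineArgs

LLM.__init__() collects the user-supplied model, tokenizer, parallelism, cache, compilation, LoRA, and sampling-related settings into an EngineArgs, then uses it to create the LLMEngine. LLM itself does not manage the Core directly; what it holds is self.llm_engine.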

def __init__(
    self,
    model: str,
    *,
    runner: RunnerOption = "auto",
    convert: ConvertOption = "auto",
    tokenizer: str | None = None,
    ...
) -> None:
    """LLM constructor."""
    ...
    engine_args = EngineArgs(
        model=model,
        runner=runner,
        convert=convert,
        tokenizer=tokenizer,
        tokenizer_mode=tokenizer_mode,
        skip_tokenizer_init=skip_tokenizer_init,
        trust_remote_code=trust_remote_code,
        ...
        compilation_config=compilation_config_instance,
        logits_processors=logits_processors,
        **kwargs,
    )

    log_non_default_args(engine_args)

    self.llm_engine = LLMEngine.from_engine_args(
        engine_args=engine_args, usage_context=UsageContext.LLM_CLASS
    )
    self.model_config = self.llm_engine.model_config
    self.engine_class = type(self.llm_engine)

    self.request_counter = Counter()
    self.default_sampling_params: dict[str, Any] | None = None

    supported_tasks = self.llm_engine.get_supported_tasks()
    self.supported_tasks = supported_tasks
    self.pooling_task = self.model_config.get_pooling_task(supported_tasks)
    ...
    self.renderer = self.llm_engine.renderer
    self.chat_template = load_chat_template(chat_template)
    self.io_processor = self.llm_engine.io_processor
    self.input_processor = self.llm_engine.input_processor
    ...

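Two points here are important:

  • LLM calls self.llm_engine.get_supported_tasks() before its own construction has finished.
  • On the SyncMPClient path this is not an ordinary local method call but a UTILITY RPC: the Client asks the Core over ZMQ to run get_supported_tasks, then blocks until the output thread puts the result into the corresponding Future.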
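A minimal sketch of that UTILITY round trip, assuming each call is tagged with an integer call id that the Core echoes back. The class UtilityRpcSketch, the call-id scheme, and the thread that plays the Core are hypothetical stand-ins for the real ZMQ messages and output thread; the sketch only shows the Future bookkeeping: register before sending, resolve on reply, block on result().

```python
import itertools
import threading
from concurrent.futures import Future
from typing import Any, Callable


class UtilityRpcSketch:
    """Hypothetical model of a blocking utility call resolved by another thread."""

    def __init__(self, core_handlers: dict[str, Callable[..., Any]]) -> None:
        self._call_ids = itertools.count()
        self.utility_results: dict[int, Future] = {}
        # Plays the Core: method name -> implementation.
        self._core_handlers = core_handlers

    def call_utility(self, method: str, *args: Any, timeout: float = 5.0) -> Any:
        call_id = next(self._call_ids)
        future: Future = Future()
        # Register the Future before "sending" so the reply can never race ahead of it.
        self.utility_results[call_id] = future
        threading.Thread(
            target=self._core_reply, args=(call_id, method, args), daemon=True
        ).start()
        # The caller blocks here until another thread resolves the Future.
        return future.result(timeout=timeout)

    def _core_reply(self, call_id: int, method: str, args: tuple) -> None:
        # Stands in for the output thread receiving a utility output for call_id.
        future = self.utility_results.pop(call_id)
        try:
            future.set_result(self._core_handlers[method](*args))
        except Exception as exc:  # Core-side failures surface in the caller.
            future.set_exception(exc)


client = UtilityRpcSketch({"get_supported_tasks": lambda: ("generate",)})
print(client.call_utility("get_supported_tasks"))  # ('generate',)
```

Because the Future is popped when the reply arrives, the pending-call table stays empty between calls, and an exception raised "inside the Core" is re-raised in the calling thread by future.result().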

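1.2 LLMEngine.from_engine_args(): materializing the config and choosing the Executor

LLMEngine.from_engine_args() first turns EngineArgs into a complete VllmConfig and then picks the V1 Executor class from that config. Finally it decides whether to enable multiprocessing mode. The environment variable can only switch it on: when VLLM_ENABLE_V1_MULTIPROCESSING is set, enable_multiprocessing is forced to True regardless of the argument passed in.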

@classmethod
def from_engine_args(
    cls,
    engine_args: EngineArgs,
    usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
    stat_loggers: list[StatLoggerFactory] | None = None,
    enable_multiprocessing: bool = False,
) -> "LLMEngine":
    """Creates an LLM engine from the engine arguments."""

    # Create the engine configs.
    vllm_config = engine_args.create_engine_config(usage_context)
    executor_class = Executor.get_class(vllm_config)

    if envs.VLLM_ENABLE_V1_MULTIPROCESSING:
        logger.debug("Enabling multiprocessing for LLMEngine.")
        enable_multiprocessing = True

    # Create the LLMEngine.
    return cls(
        vllm_config=vllm_config,
        executor_class=executor_class,
        log_stats=not engine_args.disable_log_stats,
        usage_context=usage_context,
        stat_loggers=stat_loggers,
        multiprocess_mode=enable_multiprocessing,
    )

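1.3 LLMEngine.__init__(): assembling the Input/Output Processors and the Core client

LLMEngine is the hub of the Client-side inference flow. It assembles:

  • renderer: renders high-level inputs into token/multimodal form.
  • InputProcessor: turns an EngineInput into an EngineCoreRequest.
  • OutputProcessor: turns the EngineCoreOutputs returned by the Core into RequestOutput objects for the external API.
  • EngineCoreClient: SyncMPClient or InprocClient, depending on the mode.
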
def __init__(
    self,
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
    ...
    multiprocess_mode: bool = False,
) -> None:
    self.vllm_config = vllm_config
    self.model_config = vllm_config.model_config
    self.observability_config = vllm_config.observability_config
    ...
    self.renderer = renderer = renderer_from_config(self.vllm_config)
    self.io_processor = get_io_processor(
        self.vllm_config,
        self.renderer,
        self.model_config.io_processor_plugin,
    )

    # Convert EngineInput --> EngineCoreRequest.
    self.input_processor = InputProcessor(self.vllm_config, renderer)

    # Converts EngineCoreOutputs --> RequestOutput.
    self.output_processor = OutputProcessor(
        renderer.tokenizer,
        log_stats=self.log_stats,
        stream_interval=self.vllm_config.scheduler_config.stream_interval,
        tracing_enabled=tracing_endpoint is not None,
    )

    # EngineCore (gets EngineCoreRequests and gives EngineCoreOutputs)
    self.engine_core = EngineCoreClient.make_client(
        multiprocess_mode=multiprocess_mode,
        asyncio_mode=False,
        vllm_config=vllm_config,
        executor_class=executor_class,
        log_stats=self.log_stats,
    )
    ...
    self.reset_mm_cache()

1.4 EngineCoreClient.make_client(): offline synchronous mode selects SyncMPClient

LLMEngine calls make_client() with asyncio_mode=False, so when multiprocess_mode=True the returned client is a SyncMPClient.

@staticmethod
def make_client(
    multiprocess_mode: bool,
    asyncio_mode: bool,
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
) -> "EngineCoreClient":
    # TODO: support this for debugging purposes.
    if asyncio_mode and not multiprocess_mode:
        raise NotImplementedError(
            "Running EngineCore in asyncio without multiprocessing "
            "is not currently supported."
        )

    if multiprocess_mode and asyncio_mode:
        return EngineCoreClient.make_async_mp_client(
            vllm_config, executor_class, log_stats
        )

    if multiprocess_mode and not asyncio_mode:
        return SyncMPClient(vllm_config, executor_class, log_stats)

    return InprocClient(vllm_config, executor_class, log_stats)
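The two decisions from 1.2 and 1.4 can be condensed into a small pure-Python sketch. It assumes the environment variable is parsed as an integer flag defaulting to "1" (matching the default path described at the top), and it returns client names as strings instead of constructing real clients; resolve_multiprocess_mode and choose_client are hypothetical names.

```python
from collections.abc import Mapping


def resolve_multiprocess_mode(
    env: Mapping[str, str], enable_multiprocessing: bool = False
) -> bool:
    # Assumption: "1"/"0" integer flag, defaulting to "1". As in from_engine_args(),
    # the variable can only switch multiprocessing on, never off.
    if bool(int(env.get("VLLM_ENABLE_V1_MULTIPROCESSING", "1"))):
        return True
    return enable_multiprocessing


def choose_client(multiprocess_mode: bool, asyncio_mode: bool) -> str:
    # Same branch order as EngineCoreClient.make_client().
    if asyncio_mode and not multiprocess_mode:
        raise NotImplementedError("asyncio EngineCore requires multiprocessing")
    if multiprocess_mode and asyncio_mode:
        return "make_async_mp_client()"
    if multiprocess_mode:
        return "SyncMPClient"
    return "InprocClient"


# LLMEngine always passes asyncio_mode=False.
for env in ({}, {"VLLM_ENABLE_V1_MULTIPROCESSING": "0"}):
    print(env, "->", choose_client(resolve_multiprocess_mode(env), asyncio_mode=False))
```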

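1.5 MPClient: creating ZMQ sockets, launching the Core, waiting for ready

SyncMPClient inherits from MPClient. Most of the ZMQ setup and background Core process launch logic lives in MPClient.__init__().

The Client first creates a ZMQ context and then takes one of two branches:

  • client_addresses passed in: the Core is managed externally, and the Client only binds/connects to the existing addresses.
  • No client_addresses: the Client allocates addresses itself and starts the background EngineCoreProc via launch_core_engines().

The default offline LLM takes the second branch.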

def __init__(
    self,
    asyncio_mode: bool,
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
    client_addresses: dict[str, str] | None = None,
):
    self.vllm_config = vllm_config

    # ZMQ setup.
    sync_ctx = zmq.Context(io_threads=2)
    self.ctx = zmq.asyncio.Context(sync_ctx) if asyncio_mode else sync_ctx
    ...
    if client_addresses:
        # Engines are managed externally to this client.
        input_address = client_addresses["input_address"]
        output_address = client_addresses["output_address"]
        ...
        self.input_socket = self.resources.input_socket = make_zmq_socket(
            self.ctx,
            input_address,
            zmq.ROUTER,
            bind=True,
            router_handover=enable_input_socket_handover,
        )
        self.resources.output_socket = make_zmq_socket(
            self.ctx, output_address, zmq.PULL
        )
    else:
        # Engines are managed by this client.
        addresses = get_engine_zmq_addresses(vllm_config)
        self.input_socket = self.resources.input_socket = make_zmq_socket(
            self.ctx,
            addresses.inputs[0],
            zmq.ROUTER,
            bind=True,
            router_handover=enable_input_socket_handover,
        )
        self.resources.output_socket = make_zmq_socket(
            self.ctx, addresses.outputs[0], zmq.PULL
        )

        with launch_core_engines(
            vllm_config, executor_class, log_stats, addresses
        ) as (engine_manager, coordinator, addresses, tensor_queue):
            self.resources.coordinator = coordinator
            self.resources.engine_manager = engine_manager
    ...
    # Wait for ready messages from each engine on the input socket.
    identities = set(self.core_engines)
    sync_input_socket = zmq.Socket.shadow(self.input_socket)
    while identities:
        if not sync_input_socket.poll(
            timeout=VLLM_ENGINE_READY_TIMEOUT_S * 1000
        ):
            raise TimeoutError(...)
        identity, _ = sync_input_socket.recv_multipart()
        identities.remove(identity)

    self.core_engine: EngineIdentity = self.core_engines[0]
    self.utility_results: dict[int, AnyFuture] = {}
    ...
    self.start_engine_core_monitor()

get_engine_zmq_addresses() allocates the input/output addresses. Colocated local deployments use IPC addresses; cross-node deployments use TCP addresses:

def get_engine_client_zmq_addr(local_only: bool, host: str, port: int = 0) -> str:
    """Assign a new ZMQ socket address.

    If local_only is True, participants are colocated and so a unique IPC
    address will be returned.

    Otherwise, the provided host and port will be used to construct a TCP
    address (port == 0 means assign an available port)."""

    return (
        get_open_zmq_ipc_path()
        if local_only
        else (get_tcp_uri(host, port or get_open_port()))
    )
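To make the IPC/TCP rule concrete, here is a stdlib-only sketch. The helpers get_open_port, get_open_ipc_path, and get_tcp_uri below are hypothetical re-implementations written for illustration (the real vLLM utilities may differ, for example in where IPC files live); only the branching mirrors the function above.

```python
import os
import socket
import tempfile
import uuid


def get_open_port() -> int:
    # Let the OS pick a free TCP port by binding to port 0, then release it.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def get_open_ipc_path() -> str:
    # A fresh, unique filesystem path; ZMQ IPC endpoints use the ipc:// scheme.
    return "ipc://" + os.path.join(tempfile.gettempdir(), uuid.uuid4().hex)


def get_tcp_uri(host: str, port: int) -> str:
    return f"tcp://{host}:{port}"


def engine_client_zmq_addr(local_only: bool, host: str, port: int = 0) -> str:
    # Colocated peers share a machine, so a Unix-domain IPC path is enough;
    # otherwise build a TCP URI, picking a free port when port == 0.
    if local_only:
        return get_open_ipc_path()
    return get_tcp_uri(host, port or get_open_port())


print(engine_client_zmq_addr(True, "unused"))
print(engine_client_zmq_addr(False, "10.0.0.2", 29550))  # tcp://10.0.0.2:29550
```

Probing port 0 and then releasing the socket leaves a small window in which another process could take the port; that is an inherent trade-off of this pattern.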

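make_zmq_socket() handles bind/connect semantics in one place. When bind is not given, every socket type except PUSH, SUB, and XSUB binds: ROUTER and PULL bind, PUSH connects. A DEALER would also bind by default, which is why the Core's input thread passes bind=False explicitly.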

def make_zmq_socket(
    ctx: zmq.asyncio.Context | zmq.Context,
    path: str,
    socket_type: Any,
    bind: bool | None = None,
    identity: bytes | None = None,
    linger: int | None = None,
    router_handover: bool = False,
) -> zmq.Socket | zmq.asyncio.Socket:
    """Make a ZMQ socket with the proper bind/connect semantics."""
    ...
    if bind is None:
        bind = socket_type not in (zmq.PUSH, zmq.SUB, zmq.XSUB)
    ...
    if identity is not None:
        socket.setsockopt(zmq.IDENTITY, identity)
    ...
    if bind:
        socket.bind(path)
    else:
        socket.connect(path)

    return socket

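1.6 launch_core_engines(): launching the Core and completing the startup handshake

launch_core_engines() creates a CoreEngineProcManager, which starts one or more background EngineCoreProc processes. wait_for_engine_startup() then waits on the handshake socket for the HELLO and READY messages from the Core. Because that call sits after the yield, it runs only when the caller's with block exits, that is, after MPClient has stored engine_manager and coordinator.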

@contextlib.contextmanager
def launch_core_engines(
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
    addresses: EngineZmqAddresses,
    num_api_servers: int = 1,
) -> Iterator[
    tuple[
        CoreEngineProcManager | CoreEngineActorManager | None,
        DPCoordinator | None,
        EngineZmqAddresses,
        Queue | None,
    ]
]:
    """Launch engine and DP coordinator processes as needed."""
    ...
    with zmq_socket_ctx(
        local_handshake_address, zmq.ROUTER, bind=True
    ) as handshake_socket:
        # Start local engines.
        if local_engine_count:
            local_engine_manager = CoreEngineProcManager(
                vllm_config=vllm_config,
                executor_class=executor_class,
                log_stats=log_stats,
                handshake_address=handshake_address,
                client_handshake_address=client_handshake_address,
                local_client=True,
                local_engine_count=local_engine_count,
                start_index=dp_rank,
                local_start_index=local_start_index or 0,
                tensor_queue=tensor_queue,
            )
        else:
            local_engine_manager = None

        yield local_engine_manager, coordinator, addresses, tensor_queue

        # Now wait for engines to start.
        wait_for_engine_startup(
            handshake_socket,
            addresses,
            engines_to_handshake,
            parallel_config,
            dp_size > 1 and vllm_config.model_config.is_moe,
            vllm_config.cache_config,
            local_engine_manager,
            coordinator.proc if coordinator else None,
        )

Once a Core process starts, it registers with the frontend through startup_handshake(). The frontend replies with an EngineHandshakeMetadata that carries the input/output socket addresses.

@staticmethod
def startup_handshake(
    handshake_socket: zmq.Socket,
    local_client: bool,
    headless: bool,
    parallel_config: ParallelConfig | None = None,
) -> EngineZmqAddresses:
    # Send registration message.
    handshake_socket.send(
        msgspec.msgpack.encode(
            {
                "status": "HELLO",
                "local": local_client,
                "headless": headless,
            }
        )
    )

    # Receive initialization message.
    logger.debug("Waiting for init message from front-end.")
    if not handshake_socket.poll(timeout=HANDSHAKE_TIMEOUT_MINS * 60_000):
        raise RuntimeError(...)
    init_bytes = handshake_socket.recv()
    init_message: EngineHandshakeMetadata = msgspec.msgpack.decode(
        init_bytes, type=EngineHandshakeMetadata
    )
    ...
    return init_message.addresses
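The startup exchange can be simulated in one process to show its ordering. This sketch assumes two queue.Queue objects in place of the handshake ROUTER/DEALER pair and uses json instead of msgspec msgpack; the address values are made up, and only the fields quoted above (status, local, headless, addresses) are modeled.

```python
import json
import queue
import threading


def core_side(to_front: queue.Queue, from_front: queue.Queue, seen: dict) -> None:
    # 1. Register with the frontend.
    to_front.put(json.dumps({"status": "HELLO", "local": True, "headless": False}))
    # 2. Receive the init message carrying the socket addresses.
    init = json.loads(from_front.get(timeout=5))
    seen["addresses"] = init["addresses"]
    # ... engine initialization would happen here ...
    # 3. Report readiness.
    to_front.put(json.dumps({"status": "READY"}))


def front_side(from_core: queue.Queue, to_core: queue.Queue, addresses: dict) -> str:
    hello = json.loads(from_core.get(timeout=5))
    if hello["status"] != "HELLO":
        raise RuntimeError(f"unexpected message: {hello}")
    to_core.put(json.dumps({"addresses": addresses}))
    ready = json.loads(from_core.get(timeout=5))
    return ready["status"]


core_to_front, front_to_core, seen = queue.Queue(), queue.Queue(), {}
addrs = {"inputs": ["ipc:///tmp/in"], "outputs": ["ipc:///tmp/out"]}
core = threading.Thread(target=core_side, args=(core_to_front, front_to_core, seen))
core.start()
status = front_side(core_to_front, front_to_core, addrs)
core.join()
print(status)  # READY
```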

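Once Core-internal initialization completes, the Core also sends READY. After that, the Core's input socket thread connects to the Client's input ROUTER and sends an empty frame so the ROUTER knows this engine identity is routable. These empty frames are what the ready loop at the end of MPClient.__init__() waits for, one per engine identity.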

def process_input_sockets(
    self,
    input_addresses: list[str],
    coord_input_address: str | None,
    identity: bytes,
    ready_event: threading.Event,
):
    """Input socket IO thread."""
    ...
    with ExitStack() as stack, zmq.Context() as ctx:
        input_sockets = [
            stack.enter_context(
                make_zmq_socket(
                    ctx, input_address, zmq.DEALER, identity=identity, bind=False
                )
            )
            for input_address in input_addresses
        ]
        ...
        for input_socket in input_sockets:
            # Send initial message to each input socket - this is required
            # before the front-end ROUTER socket can send input messages
            # back to us.
            input_socket.send(b"")
            poller.register(input_socket, zmq.POLLIN)
        ...
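By default a ZMQ ROUTER silently drops a message addressed to a peer it cannot route to (unless the ROUTER_MANDATORY option is set), so the Client must not send an ADD before it knows the engine's DEALER is connected. The sketch below models that with a toy router. The simplification that a peer becomes routable only after its first message, the class ToyRouter, and the identity b"engine-0" are all assumptions for illustration; wait_for_ready mirrors the identities loop at the end of MPClient.__init__().

```python
import queue


class ToyRouter:
    """Toy ROUTER. Simplification: a peer is routable only after it has sent
    something; frames for unknown identities are dropped, as a real ROUTER
    does by default (without ROUTER_MANDATORY)."""

    def __init__(self) -> None:
        self.incoming: queue.Queue = queue.Queue()
        self.delivered: dict[bytes, list[bytes]] = {}
        self.dropped = 0

    def recv_from_dealer(self, identity: bytes, payload: bytes) -> None:
        # The router learns the identity when the first frame arrives.
        self.delivered.setdefault(identity, [])
        self.incoming.put((identity, payload))

    def send(self, identity: bytes, payload: bytes) -> None:
        if identity in self.delivered:
            self.delivered[identity].append(payload)
        else:
            self.dropped += 1


def wait_for_ready(router: ToyRouter, engines: list[bytes], timeout_s: float = 1.0) -> None:
    # One initial (empty) frame per engine identity.
    pending = set(engines)
    while pending:
        try:
            identity, _ = router.incoming.get(timeout=timeout_s)
        except queue.Empty:
            raise TimeoutError(f"engines not ready: {sorted(pending)}") from None
        pending.discard(identity)


router, engine = ToyRouter(), b"engine-0"
router.send(engine, b"ADD")           # too early: silently dropped
router.recv_from_dealer(engine, b"")  # the DEALER's initial empty frame
wait_for_ready(router, [engine])
router.send(engine, b"ADD")           # now routable
print(router.dropped, router.delivered[engine])  # 1 [b'ADD']
```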

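1.7 SyncMPClient: the background output thread

After MPClient has finished creating the sockets, launching the Core, and waiting for ready, SyncMPClient.__init__() starts one extra output thread. This thread:

  • polls the Core output PULL socket;
  • decodes EngineCoreOutputs;
  • for a utility output, completes the corresponding Future;
  • otherwise puts the outputs on a synchronous queue.Queue, which get_output() reads with a blocking get();
  • exits when the shutdown PAIR socket becomes readable, and forwards any exception to the same queue so the reader can re-raise it.
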
def __init__(
    self, vllm_config: VllmConfig, executor_class: type[Executor], log_stats: bool
):
    super().__init__(
        asyncio_mode=False,
        vllm_config=vllm_config,
        executor_class=executor_class,
        log_stats=log_stats,
    )

    self.is_dp = self.vllm_config.parallel_config.data_parallel_size > 1
    self.outputs_queue = queue.Queue[EngineCoreOutputs | Exception]()
    ...
    def process_outputs_socket():
        assert isinstance(out_socket, zmq.Socket)
        shutdown_socket = ctx.socket(zmq.PAIR)
        try:
            shutdown_socket.bind(shutdown_path)
            poller = zmq.Poller()
            poller.register(shutdown_socket, zmq.POLLIN)
            poller.register(out_socket, zmq.POLLIN)
            while True:
                socks = poller.poll()
                if not socks:
                    continue
                if len(socks) == 2 or socks[0][0] == shutdown_socket:
                    # shutdown signal, exit thread.
                    break

                frames = out_socket.recv_multipart(copy=False)
                resources.validate_alive(frames)
                outputs: EngineCoreOutputs = decoder.decode(frames)
                if outputs.utility_output:
                    _process_utility_output(outputs.utility_output, utility_results)
                else:
                    outputs_queue.put_nowait(outputs)
        except Exception as e:
            outputs_queue.put_nowait(e)
        finally:
            # Close sockets.
            shutdown_socket.close(linger=0)
            out_socket.close(linger=0)

    # Process outputs from engine in separate thread.
    self.output_queue_thread = Thread(
        target=process_outputs_socket,
        name="EngineCoreOutputQueueThread",
        daemon=True,
    )
    self.output_queue_thread.start()
    ...

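get_output() itself is thin: it takes one batch of Core outputs from outputs_queue, re-raises any exception the output thread forwarded, and sets engines_running to False when outputs.wave_complete is set.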

def get_output(self) -> EngineCoreOutputs:
    # If an exception arises in process_outputs_socket task,
    # it is forwarded to the outputs_queue so we can raise it
    # from this (run_output_handler) task to shut down the server.
    outputs = self.outputs_queue.get()

    if isinstance(outputs, Exception):
        raise self._format_exception(outputs) from None
    if outputs.wave_complete is not None:
        self.engines_running = False
    return outputs

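2. How a request travels from LLM to the Core

2.1 LLM.generate(): the outer entry point of synchronous offline inference

LLM.generate() checks the runner type, fills in the default SamplingParams if none are given, and then enters _run_completion().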

def generate(
    self,
    prompts: PromptType | Sequence[PromptType],
    sampling_params: SamplingParams | Sequence[SamplingParams] | None = None,
    *,
    use_tqdm: bool | Callable[..., tqdm] = True,
    lora_request: Sequence[LoRARequest] | LoRARequest | None = None,
    priority: list[int] | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
) -> list[RequestOutput]:
    """Generates the completions for the input prompts."""
    runner_type = self.model_config.runner_type
    if runner_type != "generate":
        raise ValueError(...)

    if sampling_params is None:
        sampling_params = self.get_default_sampling_params()

    return self._run_completion(
        prompts=prompts,
        params=sampling_params,
        output_type=RequestOutput,
        use_tqdm=use_tqdm,
        lora_request=lora_request,
        tokenization_kwargs=tokenization_kwargs,
        priority=priority,
    )

_run_completion() works in two steps:

  1. _add_completion_requests(): render the prompts and add the requests.
  2. _run_engine(): step synchronously until all requests finish.

def _run_completion(
    self,
    prompts: PromptType | Sequence[PromptType],
    params: SamplingParams
    | PoolingParams
    | Sequence[SamplingParams | PoolingParams],
    output_type: type[_O],
    *,
    use_tqdm: bool | Callable[..., tqdm] = True,
    lora_request: Sequence[LoRARequest] | LoRARequest | None = None,
    priority: list[int] | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
):
    self._add_completion_requests(
        prompts=prompts,
        params=params,
        use_tqdm=use_tqdm,
        lora_request=lora_request,
        priority=priority,
        tokenization_kwargs=tokenization_kwargs,
    )
    return self._run_engine(use_tqdm=use_tqdm, output_type=output_type)
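A toy version of the synchronous loop helps show why the results need sorting. FakeEngine and FakeOutput are hypothetical; the sketch assumes FINAL_ONLY semantics (step() reports a request only once, when it finishes) and restores submission order by sorting on the numeric external id, matching the "sorted final RequestOutput list" in the sequence diagram.

```python
from dataclasses import dataclass, field


@dataclass
class FakeOutput:
    request_id: str  # external id such as "0", "1", ...
    finished: bool


@dataclass
class FakeEngine:
    # external request id -> steps left until it finishes (made-up workload)
    remaining: dict[str, int] = field(default_factory=dict)

    def add_request(self, request_id: str, steps: int) -> None:
        self.remaining[request_id] = steps

    def has_unfinished_requests(self) -> bool:
        return bool(self.remaining)

    def step(self) -> list[FakeOutput]:
        finished = []
        for rid in list(self.remaining):
            self.remaining[rid] -= 1
            if self.remaining[rid] == 0:
                del self.remaining[rid]
                finished.append(FakeOutput(rid, finished=True))
        return finished


def run_engine(engine: FakeEngine) -> list[FakeOutput]:
    outputs = []
    while engine.has_unfinished_requests():
        for out in engine.step():
            if out.finished:
                outputs.append(out)
    # Requests finish out of order; sort numerically so "10" comes after "2".
    return sorted(outputs, key=lambda o: int(o.request_id))


engine = FakeEngine()
for rid, steps in [("0", 3), ("1", 1), ("2", 2), ("10", 1)]:
    engine.add_request(rid, steps)
print([o.request_id for o in run_engine(engine)])  # ['0', '1', '2', '10']
```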

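2.2 LLM first renders prompts into EngineInput

_add_completion_requests() normalizes single or multiple prompts, params, LoRA requests, and priorities into sequences, then uses a generator to call _preprocess_cmpl_one() on each prompt. This step goes through the renderer, which turns user input into EngineInput.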

def _add_completion_requests(
    self,
    prompts: PromptType | Sequence[PromptType],
    params: SamplingParams
    | PoolingParams
    | Sequence[SamplingParams | PoolingParams],
    *,
    use_tqdm: bool | Callable[..., tqdm] = True,
    lora_request: Sequence[LoRARequest] | LoRARequest | None = None,
    priority: list[int] | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
) -> list[str]:
    seq_prompts = prompt_to_seq(prompts)
    seq_params = self._params_to_seq(params, len(seq_prompts))
    seq_lora_requests = self._lora_request_to_seq(lora_request, len(seq_prompts))
    seq_priority = self._priority_to_seq(priority, len(seq_prompts))

    return self._render_and_add_requests(
        prompts=(
            self._preprocess_cmpl_one(prompt, tokenization_kwargs)
            for prompt in maybe_tqdm(
                seq_prompts,
                use_tqdm=use_tqdm,
                desc="Rendering prompts",
            )
        ),
        params=seq_params,
        lora_requests=seq_lora_requests,
        priorities=seq_priority,
    )
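The normalization step amounts to broadcasting per-request arguments to the number of prompts. The helper below is a hypothetical sketch of that idea, not the actual _params_to_seq/_priority_to_seq code. The usage lines show two things worth knowing: a single params object is shared by every prompt (one reason InputProcessor clones params later), and the target length must come from the normalized prompt list, because len() of a bare string prompt counts characters.

```python
from __future__ import annotations

from collections.abc import Sequence
from typing import TypeVar

T = TypeVar("T")


def broadcast_to_seq(value: T | Sequence[T], n: int) -> list[T]:
    # Strings are Sequences too, but here a str is one value, not a list.
    if isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
        if len(value) != n:
            raise ValueError(f"expected {n} items, got {len(value)}")
        return list(value)
    return [value] * n


prompts = "Hello, my name is"
seq_prompts = [prompts] if isinstance(prompts, str) else list(prompts)
shared = {"temperature": 0.8}  # stands in for one SamplingParams object
seq_params = broadcast_to_seq(shared, len(seq_prompts))
print(len(seq_params), seq_params[0] is shared)  # 1 True
print(len(broadcast_to_seq(0, len(prompts))))    # 17, one per character
```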

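The underlying batch rendering entry point is _preprocess_cmpl() (_preprocess_cmpl_one() is its single-prompt counterpart):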

def _preprocess_cmpl(
    self,
    prompts: Sequence[PromptType],
    tokenization_kwargs: dict[str, Any] | None = None,
) -> Sequence[EngineInput]:
    """
    Convert prompt inputs from LLM APIs (other than [LLM.chat][]) into
    a format that can be passed to `_add_request`.
    """
    renderer = self.renderer
    model_config = self.model_config

    parsed_prompts = [
        parse_model_prompt(model_config, prompt) for prompt in prompts
    ]
    tok_params = renderer.default_cmpl_tok_params.with_kwargs(
        **(tokenization_kwargs or {})
    )

    return renderer.render_cmpl(parsed_prompts, tok_params)

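2.3 LLM._add_request(): setting FINAL_ONLY and assigning the external ID

Offline LLM.generate() only needs the final result, so _add_request() sets SamplingParams.output_kind to FINAL_ONLY; note that this mutates the params object the caller passed in. It then uses Counter() to generate the user-visible external request id, such as "0" or "1".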

def _add_request(
    self,
    prompt: EngineInput,
    params: SamplingParams | PoolingParams,
    lora_request: LoRARequest | None = None,
    priority: int = 0,
) -> str:
    if isinstance(params, SamplingParams):
        # We only care about the final output
        params.output_kind = RequestOutputKind.FINAL_ONLY

    request_id = str(next(self.request_counter))

    return self.llm_engine.add_request(
        request_id,
        prompt,
        params,
        lora_request=lora_request,
        priority=priority,
    )

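The request_id at this layer is the external ID. Later, InputProcessor.assign_request_id() saves it into external_req_id and generates a unique internal ID.

2.4 LLMEngine.add_request(): the hub for adding requests on the Client side

LLMEngine.add_request() is the key function through which a request enters the frontend engine. It ties together three things: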

  1. InputProcessor.process_inputs() turns the input into an EngineCoreRequest.
  2. OutputProcessor.add_request() sets up the RequestState on the Client side first.
  3. engine_core.add_request() sends the request to the Core.

def add_request(
    self,
    request_id: str,
    prompt: EngineCoreRequest | PromptType | EngineInput,
    params: SamplingParams | PoolingParams,
    arrival_time: float | None = None,
    lora_request: LoRARequest | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    prompt_text: str | None = None,
) -> str:
    # Validate the request_id type.
    if not isinstance(request_id, str):
        raise TypeError(f"request_id must be a string, got {type(request_id)}")

    # Process raw inputs into the request.
    if isinstance(prompt, EngineCoreRequest):
        ...
    else:
        request = self.input_processor.process_inputs(
            request_id,
            prompt,
            params,
            supported_tasks=self.get_supported_tasks(),
            arrival_time=arrival_time,
            lora_request=lora_request,
            tokenization_kwargs=tokenization_kwargs,
            trace_headers=trace_headers,
            priority=priority,
        )
        prompt_text, _, _ = extract_prompt_components(self.model_config, prompt)

    self.input_processor.assign_request_id(request)

    req_id = request.request_id

    # Use cloned params that may have been updated in process_inputs()
    params = request.params

    n = params.n if isinstance(params, SamplingParams) else 1

    if n == 1:
        # Make a new RequestState and queue.
        self.output_processor.add_request(request, prompt_text, None, 0)
        # Add the request to EngineCore.
        self.engine_core.add_request(request)
        return req_id
    ...

SamplingParams.n > 1 时,LLMEngine 会把一个外部请求拆成多个 child request,每个 child request 都添加到 OutputProcessor 和 Core;最终由 ParentRequest 聚合回同一个外部输出。

def add_request(
    self,
    request_id: str,
    prompt: EngineCoreRequest | PromptType | EngineInput,
    params: SamplingParams | PoolingParams,
    ...
) -> str:
    ...
    # Fan out child requests (for n>1).
    parent_req = ParentRequest(request)
    for idx in range(n):
        request_id, child_params = parent_req.get_child_info(idx)
        child_request = request if idx == n - 1 else copy(request)
        child_request.request_id = request_id
        child_request.sampling_params = child_params

        # Make a new RequestState and queue.
        self.output_processor.add_request(
            child_request, prompt_text, parent_req, idx
        )
        # Add the request to EngineCore.
        self.engine_core.add_request(child_request)

    return req_id

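2.5 InputProcessor.process_inputs(): turning frontend input into a Core request

InputProcessor produces an EngineCoreRequest. It does not send anything; it only normalizes the input into a structure the Core understands.

Its main steps are:

  • Validate SamplingParams / PoolingParams and LoRA.
  • If the input is already an EngineInput (a dict with a "type" key), use it directly; otherwise go through the legacy raw-prompt preprocess.
  • Run platform-level request validation.
  • Split encoder/decoder inputs.
  • Handle token ids or prompt embeds.
  • Clone and complete the sampling/pooling params.
  • Build MultiModalFeatureSpec entries for multimodal inputs.
  • Return the EngineCoreRequest.
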
def process_inputs(
    self,
    request_id: str,
    prompt: PromptType | EngineInput,
    params: SamplingParams | PoolingParams,
    supported_tasks: tuple[SupportedTask, ...],
    arrival_time: float | None = None,
    lora_request: LoRARequest | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
    resumable: bool = False,
) -> EngineCoreRequest:
    self._validate_params(params, supported_tasks)
    self._validate_lora(lora_request)
    ...
    if isinstance(prompt, dict) and "type" in prompt:
        ...
        if arrival_time is None:
            arrival_time = prompt.get("arrival_time", time.time())

        processed_inputs: EngineInput = prompt
    else:
        ...
        if arrival_time is None:
            arrival_time = time.time()

        processed_inputs = self.input_preprocessor.preprocess(
            prompt,
            tokenization_kwargs=tokenization_kwargs,
        )

    current_platform.validate_request(processed_inputs, params)

    encoder_inputs, decoder_inputs = split_enc_dec_input(processed_inputs)
    self._validate_model_inputs(encoder_inputs, decoder_inputs)

    # Mypy can be conservative for TypedDict unions; normalize access.
    if decoder_inputs["type"] == "embeds":
        prompt_token_ids = None
        prompt_embeds = decoder_inputs["prompt_embeds"]
    else:
        prompt_token_ids = decoder_inputs["prompt_token_ids"]
        prompt_embeds = None
    ...

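The sampling params are cloned and then completed using the model, the generation config, and the tokenizer. In particular, an unset max_tokens defaults to max_model_len minus the prompt length.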

def process_inputs(
    self,
    request_id: str,
    prompt: PromptType | EngineInput,
    params: SamplingParams | PoolingParams,
    supported_tasks: tuple[SupportedTask, ...],
    ...
) -> EngineCoreRequest:
    ...
    sampling_params = None
    pooling_params = None
    if isinstance(params, SamplingParams):
        # TODO: can we avoid cloning here in multiproc case?
        sampling_params = params.clone()
        # If unset max tokens, then generate up to the max_model_len.
        if sampling_params.max_tokens is None:
            seq_len = length_from_prompt_token_ids_or_embeds(
                prompt_token_ids, prompt_embeds
            )
            sampling_params.max_tokens = self.model_config.max_model_len - seq_len

        sampling_params.update_from_generation_config(
            self.generation_config_fields,
            self.renderer.get_eos_token_id(),
        )
        if self.tokenizer is not None:
            sampling_params.update_from_tokenizer(self.tokenizer)
    else:
        pooling_params = params.clone()
    ...
    return EngineCoreRequest(
        request_id=request_id,
        prompt_token_ids=prompt_token_ids,
        prompt_embeds=prompt_embeds,
        mm_features=mm_features,
        sampling_params=sampling_params,
        pooling_params=pooling_params,
        arrival_time=arrival_time,
        lora_request=lora_request,
        cache_salt=decoder_inputs.get("cache_salt"),
        priority=priority,
        data_parallel_rank=data_parallel_rank,
        trace_headers=trace_headers,
        resumable=resumable,
    )

EngineCoreRequest is itself a msgspec struct, which SyncMPClient later serializes and sends as-is.

class EngineCoreRequest(
    msgspec.Struct,
    array_like=True,
    omit_defaults=True,
    gc=False,
):
    request_id: str
    prompt_token_ids: list[int] | None
    mm_features: list[MultiModalFeatureSpec] | None
    sampling_params: SamplingParams | None
    pooling_params: PoolingParams | None
    arrival_time: float
    lora_request: LoRARequest | None
    cache_salt: str | None
    data_parallel_rank: int | None
    prompt_embeds: torch.Tensor | None = None
    ...
    external_req_id: str | None = None
    reasoning_ended: bool | None = None

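2.6 assign_request_id(): separating external and internal IDs

The Client side maintains two kinds of ID:

  • external_req_id: the ID seen by the user and the LLM layer, e.g. "0".
  • request_id: the unique internal ID, e.g. "0-8f3a1c2d", used for internal mappings in the Core, the OutputProcessor, aborts, and so on.
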
@staticmethod
def assign_request_id(request: EngineCoreRequest):
    """Replace the externally supplied request ID with an internal request ID
    that adds 8 random characters in order to ensure uniqueness.
    """
    if request.external_req_id is not None:
        raise ValueError(
            "The external_req_id field should not be set on EngineCoreRequests"
            " passed to vLLM; use the request_id field."
        )
    request.external_req_id = request.request_id
    if envs.VLLM_DISABLE_REQUEST_ID_RANDOMIZATION:
        logger.warning_once(...)
    else:
        request.request_id = f"{request.external_req_id}-{random_uuid():.8}"

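The direct consequences of this design:

  • OutputProcessor.request_states is keyed by the internal request_id.
  • The final RequestOutput.request_id uses external_req_id.
  • A caller that reuses an external ID does not cause collisions among the internal Core request IDs (unless VLLM_DISABLE_REQUEST_ID_RANDOMIZATION is set).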
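A minimal sketch of the ID split, assuming a hypothetical ToyRequest dataclass in place of EngineCoreRequest and uuid.uuid4().hex[:8] standing in for random_uuid():.8. The randomize flag is a hypothetical inverse of VLLM_DISABLE_REQUEST_ID_RANDOMIZATION; none of these names are vLLM APIs.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass


@dataclass
class ToyRequest:
    # Hypothetical stand-in for EngineCoreRequest: only the two ID fields.
    request_id: str
    external_req_id: str | None = None


def assign_request_id(request: ToyRequest, randomize: bool = True) -> None:
    """Sketch of the external/internal ID split (not vLLM code)."""
    if request.external_req_id is not None:
        raise ValueError("external_req_id must not be set by the caller")
    # Keep the caller's ID as the external one.
    request.external_req_id = request.request_id
    if randomize:
        # 8 random hex characters make the internal ID unique even when
        # callers reuse the same external ID.
        request.request_id = f"{request.external_req_id}-{uuid.uuid4().hex[:8]}"
```

Submitting the external ID "0" twice still yields two distinct internal IDs, which is why OutputProcessor keys its state by the internal one.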

2.7 OutputProcessor.add_request(): registering the request state on the Client side first

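Before the request is sent to Core, LLMEngine first has OutputProcessor create a RequestState. The reason is that Core only ever returns incremental information such as token ids, finish reason, and logprobs; the Client must keep the prompt, detokenizer, logprobs processor, output kind, parent-request relationship, and other context ready in advance.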

def add_request(
    self,
    request: EngineCoreRequest,
    prompt: str | None,
    parent_req: ParentRequest | None = None,
    request_index: int = 0,
    queue: RequestOutputCollector | None = None,
) -> None:
    request_id = request.request_id
    req_state = self.request_states.get(request_id)
    if req_state is not None:
        self._update_streaming_request_state(req_state, request, prompt)
        return

    req_state = RequestState.from_new_request(
        tokenizer=self.tokenizer,
        request=request,
        prompt=prompt,
        parent_req=parent_req,
        request_index=request_index,
        queue=queue,
        log_stats=self.log_stats,
        stream_interval=self.stream_interval,
    )
    self.request_states[request_id] = req_state
    if parent_req:
        self.parent_requests[parent_req.request_id] = parent_req

    # Track the external_req_id -> [internal_req_id, ...] mapping
    self.external_req_ids[req_state.external_req_id].append(request_id)

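RequestState.from_new_request() creates the detokenizer and logprobs processor according to the request type. For an ordinary generation request, it reads output_kind, max_tokens, top_p, n, and similar fields from SamplingParams (and drops the tokenizer when detokenize is False). For a pooling request, both processors are None and output_kind comes from PoolingParams.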

@classmethod
def from_new_request(
    cls,
    tokenizer: TokenizerLike | None,
    request: EngineCoreRequest,
    prompt: str | None,
    parent_req: ParentRequest | None,
    request_index: int,
    queue: RequestOutputCollector | None,
    log_stats: bool,
    stream_interval: int,
) -> "RequestState":
    if sampling_params := request.sampling_params:
        if not sampling_params.detokenize:
            tokenizer = None
        output_kind = sampling_params.output_kind
        logprobs_processor = LogprobsProcessor.from_new_request(
            tokenizer=tokenizer,
            request=request,
        )
        detokenizer = IncrementalDetokenizer.from_new_request(
            tokenizer=tokenizer,
            request=request,
        )
        max_tokens_param = sampling_params.max_tokens
        top_p = sampling_params.top_p
        n = sampling_params.n
        temperature = sampling_params.temperature
    else:
        logprobs_processor = None
        detokenizer = None
        max_tokens_param = None
        top_p = None
        n = None
        temperature = None
        assert request.pooling_params is not None
        output_kind = request.pooling_params.output_kind

    assert request.external_req_id is not None
    return cls(
        request_id=request.request_id,
        external_req_id=request.external_req_id,
        parent_req=parent_req,
        request_index=request_index,
        lora_request=request.lora_request,
        output_kind=output_kind,
        prompt=prompt,
        prompt_token_ids=request.prompt_token_ids,
        prompt_embeds=request.prompt_embeds,
        logprobs_processor=logprobs_processor,
        detokenizer=detokenizer,
        max_tokens_param=max_tokens_param,
        ...
    )

2.8 SyncMPClient.add_request(): sending ADD through the input ROUTER

When LLMEngine calls self.engine_core.add_request(request), the default multiprocess synchronous path enters SyncMPClient.add_request():

def add_request(self, request: EngineCoreRequest) -> None:
    if self.is_dp:
        self.engines_running = True
    self._send_input(EngineCoreRequestType.ADD, request)

_send_input() assembles the message as a ZMQ multipart message:

[core_engine_identity, request_type.value, *msgpack_frames]

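For ADD, request_type.value is b"\x00". If encoding produces more than one payload frame (auxiliary buffers, such as tensor data backing the request), the message is sent zero-copy with track=True and the request is kept as a pending message, so those buffers stay alive until ZMQ is done with them: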

def _send_input(self, request_type: EngineCoreRequestType, request: Any):
    self.ensure_alive()
    self.free_pending_messages()
    # (Identity, RequestType, SerializedRequest)
    msg = (self.core_engine, request_type.value, *self.encoder.encode(request))

    if len(msg) <= 3:
        # No auxiliary buffers => no tensor backing buffers in request.
        self.input_socket.send_multipart(msg, copy=False)
        return

    tracker = self.input_socket.send_multipart(msg, copy=False, track=True)
    self.add_pending_message(tracker, request)
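A stdlib-only sketch of the framing rule above. RequestType is a hypothetical mirror of EngineCoreRequestType (only ADD's value comes from the text; ABORT's byte is an assumption), pickle stands in for the msgpack encoder, and no real ZMQ socket is involved.

```python
import enum
import pickle


class RequestType(enum.Enum):
    ADD = b"\x00"    # value stated in the text above
    ABORT = b"\x01"  # assumed value, for illustration only


def toy_encode(obj, aux_buffers=()) -> list:
    """Stand-in for MsgpackEncoder.encode(): one main frame plus optional
    auxiliary buffers. pickle keeps this sketch stdlib-only."""
    return [pickle.dumps(obj), *aux_buffers]


def build_message(identity: bytes, request_type: RequestType, request, aux_buffers=()):
    # (Identity, RequestType, *SerializedRequest), as in _send_input().
    msg = (identity, request_type.value, *toy_encode(request, aux_buffers))
    # identity + type + a single payload frame: nothing zero-copy to keep alive.
    needs_tracking = len(msg) > 3
    return msg, needs_tracking
```

A plain request yields three frames and needs no tracking; attaching one auxiliary buffer yields four frames and flips needs_tracking to True.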

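The Core-side input socket thread is only a transport entry point: it reads the request type and data frames from its DEALER socket (the identity frame sent by the Client's ROUTER is consumed by ZMQ routing and never arrives here), deserializes the EngineCoreRequest, runs Core's entry preprocessing, and puts the result on Core's input_queue. If preprocessing an ADD request fails, _handle_request_preproc_error() handles it and the request is skipped. At this point the request has entered the Core black box.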

def process_input_sockets(
    self,
    input_addresses: list[str],
    coord_input_address: str | None,
    identity: bytes,
    ready_event: threading.Event,
):
    """Input socket IO thread."""
    ...
    while True:
        for input_socket, _ in poller.poll():
            # (RequestType, RequestData)
            type_frame, *data_frames = input_socket.recv_multipart(copy=False)
            ...
            request_type = EngineCoreRequestType(bytes(type_frame.buffer))

            # Deserialize the request data.
            request: Any
            if request_type == EngineCoreRequestType.ADD:
                req: EngineCoreRequest = add_request_decoder.decode(data_frames)
                try:
                    request = self.preprocess_add_request(req)
                except Exception:
                    self._handle_request_preproc_error(req)
                    continue
            else:
                request = generic_decoder.decode(data_frames)
                ...

            # Push to input queue for core busy loop.
            self.input_queue.put_nowait((request_type, request))
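The receive side can be sketched the same way. This is a hypothetical, stdlib-only version of one loop iteration: frames arrive as plain bytes instead of ZMQ frames, pickle replaces the msgpack decoders, preprocess_add() is an invented stand-in for preprocess_add_request(), and failures are only recorded in a list.

```python
import enum
import pickle
import queue


class RequestType(enum.Enum):
    ADD = b"\x00"
    ABORT = b"\x01"  # assumed value, for illustration only


def preprocess_add(req: dict) -> dict:
    # Hypothetical preprocessing: reject requests without prompt tokens.
    if not req.get("prompt_token_ids"):
        raise ValueError("empty prompt")
    return req


def handle_frames(frames: list, input_queue: queue.Queue, errors: list) -> None:
    # (RequestType, RequestData): no identity frame on the DEALER side.
    type_frame, *data_frames = frames
    request_type = RequestType(bytes(type_frame))
    request = pickle.loads(data_frames[0])
    if request_type == RequestType.ADD:
        try:
            request = preprocess_add(request)
        except Exception:
            # Report the bad request and skip it instead of enqueuing it.
            errors.append(request["request_id"])
            return
    # Hand the request to the core busy loop.
    input_queue.put_nowait((request_type, request))
```

Only requests that survive preprocessing reach the queue, which matches the "report and continue" behavior of the real loop.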

3. Getting outputs from Core and returning the final inference results

3.1 The output contract from Core to Client

From the Client's point of view, Core returns EngineCoreOutputs, whose most important field is outputs: list[EngineCoreOutput]. Each EngineCoreOutput is one incremental output for a single internal request_id.

class EngineCoreOutput(
    msgspec.Struct,
    array_like=True,
    omit_defaults=True,
    gc=False,
):
    request_id: str
    new_token_ids: list[int]

    new_logprobs: LogprobsLists | None = None
    new_prompt_logprobs_tensors: LogprobsTensors | None = None

    pooling_output: torch.Tensor | None = None

    finish_reason: FinishReason | None = None
    stop_reason: int | str | None = None
    events: list[EngineCoreEvent] | None = None
    kv_transfer_params: dict[str, Any] | None = None
    ...

    @property
    def finished(self) -> bool:
        return self.finish_reason is not None

EngineCoreOutputs is a batch of outputs; it can also carry scheduler stats, a utility result, DP wave signals, and so on.

class EngineCoreOutputs(
    msgspec.Struct,
    array_like=True,
    omit_defaults=True,
    gc=False,
):
    engine_index: int = 0

    # [num_reqs]
    outputs: list[EngineCoreOutput] = []
    scheduler_stats: SchedulerStats | None = None
    timestamp: float = 0.0

    utility_output: UtilityOutput | None = None
    finished_requests: set[str] | None = None
    ...

3.2 How the Core wrapper pushes outputs back to the Client

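The internal details of a Core step are left out; only the boundary matters here. EngineCoreProc._process_engine_step() gets the step outputs (a mapping from client index to EngineCoreOutputs) and puts each (client_index, EngineCoreOutputs) pair into output_queue: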

def _process_engine_step(self) -> bool:
    outputs, model_executed = self.step_fn()
    # Put EngineCoreOutputs into the output queue.
    for output in outputs.items() if outputs else ():
        self.output_queue.put_nowait(output)
    # Post-step hook.
    self.post_step(model_executed)
    ...
    return model_executed

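The Core's output socket thread then takes each item from output_queue, msgpack-encodes the EngineCoreOutputs, and sends it through the PUSH socket selected by client_index to that Client's PULL socket. The ENGINE_CORE_DEAD sentinel is instead sent on every socket before the thread exits.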

def process_output_sockets(
    self, output_paths: list[str], coord_output_path: str | None, engine_index: int
):
    """Output socket IO thread."""

    # Msgpack serialization encoding.
    encoder = MsgpackEncoder()
    ...
    with ExitStack() as stack, zmq.Context() as ctx:
        sockets = [
            stack.enter_context(
                make_zmq_socket(ctx, output_path, zmq.PUSH, linger=4000)
            )
            for output_path in output_paths
        ]
        ...
        while True:
            output = self.output_queue.get()
            if output == EngineCoreProc.ENGINE_CORE_DEAD:
                for socket in sockets:
                    socket.send(output)
                break
            assert not isinstance(output, bytes)
            client_index, outputs = output
            outputs.engine_index = engine_index
            ...
            buffers = encoder.encode_into(outputs, buffer)
            tracker = sockets[client_index].send_multipart(
                buffers, copy=False, track=True
            )
            ...
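A stdlib-only sketch of both ends of output_queue. Lists stand in for the per-client PUSH sockets, dicts stand in for EngineCoreOutputs, and the ENGINE_CORE_DEAD value below is hypothetical; the real thread also encodes with msgpack and tracks zero-copy sends, which is omitted here.

```python
import queue

# Hypothetical sentinel value; the real EngineCoreProc.ENGINE_CORE_DEAD is not shown above.
ENGINE_CORE_DEAD = b"ENGINE_CORE_DEAD"


def process_engine_step(step_outputs: dict, output_queue: queue.Queue) -> None:
    """Producer side: one (client_index, outputs) pair per client."""
    for pair in step_outputs.items():
        output_queue.put_nowait(pair)


def output_loop(output_queue: queue.Queue, client_sinks: list, engine_index: int) -> None:
    """Consumer side: route each batch to its client's sink."""
    while True:
        item = output_queue.get()
        if item == ENGINE_CORE_DEAD:
            # Every client must learn that the core died.
            for sink in client_sinks:
                sink.append(item)
            break
        client_index, outputs = item
        outputs["engine_index"] = engine_index  # stamp before "sending"
        client_sinks[client_index].append(outputs)
```

Routing by client_index is what lets one Core serve several front-end clients over separate sockets.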

3.3 The SyncMPClient output thread: turning ZMQ frames into a local Queue

As seen in the construction part, SyncMPClient runs an EngineCoreOutputQueueThread. Regular outputs go into self.outputs_queue; utility outputs are used instead to wake up synchronous calls such as call_utility().

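The synchronous LLMEngine.step() never reads the ZMQ socket directly; it only calls SyncMPClient.get_output(), which blocks on the local queue. An exception raised in the output thread is forwarded through the same queue and re-raised here, and a wave_complete marker resets engines_running: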

def get_output(self) -> EngineCoreOutputs:
    # If an exception arises in process_outputs_socket task,
    # it is forwarded to the outputs_queue so we can raise it
    # from this (run_output_handler) task to shut down the server.
    outputs = self.outputs_queue.get()

    if isinstance(outputs, Exception):
        raise self._format_exception(outputs) from None
    if outputs.wave_complete is not None:
        self.engines_running = False
    return outputs
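A hypothetical, stdlib-only sketch of this output path: a background thread demultiplexes incoming messages, resolving a Future for utility replies and queueing everything else, while exceptions travel through the same queue. ToyOutputClient, pending_utility, and the dict message shape are invented for illustration; the real client reads msgpack frames from a PULL socket and formats exceptions differently.

```python
import queue
import threading
from concurrent.futures import Future


class ToyOutputClient:
    """Sketch of the SyncMPClient output path, not vLLM code.

    `incoming` stands in for the PULL socket; messages are plain dicts.
    """

    def __init__(self, incoming: queue.Queue):
        self.outputs_queue: queue.Queue = queue.Queue()
        self.pending_utility: dict[int, Future] = {}
        self._incoming = incoming
        threading.Thread(target=self._process_outputs, daemon=True).start()

    def _process_outputs(self) -> None:
        try:
            while True:
                msg = self._incoming.get()
                if "utility" in msg:
                    # Utility replies wake the waiting caller's Future
                    # instead of going through outputs_queue.
                    call_id, result = msg["utility"]
                    self.pending_utility.pop(call_id).set_result(result)
                else:
                    self.outputs_queue.put_nowait(msg)
        except Exception as e:
            # Forward the failure so the consuming thread raises it.
            self.outputs_queue.put_nowait(e)

    def get_output(self) -> dict:
        outputs = self.outputs_queue.get()  # blocks, like get_output() above
        if isinstance(outputs, Exception):
            raise RuntimeError("output thread failed") from outputs
        return outputs
```

Forwarding the exception through the queue matters because the consumer is blocked in outputs_queue.get(); without it, a dead output thread would leave get_output() waiting forever.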

3.4 LLMEngine.step(): the shell of one synchronous step

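LLMEngine.step() performs one round of "fetch a batch of outputs from Core and process it":

  1. Call self.engine_core.get_output() to obtain EngineCoreOutputs.
  2. Call self.output_processor.process_outputs() to produce RequestOutput objects.
  3. If the Client-side detokenizer found a stop string but Core has not yet stopped the request, send an abort for it.
  4. Record statistics.

If should_execute_dummy_batch is set, step() instead runs one dummy batch on Core and returns an empty list.
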
def step(self) -> list[RequestOutput | PoolingRequestOutput]:
    if self.should_execute_dummy_batch:
        self.should_execute_dummy_batch = False
        self.engine_core.execute_dummy_batch()
        return []

    # 1) Get EngineCoreOutput from the EngineCore.
    with record_function_or_nullcontext("llm_engine step: get_output"):
        outputs = self.engine_core.get_output()

    # 2) Process EngineCoreOutputs.
    with record_function_or_nullcontext("llm_engine step: process_outputs"):
        iteration_stats = IterationStats() if self.log_stats else None
        processed_outputs = self.output_processor.process_outputs(
            outputs.outputs,
            engine_core_timestamp=outputs.timestamp,
            iteration_stats=iteration_stats,
        )
        self.output_processor.update_scheduler_stats(outputs.scheduler_stats)

    # 3) Abort any reqs that finished due to stop strings.
    with record_function_or_nullcontext("llm_engine step: abort_requests"):
        self.engine_core.abort_requests(processed_outputs.reqs_to_abort)

    # 4) Record stats
    ...
    return processed_outputs.request_outputs

Here outputs.outputs is the list[EngineCoreOutput] that OutputProcessor actually processes; outputs.scheduler_stats is handed to update_scheduler_stats() separately.
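The three-stage shell of step() can be sketched with stubs. StubCoreClient and Processed are hypothetical stand-ins for SyncMPClient and OutputProcessorOutput; the processing function is injected so the sketch only shows the ordering: fetch, convert, then abort whatever the Client finished early.

```python
from dataclasses import dataclass


@dataclass
class Processed:
    # Hypothetical mirror of OutputProcessorOutput's two fields.
    request_outputs: list
    reqs_to_abort: list


class StubCoreClient:
    """Stand-in for SyncMPClient: replays canned batches and records aborts."""

    def __init__(self, batches):
        self.batches = list(batches)
        self.aborted: list = []

    def get_output(self):
        return self.batches.pop(0)

    def abort_requests(self, request_ids):
        self.aborted.extend(request_ids)


def step(core, process_outputs) -> list:
    outputs = core.get_output()                   # 1) one batch from Core
    processed = process_outputs(outputs)          # 2) Core form -> API form
    core.abort_requests(processed.reqs_to_abort)  # 3) stop Core-side work
    return processed.request_outputs
```

Keeping abort inside step() means a stop string detected on the Client stops Core work within the same iteration.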

3.5 OutputProcessor.process_outputs(): the only place that iterates over Core outputs in batch

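OutputProcessor.process_outputs() is the heart of Client-side output handling. It looks up the RequestState for each internal request_id, updates the detokenizer, logprobs, and stats output by output, and finally creates the external RequestOutput. Outputs whose request_id is no longer registered (for example, already-aborted requests) are skipped.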

def process_outputs(
    self,
    engine_core_outputs: list[EngineCoreOutput],
    engine_core_timestamp: float | None = None,
    iteration_stats: IterationStats | None = None,
) -> OutputProcessorOutput:
    """
    Process the EngineCoreOutputs:
    1) Compute stats for logging
    2) Detokenize
    3) Create and handle RequestOutput objects
    """

    request_outputs: list[RequestOutput | PoolingRequestOutput] = []
    reqs_to_abort: list[str] = []
    for engine_core_output in engine_core_outputs:
        req_id = engine_core_output.request_id
        req_state = self.request_states.get(req_id)
        if req_state is None:
            # Ignore output for already-aborted request.
            continue

        # 1) Compute stats for this iteration.
        self._update_stats_from_output(
            req_state, engine_core_output, engine_core_timestamp, iteration_stats
        )

        new_token_ids = engine_core_output.new_token_ids
        pooling_output = engine_core_output.pooling_output
        finish_reason = engine_core_output.finish_reason
        stop_reason = engine_core_output.stop_reason
        kv_transfer_params = engine_core_output.kv_transfer_params
        routed_experts = engine_core_output.routed_experts
        req_state.num_cached_tokens = engine_core_output.num_cached_tokens
        req_state.is_prefilling = False
        ...

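For generation outputs, the new tokens first go through the IncrementalDetokenizer. If the Client side detects a stop string, it changes finish_reason to STOP, records the matched string as stop_reason, and later tells LLMEngine to abort the request on the Core side.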

def process_outputs(
    self,
    engine_core_outputs: list[EngineCoreOutput],
    engine_core_timestamp: float | None = None,
    iteration_stats: IterationStats | None = None,
) -> OutputProcessorOutput:
    ...
    for engine_core_output in engine_core_outputs:
        ...
        if pooling_output is None:
            assert req_state.detokenizer is not None
            assert req_state.logprobs_processor is not None
            # 2) Detokenize the token ids into text and perform stop checks.
            stop_string = req_state.detokenizer.update(
                new_token_ids, finish_reason == FinishReason.STOP
            )
            if stop_string:
                finish_reason = FinishReason.STOP
                stop_reason = stop_string

            # 3) Compute sample and prompt logprobs for request,
            # if required.
            req_state.logprobs_processor.update_from_output(engine_core_output)

        # 4) Create and handle RequestOutput objects.
        if request_output := req_state.make_request_output(
            new_token_ids,
            pooling_output,
            finish_reason,
            stop_reason,
            kv_transfer_params,
            routed_experts,
        ):
            if req_state.streaming_input:
                request_output.finished = False

            if req_state.queue is not None:
                # AsyncLLM: put into queue for handling by generate().
                req_state.queue.put(request_output)
            else:
                # LLMEngine: return list of RequestOutputs.
                request_outputs.append(request_output)
        ...

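When a request completes, OutputProcessor cleans up its internal state. If the Client finished the request because of a stop string while the Core output itself is not marked finished, the internal ID is added to reqs_to_abort: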

def process_outputs(
    self,
    engine_core_outputs: list[EngineCoreOutput],
    engine_core_timestamp: float | None = None,
    iteration_stats: IterationStats | None = None,
) -> OutputProcessorOutput:
    ...
    for engine_core_output in engine_core_outputs:
        ...
        # Free completed requests.
        if finish_reason is not None:
            if req_state.streaming_input:
                ...
            else:
                self._finish_request(req_state)
                if not engine_core_output.finished:
                    # If req not finished in EngineCore, but Detokenizer
                    # detected stop string, abort needed in EngineCore.
                    reqs_to_abort.append(req_id)

                # Track per-request stats
                self._update_stats_from_finished(
                    req_state, finish_reason, iteration_stats
                )
                if self.tracing_enabled:
                    self.do_tracing(engine_core_output, req_state, iteration_stats)

    return OutputProcessorOutput(
        request_outputs=request_outputs,
        reqs_to_abort=reqs_to_abort,
    )
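A simplified sketch of the stop-string and abort logic, assuming hypothetical ToyCoreOutput/ToyState classes where each output carries a text piece instead of token ids. Unlike vLLM, it re-searches the whole accumulated text (a real incremental detokenizer would only search the newly added tail), and it always excludes the stop string from the output.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ToyCoreOutput:
    # Hypothetical slice of EngineCoreOutput; text pieces stand in for token ids.
    request_id: str
    new_text: str
    finish_reason: str | None = None

    @property
    def finished(self) -> bool:
        return self.finish_reason is not None


@dataclass
class ToyState:
    stop: list
    text: str = ""


def process_outputs(outputs: list, states: dict):
    done: dict = {}
    reqs_to_abort: list = []
    for out in outputs:
        state = states.get(out.request_id)
        if state is None:
            continue  # output for an already-aborted request
        state.text += out.new_text
        finish_reason = out.finish_reason
        for s in state.stop:
            # Searching the full text also catches stop strings that span
            # two output steps.
            idx = state.text.find(s)
            if idx != -1:
                state.text = state.text[:idx]
                finish_reason = "stop"
                break
        if finish_reason is not None:
            del states[out.request_id]
            done[out.request_id] = (state.text, finish_reason)
            if not out.finished:
                # The Client saw the stop string first; Core is still running it.
                reqs_to_abort.append(out.request_id)
    return done, reqs_to_abort
```

A request that Core itself finishes (finish_reason "length" here) is cleaned up without an abort; only Client-detected stops produce entries in reqs_to_abort.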

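Cleanup removes the internal request_id from request_states and maintains the external_req_id -> internal_req_id mapping, deleting the external key once its last internal ID is gone. A parent request is also dropped once it has no remaining child requests.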

def _finish_request(self, req_state: RequestState) -> None:
    req_id = req_state.request_id
    self.request_states.pop(req_id)

    internal_ids = self.external_req_ids[req_state.external_req_id]
    internal_ids.remove(req_id)
    if not internal_ids:
        del self.external_req_ids[req_state.external_req_id]

    # Remove parent request if applicable.
    parent_req = req_state.parent_req
    if parent_req and not parent_req.child_requests:
        self.parent_requests.pop(parent_req.request_id, None)

3.6 RequestState.make_request_output(): converting internal state into external output

RequestState holds all of the Client-side context needed to build a RequestOutput:

  • The external and internal request ids.
  • The prompt text and prompt token ids.
  • The text accumulated by the detokenizer.
  • The logprobs processor.
  • output_kind.
  • The parent request for parallel sampling.
  • Stats, LoRA, stream interval, and so on.

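Its make_request_output() first uses output_kind to decide whether to return an output at all. The offline LLM.generate() has already set output_kind to FINAL_ONLY, so the intermediate tokens of an unfinished request do not produce a RequestOutput: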

def make_request_output(
    self,
    new_token_ids: list[int],
    pooling_output: torch.Tensor | None,
    finish_reason: FinishReason | None,
    stop_reason: int | str | None,
    kv_transfer_params: dict[str, Any] | None = None,
    routed_experts: np.ndarray | None = None,
) -> RequestOutput | PoolingRequestOutput | None:
    finished = finish_reason is not None
    final_only = self.output_kind == RequestOutputKind.FINAL_ONLY

    if not finished and final_only:
        # Only the final output is required in FINAL_ONLY mode.
        return None
    ...

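Generation outputs go through _new_completion_output() and are then assembled into a RequestOutput. Note that the ID returned to the user is external_req_id (for parallel sampling, the parent request's external_req_id):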

def make_request_output(
    self,
    new_token_ids: list[int],
    pooling_output: torch.Tensor | None,
    finish_reason: FinishReason | None,
    stop_reason: int | str | None,
    kv_transfer_params: dict[str, Any] | None = None,
    routed_experts: np.ndarray | None = None,
) -> RequestOutput | PoolingRequestOutput | None:
    ...
    external_req_id = self.external_req_id

    if pooling_output is not None:
        return self._new_request_output(
            external_req_id,
            [self._new_pooling_output(pooling_output)],
            finished,
        )

    output = self._new_completion_output(
        new_token_ids, finish_reason, stop_reason, routed_experts
    )

    if self.parent_req is None:
        outputs = [output]
    else:
        outputs, finished = self.parent_req.get_outputs(self.request_id, output)
        if not outputs:
            return None
        external_req_id = self.parent_req.external_req_id

    return self._new_request_output(
        external_req_id, outputs, finished, kv_transfer_params
    )

_new_request_output() finally creates the RequestOutput seen by the external API:

def _new_request_output(
    self,
    external_req_id: str,
    outputs: list[CompletionOutput] | list[PoolingOutput],
    finished: bool,
    kv_transfer_params: dict[str, Any] | None = None,
) -> RequestOutput | PoolingRequestOutput:
    ...
    return RequestOutput(
        request_id=external_req_id,  # request_id is what was provided externally
        lora_request=self.lora_request,
        prompt=self.prompt,
        prompt_token_ids=prompt_token_ids,
        prompt_logprobs=prompt_logprobs,
        outputs=cast(list[CompletionOutput], outputs),
        finished=finished,
        kv_transfer_params=kv_transfer_params,
        num_cached_tokens=self.num_cached_tokens,
        metrics=self.stats,
    )

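The text, token ids, and finish reason of each completion come from the detokenizer and the current Core output. In DELTA mode only the new text, token ids, and logprobs are returned; otherwise the cumulative text and all output token ids are returned.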

def _new_completion_output(
    self,
    token_ids: list[int],
    finish_reason: FinishReason | None,
    stop_reason: int | str | None,
    routed_experts: np.ndarray | None = None,
) -> CompletionOutput:
    assert self.detokenizer is not None
    assert self.logprobs_processor is not None
    finished = finish_reason is not None
    delta = self.output_kind == RequestOutputKind.DELTA

    # Prepare text and token_ids, based on delta mode
    text = self.detokenizer.get_next_output_text(finished, delta)
    if not delta:
        token_ids = self.detokenizer.output_token_ids

    # Prepare logprobs, based on delta mode
    logprobs = self.logprobs_processor.logprobs
    if delta and logprobs:
        logprobs = logprobs[-len(token_ids) :]

    return CompletionOutput(
        index=self.request_index,
        text=text,
        token_ids=token_ids,
        routed_experts=routed_experts,
        logprobs=logprobs,
        cumulative_logprob=self.logprobs_processor.cumulative_logprob,
        finish_reason=str(finish_reason) if finished else None,
        stop_reason=stop_reason if finished else None,
    )
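The three output modes can be compared in a small sketch. OutputKind is a hypothetical mirror of RequestOutputKind and ToyRequestState is invented; it accumulates text and token ids on every step, then returns nothing (FINAL_ONLY before finishing), only the new part (DELTA), or everything so far (CUMULATIVE, and FINAL_ONLY on the last step).

```python
from enum import Enum


class OutputKind(Enum):
    # Hypothetical mirror of the three RequestOutputKind modes.
    CUMULATIVE = 0
    DELTA = 1
    FINAL_ONLY = 2


class ToyRequestState:
    def __init__(self, kind: OutputKind):
        self.kind = kind
        self.token_ids: list = []
        self.text = ""
        self._sent_chars = 0  # how much text DELTA mode has already returned

    def make_output(self, new_ids: list, new_text: str, finished: bool):
        # Accumulate first, as the detokenizer does on every step.
        self.token_ids.extend(new_ids)
        self.text += new_text
        if not finished and self.kind is OutputKind.FINAL_ONLY:
            return None  # intermediate steps produce no output
        if self.kind is OutputKind.DELTA:
            delta = self.text[self._sent_chars:]
            self._sent_chars = len(self.text)
            return delta, list(new_ids)
        # CUMULATIVE and FINAL_ONLY both return everything so far.
        return self.text, list(self.token_ids)
```

For offline LLM.generate(), only the FINAL_ONLY branch applies, which is why _run_engine() sees one output per request.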

3.7 LLM._run_engine(): pulling synchronously until every request is finished

LLM._run_engine() is the last link of the offline synchronous API. It keeps calling LLMEngine.step() until LLMEngine.has_unfinished_requests() returns false.

def _run_engine(
    self,
    output_type: type[_O] | tuple[type[_O], ...],
    *,
    use_tqdm: bool | Callable[..., tqdm] = True,
) -> list[_O]:
    # Initialize tqdm.
    if use_tqdm:
        num_requests = self.llm_engine.get_num_unfinished_requests()
        ...

    # Run the engine.
    outputs: list[_O] = []
    total_in_toks = 0
    total_out_toks = 0
    while self.llm_engine.has_unfinished_requests():
        step_outputs = self.llm_engine.step()
        for output in step_outputs:
            assert isinstance(output, output_type)
            if output.finished:
                outputs.append(output)
                if use_tqdm:
                    ...

    if use_tqdm:
        pbar.close()
    # Sort the outputs by request ID.
    # This is necessary because some requests may be finished earlier than
    # its previous requests.
    return sorted(outputs, key=lambda x: int(x.request_id))

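Because LLM.generate() uses FINAL_ONLY, what is collected here is normally just the final RequestOutput of each request. When several requests run concurrently, they can finish in a different order from the one in which they were submitted, so the outputs are sorted by external request id at the end; the ids are numeric strings, hence the int() key.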
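A minimal sketch of the collect-then-sort loop, assuming a hypothetical ToyOutput tuple and a precomputed list of step batches in place of repeated LLMEngine.step() calls. It also shows why the key converts to int: sorted as strings, "10" would come before "2".

```python
from typing import NamedTuple


class ToyOutput(NamedTuple):
    request_id: str  # numeric string, like the IDs LLM assigns
    finished: bool


def run_engine(step_batches: list) -> list:
    """Sketch of the _run_engine() loop (not vLLM code)."""
    outputs = []
    for batch in step_batches:  # one batch per LLMEngine.step()
        outputs.extend(o for o in batch if o.finished)
    # int() matters: as strings, "10" would sort before "2".
    return sorted(outputs, key=lambda o: int(o.request_id))
```

With completion order 2, 10, 0, the result comes back in submission order 0, 2, 10.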

4. A small example tying the whole flow together

Example code:

from vllm import LLM, SamplingParams

llm = LLM(model="facebook/opt-125m")
params = SamplingParams(max_tokens=3)
outputs = llm.generate(["Hello"], params, use_tqdm=False)

print(outputs[0].request_id)
print(outputs[0].outputs[0].text)

On the Client side, this code goes through roughly the following state changes.

4.1 Construction phase

LLM(...)
  -> EngineArgs(...)
  -> LLMEngine.from_engine_args(...)
       -> VllmConfig
       -> Executor class
       -> LLMEngine(...)
            -> renderer
            -> InputProcessor
            -> OutputProcessor
            -> EngineCoreClient.make_client(...)
                 -> SyncMPClient(...)
                      -> bind input ROUTER
                      -> bind output PULL
                      -> launch EngineCoreProc
                      -> handshake HELLO / READY
                      -> start output queue thread

After construction, LLM also issues one utility RPC, get_supported_tasks(), to find out which tasks the model supports (for example, whether it supports generation).

4.2 Adding the request

LLM.generate(["Hello"], SamplingParams(max_tokens=3))
  -> _run_completion()
  -> _add_completion_requests()
  -> _preprocess_cmpl_one("Hello")
       -> renderer.render_cmpl(...)
       -> EngineInput(type=..., prompt_token_ids=[...])
  -> _add_request(...)
       external request_id = "0"
       params.output_kind = FINAL_ONLY
  -> LLMEngine.add_request("0", EngineInput, params)
       -> InputProcessor.process_inputs(...)
            EngineCoreRequest(request_id="0", prompt_token_ids=[...])
       -> InputProcessor.assign_request_id(...)
            external_req_id = "0"
            request_id = "0-<random8>"
       -> OutputProcessor.add_request(...)
            request_states["0-<random8>"] = RequestState(...)
       -> SyncMPClient.add_request(...)
            send [identity, ADD, msgpack(EngineCoreRequest)] to Core

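At this point the Client side holds one RequestState, and Core has received one ADD request. The RequestState stores the context needed later to assemble the RequestOutput; Core never needs to know what the final API output looks like.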

4.3 Returning outputs

Suppose Core returns tokens over several rounds:

Core -> EngineCoreOutputs([
  EngineCoreOutput(
    request_id="0-<random8>",
    new_token_ids=[...],
    finish_reason=None,
  )
])

The SyncMPClient output thread decodes this batch and puts it into outputs_queue. LLM._run_engine() calls LLMEngine.step(), which fetches the batch through get_output() and hands it to OutputProcessor.process_outputs().

Because this request is FINAL_ONLY, when finish_reason is None:

RequestState.make_request_output(...)
  -> not finished and final_only
  -> return None

The intermediate tokens are accumulated by the detokenizer but are not returned to LLM._run_engine().

In the final round, Core returns:

Core -> EngineCoreOutputs([
  EngineCoreOutput(
    request_id="0-<random8>",
    new_token_ids=[...],
    finish_reason=STOP or LENGTH,
    stop_reason=...,
  )
])

This time RequestState.make_request_output() creates the external output:

RequestOutput(
  request_id="0",
  prompt="Hello",
  prompt_token_ids=[...],
  outputs=[
    CompletionOutput(
      index=0,
      text="<detokenized final text>",
      token_ids=[all generated token ids],
      finish_reason="stop" or "length",
      ...
    )
  ],
  finished=True,
)

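Afterwards, OutputProcessor._finish_request() deletes request_states["0-<random8>"]. Once every request has been cleaned up, LLMEngine.has_unfinished_requests() returns false, and LLM._run_engine() leaves its loop, sorts by external request id, and returns the final list of RequestOutput objects.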

5. Summary

The whole Client-side path can be condensed into one sentence:

LLM owns the user API and drives everything synchronously,
LLMEngine converts inputs and outputs between the API form and the Core form,
InputProcessor turns EngineInput into EngineCoreRequest,
OutputProcessor + RequestState turn EngineCoreOutput back into RequestOutput,
and SyncMPClient uses ZMQ to carry EngineCoreRequest/EngineCoreOutputs across the process boundary.

In this design, the boundary between the Client side and the Core side is very clear:

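  • Downstream, a request carries only an EngineCoreRequest and its request type.
  • Upstream, the Client receives only EngineCoreOutputs.
  • API semantics such as prompt text, detokenization, logprobs, external request ids, and parallel-sampling aggregation all stay on the Client side, in OutputProcessor/RequestState.
  • Core can concentrate on scheduling and execution, while the Client side turns the black-box execution results into stable user-facing API output.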