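vLLM Code Walkthrough 01 (Client Side)
This article focuses on the Client-side path of LLM, the vLLM V1 offline inference entry point, and covers these core classes:
- LLM
- LLMEngine
- SyncMPClient
- InputProcessor
- OutputProcessor
- RequestState
Here RequestState refers specifically to vllm.v1.engine.output_processor.RequestState. Scheduling, KV cache management, and model execution inside the Core are all treated as a black box. The only concerns are how the Client builds a request, how it hands the request to the Core over ZMQ, and how it receives EngineCoreOutputs and assembles them into the final RequestOutput.
The default path assumes VLLM_ENABLE_V1_MULTIPROCESSING=1, meaning LLMEngine connects to a background EngineCoreProc through SyncMPClient. If V1 multiprocessing is disabled, EngineCoreClient.make_client() returns an InprocClient instead; requests and outputs are then direct in-process calls and bypass the ZMQ transport layer that this article focuses on.
0. Overview
0.1 A sequence diagram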
@startuml
title vLLM Client-side offline inference flow
actor App
participant LLM
participant LLMEngine
participant InputProcessor
participant OutputProcessor
participant RequestState
participant SyncMPClient
participant "EngineCoreProc\n(Core black box)" as Core
== Construction ==
App -> LLM: LLM(model, ...)
LLM -> LLM: build EngineArgs
LLM -> LLMEngine: from_engine_args(...)
LLMEngine -> LLMEngine: create VllmConfig / Executor class
LLMEngine -> InputProcessor: new(vllm_config, renderer)
LLMEngine -> OutputProcessor: new(tokenizer, log_stats, stream_interval)
LLMEngine -> SyncMPClient: EngineCoreClient.make_client(...)
SyncMPClient -> Core: launch process + startup handshake
Core -> SyncMPClient: DEALER connects to input ROUTER
Core -> SyncMPClient: PUSH connects to output PULL
LLM -> SyncMPClient: get_supported_tasks() as UTILITY RPC
SyncMPClient --> LLM: supported tasks
== Adding a request ==
App -> LLM: generate(prompts, sampling_params)
LLM -> LLM: render prompt into EngineInput
LLM -> LLMEngine: add_request("0", EngineInput, params)
LLMEngine -> InputProcessor: process_inputs(...)
InputProcessor --> LLMEngine: EngineCoreRequest(request_id="0")
LLMEngine -> InputProcessor: assign_request_id(...)
note right
external_req_id = "0"
request_id = "0-<random8>"
end note
LLMEngine -> OutputProcessor: add_request(request, prompt_text)
OutputProcessor -> RequestState: from_new_request(...)
LLMEngine -> SyncMPClient: add_request(request)
SyncMPClient -> Core: ROUTER/DEALER send ADD + msgpack request
== Fetching outputs ==
loop until OutputProcessor has no unfinished requests
LLM -> LLMEngine: step()
LLMEngine -> SyncMPClient: get_output()
Core -> SyncMPClient: PUSH/PULL EngineCoreOutputs
SyncMPClient --> LLMEngine: EngineCoreOutputs
LLMEngine -> OutputProcessor: process_outputs(outputs.outputs)
OutputProcessor -> RequestState: detokenize / logprobs / make RequestOutput
OutputProcessor --> LLMEngine: RequestOutput list + reqs_to_abort
LLMEngine -> SyncMPClient: abort stop-string requests if needed
LLMEngine --> LLM: step outputs
end
LLM --> App: sorted final RequestOutput list
@enduml
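The "fetching outputs" loop in the diagram can be sketched in a few lines. This is a minimal illustration, not vLLM's code: FakeEngine and FakeOutput are hypothetical stand-ins, and the numeric sort key assumes external IDs are counter strings such as "0" and "1".

```python
from dataclasses import dataclass, field


@dataclass
class FakeOutput:
    """Stand-in for RequestOutput: only the fields this sketch needs."""
    request_id: str
    finished: bool


@dataclass
class FakeEngine:
    """Hypothetical engine that finishes one request per step, newest first."""
    pending: list[str] = field(default_factory=list)

    def has_unfinished_requests(self) -> bool:
        return bool(self.pending)

    def step(self) -> list[FakeOutput]:
        # Pop from the end so completion order differs from submission order.
        return [FakeOutput(self.pending.pop(), finished=True)]


def run_engine(engine: FakeEngine) -> list[FakeOutput]:
    """Drive step() until nothing is pending, then restore submission order."""
    outputs: list[FakeOutput] = []
    while engine.has_unfinished_requests():
        outputs.extend(o for o in engine.step() if o.finished)
    # External IDs are assumed to be counter strings, so sort numerically.
    return sorted(outputs, key=lambda o: int(o.request_id))


results = run_engine(FakeEngine(pending=["0", "1", "2"]))
print([o.request_id for o in results])
```

The sort matters because requests can finish in any order, while the caller expects results aligned with the prompts it submitted.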
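0.2 The Client/Core boundary
V1 offline inference can be split into three Client-side layers in front of the Core black box:
API facade
LLM
- User-facing methods such as generate/chat/embed/classify
- Renders prompts, assigns external request IDs, and drives the synchronous step loop
Frontend engine
LLMEngine
- Owns the InputProcessor, OutputProcessor, and EngineCoreClient
- Turns EngineInput into EngineCoreRequest
- Turns EngineCoreOutput into RequestOutput
Transport to Core
SyncMPClient
- Synchronous ZMQ client inside the Client process
- ADD/ABORT/UTILITY requests go from the input ROUTER to the Core DEALER
- EngineCoreOutputs go from the Core PUSH to the Client PULL
Core black box
EngineCoreProc
- Background process
- Receives EngineCoreRequest and produces EngineCoreOutputs
The ZMQ connection topology is as follows: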
Client process EngineCoreProc process
---------------- ----------------------
input_socket: ROUTER bind(addr_in) <-----------> DEALER connect(addr_in)
send: [engine_identity, request_type, msgpack frames]
output_socket: PULL bind(addr_out) <------------- PUSH connect(addr_out)
recv: [msgpack EngineCoreOutputs frames]
handshake_socket: ROUTER bind(addr_hs) <----------> DEALER connect(addr_hs)
startup only: HELLO -> EngineHandshakeMetadata -> READY
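1. Construction: bringing up the whole Client system from LLM
1.1 LLM: user entry point and EngineArgs builder
LLM.__init__() collects the user-supplied model, tokenizer, parallelism, cache, compilation, LoRA, and sampling settings into EngineArgs, then uses those arguments to create an LLMEngine. LLM does not manage the Core directly; what it holds is self.llm_engine.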
def __init__(
self,
model: str,
*,
runner: RunnerOption = "auto",
convert: ConvertOption = "auto",
tokenizer: str | None = None,
...
) -> None:
"""LLM constructor."""
...
engine_args = EngineArgs(
model=model,
runner=runner,
convert=convert,
tokenizer=tokenizer,
tokenizer_mode=tokenizer_mode,
skip_tokenizer_init=skip_tokenizer_init,
trust_remote_code=trust_remote_code,
...
compilation_config=compilation_config_instance,
logits_processors=logits_processors,
**kwargs,
)
log_non_default_args(engine_args)
self.llm_engine = LLMEngine.from_engine_args(
engine_args=engine_args, usage_context=UsageContext.LLM_CLASS
)
self.model_config = self.llm_engine.model_config
self.engine_class = type(self.llm_engine)
self.request_counter = Counter()
self.default_sampling_params: dict[str, Any] | None = None
supported_tasks = self.llm_engine.get_supported_tasks()
self.supported_tasks = supported_tasks
self.pooling_task = self.model_config.get_pooling_task(supported_tasks)
...
self.renderer = self.llm_engine.renderer
self.chat_template = load_chat_template(chat_template)
self.io_processor = self.llm_engine.io_processor
self.input_processor = self.llm_engine.input_processor
...
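Two points matter here:
- LLM calls self.llm_engine.get_supported_tasks() before its own construction completes.
- On the SyncMPClient path this is not an ordinary local method call but a UTILITY RPC: the Client asks the Core over ZMQ to execute get_supported_tasks, then waits for the output thread to place the result into a Future.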
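1.2 LLMEngine.from_engine_args(): materializing the config and choosing an Executor
LLMEngine.from_engine_args() first converts EngineArgs into a complete VllmConfig, then selects the V1 Executor class from that config. It then checks the VLLM_ENABLE_V1_MULTIPROCESSING environment variable to decide whether to enable multiprocess mode.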
@classmethod
def from_engine_args(
cls,
engine_args: EngineArgs,
usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
stat_loggers: list[StatLoggerFactory] | None = None,
enable_multiprocessing: bool = False,
) -> "LLMEngine":
"""Creates an LLM engine from the engine arguments."""
# Create the engine configs.
vllm_config = engine_args.create_engine_config(usage_context)
executor_class = Executor.get_class(vllm_config)
if envs.VLLM_ENABLE_V1_MULTIPROCESSING:
logger.debug("Enabling multiprocessing for LLMEngine.")
enable_multiprocessing = True
# Create the LLMEngine.
return cls(
vllm_config=vllm_config,
executor_class=executor_class,
log_stats=not engine_args.disable_log_stats,
usage_context=usage_context,
stat_loggers=stat_loggers,
multiprocess_mode=enable_multiprocessing,
)
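1.3 LLMEngine.__init__(): assembling the Input/Output Processors and the Core Client
LLMEngine is the hub of the Client-side inference flow. It assembles:
- renderer: renders high-level input into token/multimodal form.
- InputProcessor: turns EngineInput into EngineCoreRequest.
- OutputProcessor: turns the EngineCoreOutputs returned by the Core into the external API's RequestOutput.
- EngineCoreClient: selects SyncMPClient or InprocClient depending on the mode.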
def __init__(
self,
vllm_config: VllmConfig,
executor_class: type[Executor],
log_stats: bool,
...
multiprocess_mode: bool = False,
) -> None:
self.vllm_config = vllm_config
self.model_config = vllm_config.model_config
self.observability_config = vllm_config.observability_config
...
self.renderer = renderer = renderer_from_config(self.vllm_config)
self.io_processor = get_io_processor(
self.vllm_config,
self.renderer,
self.model_config.io_processor_plugin,
)
# Convert EngineInput --> EngineCoreRequest.
self.input_processor = InputProcessor(self.vllm_config, renderer)
# Converts EngineCoreOutputs --> RequestOutput.
self.output_processor = OutputProcessor(
renderer.tokenizer,
log_stats=self.log_stats,
stream_interval=self.vllm_config.scheduler_config.stream_interval,
tracing_enabled=tracing_endpoint is not None,
)
# EngineCore (gets EngineCoreRequests and gives EngineCoreOutputs)
self.engine_core = EngineCoreClient.make_client(
multiprocess_mode=multiprocess_mode,
asyncio_mode=False,
vllm_config=vllm_config,
executor_class=executor_class,
log_stats=self.log_stats,
)
...
self.reset_mm_cache()
1.4 EngineCoreClient.make_client(): synchronous offline mode selects SyncMPClient
LLMEngine calls make_client() with asyncio_mode=False. When multiprocess_mode=True, the returned client is a SyncMPClient.
@staticmethod
def make_client(
multiprocess_mode: bool,
asyncio_mode: bool,
vllm_config: VllmConfig,
executor_class: type[Executor],
log_stats: bool,
) -> "EngineCoreClient":
# TODO: support this for debugging purposes.
if asyncio_mode and not multiprocess_mode:
raise NotImplementedError(
"Running EngineCore in asyncio without multiprocessing "
"is not currently supported."
)
if multiprocess_mode and asyncio_mode:
return EngineCoreClient.make_async_mp_client(
vllm_config, executor_class, log_stats
)
if multiprocess_mode and not asyncio_mode:
return SyncMPClient(vllm_config, executor_class, log_stats)
return InprocClient(vllm_config, executor_class, log_stats)
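1.5 MPClient: creating ZMQ sockets, launching the Core, waiting for ready
SyncMPClient inherits from MPClient. Most of the ZMQ setup and the logic for launching the background Core process lives in MPClient.__init__().
The Client first creates a ZMQ context and then takes one of two branches:
- client_addresses is passed: the Core is managed externally, and the Client only binds/connects to the existing addresses.
- client_addresses is not passed: the Client allocates the addresses itself and launches the background EngineCoreProc through launch_core_engines().
The default offline LLM takes the second branch.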
def __init__(
self,
asyncio_mode: bool,
vllm_config: VllmConfig,
executor_class: type[Executor],
log_stats: bool,
client_addresses: dict[str, str] | None = None,
):
self.vllm_config = vllm_config
# ZMQ setup.
sync_ctx = zmq.Context(io_threads=2)
self.ctx = zmq.asyncio.Context(sync_ctx) if asyncio_mode else sync_ctx
...
if client_addresses:
# Engines are managed externally to this client.
input_address = client_addresses["input_address"]
output_address = client_addresses["output_address"]
...
self.input_socket = self.resources.input_socket = make_zmq_socket(
self.ctx,
input_address,
zmq.ROUTER,
bind=True,
router_handover=enable_input_socket_handover,
)
self.resources.output_socket = make_zmq_socket(
self.ctx, output_address, zmq.PULL
)
else:
# Engines are managed by this client.
addresses = get_engine_zmq_addresses(vllm_config)
self.input_socket = self.resources.input_socket = make_zmq_socket(
self.ctx,
addresses.inputs[0],
zmq.ROUTER,
bind=True,
router_handover=enable_input_socket_handover,
)
self.resources.output_socket = make_zmq_socket(
self.ctx, addresses.outputs[0], zmq.PULL
)
with launch_core_engines(
vllm_config, executor_class, log_stats, addresses
) as (engine_manager, coordinator, addresses, tensor_queue):
self.resources.coordinator = coordinator
self.resources.engine_manager = engine_manager
...
# Wait for ready messages from each engine on the input socket.
identities = set(self.core_engines)
sync_input_socket = zmq.Socket.shadow(self.input_socket)
while identities:
if not sync_input_socket.poll(
timeout=VLLM_ENGINE_READY_TIMEOUT_S * 1000
):
raise TimeoutError(...)
identity, _ = sync_input_socket.recv_multipart()
identities.remove(identity)
self.core_engine: EngineIdentity = self.core_engines[0]
self.utility_results: dict[int, AnyFuture] = {}
...
self.start_engine_core_monitor()
get_engine_zmq_addresses() allocates the input/output addresses. Colocated local deployments use IPC addresses; cross-node deployments use TCP addresses.
def get_engine_client_zmq_addr(local_only: bool, host: str, port: int = 0) -> str:
"""Assign a new ZMQ socket address.
If local_only is True, participants are colocated and so a unique IPC
address will be returned.
Otherwise, the provided host and port will be used to construct a TCP
address (port == 0 means assign an available port)."""
return (
get_open_zmq_ipc_path()
if local_only
else (get_tcp_uri(host, port or get_open_port()))
)
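The IPC-versus-TCP choice can be tried out with a standard-library sketch. The helper names open_ipc_path, open_port, and pick_zmq_addr are hypothetical, and the way vLLM's own helpers build paths and pick ports may differ; only the branching rule follows the function above.

```python
import socket
import tempfile
import uuid


def open_ipc_path() -> str:
    """Hypothetical helper: build a unique ipc:// address under the temp dir."""
    return f"ipc://{tempfile.gettempdir()}/{uuid.uuid4()}"


def open_port() -> int:
    """Hypothetical helper: ask the OS for a currently free TCP port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def pick_zmq_addr(local_only: bool, host: str, port: int = 0) -> str:
    """IPC when both sides share a host, otherwise TCP (port 0 = pick one)."""
    if local_only:
        return open_ipc_path()
    return f"tcp://{host}:{port or open_port()}"


ipc_addr = pick_zmq_addr(local_only=True, host="127.0.0.1")
tcp_addr = pick_zmq_addr(local_only=False, host="127.0.0.1", port=5555)
auto_addr = pick_zmq_addr(local_only=False, host="127.0.0.1")
print(ipc_addr, tcp_addr, auto_addr)
```

Note that a port found this way is only free at the moment of the check; another process could take it before the socket binds.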
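make_zmq_socket() handles the bind/connect semantics of each socket in one place. When bind is not passed explicitly, every socket type except PUSH, SUB, and XSUB binds; so by default ROUTER/PULL bind and PUSH connects.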
def make_zmq_socket(
ctx: zmq.asyncio.Context | zmq.Context,
path: str,
socket_type: Any,
bind: bool | None = None,
identity: bytes | None = None,
linger: int | None = None,
router_handover: bool = False,
) -> zmq.Socket | zmq.asyncio.Socket:
"""Make a ZMQ socket with the proper bind/connect semantics."""
...
if bind is None:
bind = socket_type not in (zmq.PUSH, zmq.SUB, zmq.XSUB)
...
if identity is not None:
socket.setsockopt(zmq.IDENTITY, identity)
...
if bind:
socket.bind(path)
else:
socket.connect(path)
return socket
1.6 launch_core_engines(): launching the Core and completing the startup handshake
launch_core_engines() creates a CoreEngineProcManager, which starts one or more EngineCoreProc background processes. wait_for_engine_startup() then waits on the handshake socket for the HELLO and READY messages from the Core.
@contextlib.contextmanager
def launch_core_engines(
vllm_config: VllmConfig,
executor_class: type[Executor],
log_stats: bool,
addresses: EngineZmqAddresses,
num_api_servers: int = 1,
) -> Iterator[
tuple[
CoreEngineProcManager | CoreEngineActorManager | None,
DPCoordinator | None,
EngineZmqAddresses,
Queue | None,
]
]:
"""Launch engine and DP coordinator processes as needed."""
...
with zmq_socket_ctx(
local_handshake_address, zmq.ROUTER, bind=True
) as handshake_socket:
# Start local engines.
if local_engine_count:
local_engine_manager = CoreEngineProcManager(
vllm_config=vllm_config,
executor_class=executor_class,
log_stats=log_stats,
handshake_address=handshake_address,
client_handshake_address=client_handshake_address,
local_client=True,
local_engine_count=local_engine_count,
start_index=dp_rank,
local_start_index=local_start_index or 0,
tensor_queue=tensor_queue,
)
else:
local_engine_manager = None
yield local_engine_manager, coordinator, addresses, tensor_queue
# Now wait for engines to start.
wait_for_engine_startup(
handshake_socket,
addresses,
engines_to_handshake,
parallel_config,
dp_size > 1 and vllm_config.model_config.is_moe,
vllm_config.cache_config,
local_engine_manager,
coordinator.proc if coordinator else None,
)
Once a Core process has started, it registers with the frontend through startup_handshake(). The frontend replies with EngineHandshakeMetadata, which contains the input/output socket addresses.
@staticmethod
def startup_handshake(
handshake_socket: zmq.Socket,
local_client: bool,
headless: bool,
parallel_config: ParallelConfig | None = None,
) -> EngineZmqAddresses:
# Send registration message.
handshake_socket.send(
msgspec.msgpack.encode(
{
"status": "HELLO",
"local": local_client,
"headless": headless,
}
)
)
# Receive initialization message.
logger.debug("Waiting for init message from front-end.")
if not handshake_socket.poll(timeout=HANDSHAKE_TIMEOUT_MINS * 60_000):
raise RuntimeError(...)
init_bytes = handshake_socket.recv()
init_message: EngineHandshakeMetadata = msgspec.msgpack.decode(
init_bytes, type=EngineHandshakeMetadata
)
...
return init_message.addresses
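After the Core finishes its internal initialization, it also sends READY. The Core's input socket thread then connects to the Client's input ROUTER and sends an empty frame so that the ROUTER learns this engine identity is routable.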
def process_input_sockets(
self,
input_addresses: list[str],
coord_input_address: str | None,
identity: bytes,
ready_event: threading.Event,
):
"""Input socket IO thread."""
...
with ExitStack() as stack, zmq.Context() as ctx:
input_sockets = [
stack.enter_context(
make_zmq_socket(
ctx, input_address, zmq.DEALER, identity=identity, bind=False
)
)
for input_address in input_addresses
]
...
for input_socket in input_sockets:
# Send initial message to each input socket - this is required
# before the front-end ROUTER socket can send input messages
# back to us.
input_socket.send(b"")
poller.register(input_socket, zmq.POLLIN)
...
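1.7 SyncMPClient: the background output thread
After MPClient has finished setting up the sockets, launching the Core, and waiting for ready, SyncMPClient.__init__() additionally starts an output thread. This thread:
- Polls the Core output PULL socket.
- Decodes EngineCoreOutputs.
- Completes the matching Future if the message is a utility output.
- Otherwise puts the message on a synchronous queue.Queue for get_output() to read with a blocking get.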
def __init__(
self, vllm_config: VllmConfig, executor_class: type[Executor], log_stats: bool
):
super().__init__(
asyncio_mode=False,
vllm_config=vllm_config,
executor_class=executor_class,
log_stats=log_stats,
)
self.is_dp = self.vllm_config.parallel_config.data_parallel_size > 1
self.outputs_queue = queue.Queue[EngineCoreOutputs | Exception]()
...
def process_outputs_socket():
assert isinstance(out_socket, zmq.Socket)
shutdown_socket = ctx.socket(zmq.PAIR)
try:
shutdown_socket.bind(shutdown_path)
poller = zmq.Poller()
poller.register(shutdown_socket, zmq.POLLIN)
poller.register(out_socket, zmq.POLLIN)
while True:
socks = poller.poll()
if not socks:
continue
if len(socks) == 2 or socks[0][0] == shutdown_socket:
# shutdown signal, exit thread.
break
frames = out_socket.recv_multipart(copy=False)
resources.validate_alive(frames)
outputs: EngineCoreOutputs = decoder.decode(frames)
if outputs.utility_output:
_process_utility_output(outputs.utility_output, utility_results)
else:
outputs_queue.put_nowait(outputs)
except Exception as e:
outputs_queue.put_nowait(e)
finally:
# Close sockets.
shutdown_socket.close(linger=0)
out_socket.close(linger=0)
# Process outputs from engine in separate thread.
self.output_queue_thread = Thread(
target=process_outputs_socket,
name="EngineCoreOutputQueueThread",
daemon=True,
)
self.output_queue_thread.start()
...
get_output() itself is thin: it takes one batch of Core outputs from outputs_queue and re-raises any exception the output thread forwarded.
def get_output(self) -> EngineCoreOutputs:
# If an exception arises in process_outputs_socket task,
# it is forwarded to the outputs_queue so we can raise it
# from this (run_output_handler) task to shut down the server.
outputs = self.outputs_queue.get()
if isinstance(outputs, Exception):
raise self._format_exception(outputs) from None
if outputs.wave_complete is not None:
self.engines_running = False
return outputs
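2. How a request travels from LLM to the Core
2.1 LLM.generate(): the outer entry point for synchronous offline inference
LLM.generate() checks the runner type, fills in default SamplingParams, and then calls _run_completion().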
def generate(
self,
prompts: PromptType | Sequence[PromptType],
sampling_params: SamplingParams | Sequence[SamplingParams] | None = None,
*,
use_tqdm: bool | Callable[..., tqdm] = True,
lora_request: Sequence[LoRARequest] | LoRARequest | None = None,
priority: list[int] | None = None,
tokenization_kwargs: dict[str, Any] | None = None,
) -> list[RequestOutput]:
"""Generates the completions for the input prompts."""
runner_type = self.model_config.runner_type
if runner_type != "generate":
raise ValueError(...)
if sampling_params is None:
sampling_params = self.get_default_sampling_params()
return self._run_completion(
prompts=prompts,
params=sampling_params,
output_type=RequestOutput,
use_tqdm=use_tqdm,
lora_request=lora_request,
tokenization_kwargs=tokenization_kwargs,
priority=priority,
)
_run_completion() has two steps:
- _add_completion_requests(): render the prompts and add the requests.
- _run_engine(): step synchronously until every request has finished.
def _run_completion(
self,
prompts: PromptType | Sequence[PromptType],
params: SamplingParams
| PoolingParams
| Sequence[SamplingParams | PoolingParams],
output_type: type[_O],
*,
use_tqdm: bool | Callable[..., tqdm] = True,
lora_request: Sequence[LoRARequest] | LoRARequest | None = None,
priority: list[int] | None = None,
tokenization_kwargs: dict[str, Any] | None = None,
):
self._add_completion_requests(
prompts=prompts,
params=params,
use_tqdm=use_tqdm,
lora_request=lora_request,
priority=priority,
tokenization_kwargs=tokenization_kwargs,
)
return self._run_engine(use_tqdm=use_tqdm, output_type=output_type)
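2.2 LLM first renders the prompt into EngineInput
_add_completion_requests() normalizes single or multiple prompts/params/lora/priority values into sequences, then uses a generator to call _preprocess_cmpl_one() on each prompt in turn. This step goes through the renderer, which turns user input into EngineInput.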
def _add_completion_requests(
self,
prompts: PromptType | Sequence[PromptType],
params: SamplingParams
| PoolingParams
| Sequence[SamplingParams | PoolingParams],
*,
use_tqdm: bool | Callable[..., tqdm] = True,
lora_request: Sequence[LoRARequest] | LoRARequest | None = None,
priority: list[int] | None = None,
tokenization_kwargs: dict[str, Any] | None = None,
) -> list[str]:
seq_prompts = prompt_to_seq(prompts)
seq_params = self._params_to_seq(params, len(seq_prompts))
seq_lora_requests = self._lora_request_to_seq(lora_request, len(seq_prompts))
seq_priority = self._priority_to_seq(priority, len(prompts))
return self._render_and_add_requests(
prompts=(
self._preprocess_cmpl_one(prompt, tokenization_kwargs)
for prompt in maybe_tqdm(
seq_prompts,
use_tqdm=use_tqdm,
desc="Rendering prompts",
)
),
params=seq_params,
lora_requests=seq_lora_requests,
priorities=seq_priority,
)
The actual rendering entry point is shown below:
def _preprocess_cmpl(
self,
prompts: Sequence[PromptType],
tokenization_kwargs: dict[str, Any] | None = None,
) -> Sequence[EngineInput]:
"""
Convert prompt inputs from LLM APIs (other than [LLM.chat][]) into
a format that can be passed to `_add_request`.
"""
renderer = self.renderer
model_config = self.model_config
parsed_prompts = [
parse_model_prompt(model_config, prompt) for prompt in prompts
]
tok_params = renderer.default_cmpl_tok_params.with_kwargs(
**(tokenization_kwargs or {})
)
return renderer.render_cmpl(parsed_prompts, tok_params)
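2.3 LLM._add_request(): set FINAL_ONLY and assign the external ID
Offline LLM.generate() only needs the final result, so _add_request() sets SamplingParams.output_kind to FINAL_ONLY. It then uses the Counter() to generate the user-visible external request ID, for example "0" or "1".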
def _add_request(
self,
prompt: EngineInput,
params: SamplingParams | PoolingParams,
lora_request: LoRARequest | None = None,
priority: int = 0,
) -> str:
if isinstance(params, SamplingParams):
# We only care about the final output
params.output_kind = RequestOutputKind.FINAL_ONLY
request_id = str(next(self.request_counter))
return self.llm_engine.add_request(
request_id,
prompt,
params,
lora_request=lora_request,
priority=priority,
)
The request_id at this layer is the external ID. Later, InputProcessor.assign_request_id() saves it to external_req_id and generates an internally unique ID.
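2.4 LLMEngine.add_request(): the hub for adding requests on the Client side
LLMEngine.add_request() is the key function through which a request enters the frontend engine. It chains three things together:
- InputProcessor.process_inputs() turns the input into an EngineCoreRequest.
- OutputProcessor.add_request() creates the RequestState on the Client side first.
- engine_core.add_request() sends the request to the Core.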
def add_request(
self,
request_id: str,
prompt: EngineCoreRequest | PromptType | EngineInput,
params: SamplingParams | PoolingParams,
arrival_time: float | None = None,
lora_request: LoRARequest | None = None,
tokenization_kwargs: dict[str, Any] | None = None,
trace_headers: Mapping[str, str] | None = None,
priority: int = 0,
prompt_text: str | None = None,
) -> str:
# Validate the request_id type.
if not isinstance(request_id, str):
raise TypeError(f"request_id must be a string, got {type(request_id)}")
# Process raw inputs into the request.
if isinstance(prompt, EngineCoreRequest):
...
else:
request = self.input_processor.process_inputs(
request_id,
prompt,
params,
supported_tasks=self.get_supported_tasks(),
arrival_time=arrival_time,
lora_request=lora_request,
tokenization_kwargs=tokenization_kwargs,
trace_headers=trace_headers,
priority=priority,
)
prompt_text, _, _ = extract_prompt_components(self.model_config, prompt)
self.input_processor.assign_request_id(request)
req_id = request.request_id
# Use cloned params that may have been updated in process_inputs()
params = request.params
n = params.n if isinstance(params, SamplingParams) else 1
if n == 1:
# Make a new RequestState and queue.
self.output_processor.add_request(request, prompt_text, None, 0)
# Add the request to EngineCore.
self.engine_core.add_request(request)
return req_id
...
When SamplingParams.n > 1, LLMEngine splits one external request into multiple child requests and adds each one to both the OutputProcessor and the Core; ParentRequest eventually aggregates them back into a single external output.
def add_request(
self,
request_id: str,
prompt: EngineCoreRequest | PromptType | EngineInput,
params: SamplingParams | PoolingParams,
...
) -> str:
...
# Fan out child requests (for n>1).
parent_req = ParentRequest(request)
for idx in range(n):
request_id, child_params = parent_req.get_child_info(idx)
child_request = request if idx == n - 1 else copy(request)
child_request.request_id = request_id
child_request.sampling_params = child_params
# Make a new RequestState and queue.
self.output_processor.add_request(
child_request, prompt_text, parent_req, idx
)
# Add the request to EngineCore.
self.engine_core.add_request(child_request)
return req_id
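The fan-out loop reuses the original request object for the last child and copies it for the others. The sketch below mimics that pattern with a hypothetical Req dataclass; the child ID format is an assumption made for illustration, since the real one comes from ParentRequest.get_child_info().

```python
from copy import copy
from dataclasses import dataclass


@dataclass
class Req:
    """Stand-in for EngineCoreRequest with only the fields used here."""
    request_id: str
    n: int


def fan_out(request: Req) -> list[Req]:
    """Split one n>1 request into n single-sample children."""
    # Capture these first: the last child IS the original object and gets mutated.
    parent_id, n = request.request_id, request.n
    children = []
    for idx in range(n):
        # Reuse the original object for the last child to save one copy.
        child = request if idx == n - 1 else copy(request)
        child.request_id = f"{idx}_{parent_id}"  # hypothetical child ID format
        child.n = 1
        children.append(child)
    return children


parent = Req(request_id="0-8f3a1c2d", n=3)
children = fan_out(parent)
print([c.request_id for c in children])
```

Because the last child aliases the original request, any field read from the parent after the loop reflects the last child's values, which is why the sketch captures parent_id and n up front.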
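2.5 InputProcessor.process_inputs(): turning frontend input into a Core request
The product of InputProcessor is an EngineCoreRequest. It does not send anything; it only normalizes the input into a structure the Core understands.
Its main steps are:
- Validate SamplingParams/PoolingParams and LoRA.
- If the input is already an EngineInput, use it directly; otherwise fall back to the legacy raw-prompt preprocessing.
- Run platform-level request validation.
- Split the encoder/decoder inputs.
- Handle token IDs or prompt embeds.
- Clone and complete the sampling/pooling params.
- Build MultiModalFeatureSpec entries for multimodal input.
- Return the EngineCoreRequest.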
def process_inputs(
self,
request_id: str,
prompt: PromptType | EngineInput,
params: SamplingParams | PoolingParams,
supported_tasks: tuple[SupportedTask, ...],
arrival_time: float | None = None,
lora_request: LoRARequest | None = None,
tokenization_kwargs: dict[str, Any] | None = None,
trace_headers: Mapping[str, str] | None = None,
priority: int = 0,
data_parallel_rank: int | None = None,
resumable: bool = False,
) -> EngineCoreRequest:
self._validate_params(params, supported_tasks)
self._validate_lora(lora_request)
...
if isinstance(prompt, dict) and "type" in prompt:
...
if arrival_time is None:
arrival_time = prompt.get("arrival_time", time.time())
processed_inputs: EngineInput = prompt
else:
...
if arrival_time is None:
arrival_time = time.time()
processed_inputs = self.input_preprocessor.preprocess(
prompt,
tokenization_kwargs=tokenization_kwargs,
)
current_platform.validate_request(processed_inputs, params)
encoder_inputs, decoder_inputs = split_enc_dec_input(processed_inputs)
self._validate_model_inputs(encoder_inputs, decoder_inputs)
# Mypy can be conservative for TypedDict unions; normalize access.
if decoder_inputs["type"] == "embeds":
prompt_token_ids = None
prompt_embeds = decoder_inputs["prompt_embeds"]
else:
prompt_token_ids = decoder_inputs["prompt_token_ids"]
prompt_embeds = None
...
The sampling params are cloned and then completed from the model, the generation config, and the tokenizer.
def process_inputs(
self,
request_id: str,
prompt: PromptType | EngineInput,
params: SamplingParams | PoolingParams,
supported_tasks: tuple[SupportedTask, ...],
...
) -> EngineCoreRequest:
...
sampling_params = None
pooling_params = None
if isinstance(params, SamplingParams):
# TODO: can we avoid cloning here in multiproc case?
sampling_params = params.clone()
# If unset max tokens, then generate up to the max_model_len.
if sampling_params.max_tokens is None:
seq_len = length_from_prompt_token_ids_or_embeds(
prompt_token_ids, prompt_embeds
)
sampling_params.max_tokens = self.model_config.max_model_len - seq_len
sampling_params.update_from_generation_config(
self.generation_config_fields,
self.renderer.get_eos_token_id(),
)
if self.tokenizer is not None:
sampling_params.update_from_tokenizer(self.tokenizer)
else:
pooling_params = params.clone()
...
return EngineCoreRequest(
request_id=request_id,
prompt_token_ids=prompt_token_ids,
prompt_embeds=prompt_embeds,
mm_features=mm_features,
sampling_params=sampling_params,
pooling_params=pooling_params,
arrival_time=arrival_time,
lora_request=lora_request,
cache_salt=decoder_inputs.get("cache_salt"),
priority=priority,
data_parallel_rank=data_parallel_rank,
trace_headers=trace_headers,
resumable=resumable,
)
EngineCoreRequest is itself a msgspec struct, so SyncMPClient can later serialize and send it directly.
class EngineCoreRequest(
msgspec.Struct,
array_like=True,
omit_defaults=True,
gc=False,
):
request_id: str
prompt_token_ids: list[int] | None
mm_features: list[MultiModalFeatureSpec] | None
sampling_params: SamplingParams | None
pooling_params: PoolingParams | None
arrival_time: float
lora_request: LoRARequest | None
cache_salt: str | None
data_parallel_rank: int | None
prompt_embeds: torch.Tensor | None = None
...
external_req_id: str | None = None
reasoning_ended: bool | None = None
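2.6 assign_request_id(): separating external and internal IDs
The Client maintains two kinds of ID:
- external_req_id: the ID seen by the user/LLM layer, for example "0".
- request_id: the internally unique ID, for example "0-8f3a1c2d", used for internal mappings in the Core, the OutputProcessor, abort handling, and so on.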
@staticmethod
def assign_request_id(request: EngineCoreRequest):
"""Replace the externally supplied request ID with an internal request ID
that adds 8 random characters in order to ensure uniqueness.
"""
if request.external_req_id is not None:
raise ValueError(
"The external_req_id field should not be set on EngineCoreRequests"
" passed to vLLM; use the request_id field."
)
request.external_req_id = request.request_id
if envs.VLLM_DISABLE_REQUEST_ID_RANDOMIZATION:
logger.warning_once(...)
else:
request.request_id = f"{request.external_req_id}-{random_uuid():.8}"
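The direct consequences of this design are:
- OutputProcessor.request_states is keyed by the internal request_id.
- The final RequestOutput.request_id uses external_req_id.
- Duplicate external IDs do not directly contaminate the Core's internal request IDs.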
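2.7 OutputProcessor.add_request(): registering request state on the Client side first
Before the request is sent to the Core, LLMEngine has the OutputProcessor create a RequestState. The reason is that the Core later returns only incremental information such as token IDs, finish reason, and logprobs; the Client must save the prompt, detokenizer, logprobs processor, output mode, parent-request relationship, and similar context ahead of time.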
def add_request(
self,
request: EngineCoreRequest,
prompt: str | None,
parent_req: ParentRequest | None = None,
request_index: int = 0,
queue: RequestOutputCollector | None = None,
) -> None:
request_id = request.request_id
req_state = self.request_states.get(request_id)
if req_state is not None:
self._update_streaming_request_state(req_state, request, prompt)
return
req_state = RequestState.from_new_request(
tokenizer=self.tokenizer,
request=request,
prompt=prompt,
parent_req=parent_req,
request_index=request_index,
queue=queue,
log_stats=self.log_stats,
stream_interval=self.stream_interval,
)
self.request_states[request_id] = req_state
if parent_req:
self.parent_requests[parent_req.request_id] = parent_req
# Track the external_req_id -> [internal_req_id, ...] mapping
self.external_req_ids[req_state.external_req_id].append(request_id)
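The two lookup tables kept by add_request() can be modeled as below. Registry is a hypothetical simplification: it stores a plain dict where the real code stores a RequestState, and it ignores a duplicate internal ID where the real code updates streaming state.

```python
from collections import defaultdict


class Registry:
    """Stand-in for the bookkeeping half of OutputProcessor."""

    def __init__(self) -> None:
        # internal request_id -> per-request state
        self.request_states: dict[str, dict] = {}
        # external_req_id -> [internal request_id, ...]
        self.external_req_ids: defaultdict[str, list[str]] = defaultdict(list)

    def add_request(self, internal_id: str, external_id: str, prompt: str) -> None:
        if internal_id in self.request_states:
            return  # already registered; this sketch simply ignores the repeat
        self.request_states[internal_id] = {"external": external_id, "prompt": prompt}
        self.external_req_ids[external_id].append(internal_id)


reg = Registry()
reg.add_request("0-aaaa1111", "0", "Hello")
reg.add_request("0-bbbb2222", "0", "Hello")
reg.add_request("0-bbbb2222", "0", "Hello")  # duplicate internal ID
print(reg.external_req_ids["0"])
```

The one-to-many mapping is what allows a single external ID to be resolved to every internal request that was created under it.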
RequestState.from_new_request() creates the detokenizer and logprobs processor according to the request type. For ordinary generation it reads output_kind, max_tokens, top_p, n, temperature, and related fields from SamplingParams.
@classmethod
def from_new_request(
cls,
tokenizer: TokenizerLike | None,
request: EngineCoreRequest,
prompt: str | None,
parent_req: ParentRequest | None,
request_index: int,
queue: RequestOutputCollector | None,
log_stats: bool,
stream_interval: int,
) -> "RequestState":
if sampling_params := request.sampling_params:
if not sampling_params.detokenize:
tokenizer = None
output_kind = sampling_params.output_kind
logprobs_processor = LogprobsProcessor.from_new_request(
tokenizer=tokenizer,
request=request,
)
detokenizer = IncrementalDetokenizer.from_new_request(
tokenizer=tokenizer,
request=request,
)
max_tokens_param = sampling_params.max_tokens
top_p = sampling_params.top_p
n = sampling_params.n
temperature = sampling_params.temperature
else:
logprobs_processor = None
detokenizer = None
max_tokens_param = None
top_p = None
n = None
temperature = None
assert request.pooling_params is not None
output_kind = request.pooling_params.output_kind
assert request.external_req_id is not None
return cls(
request_id=request.request_id,
external_req_id=request.external_req_id,
parent_req=parent_req,
request_index=request_index,
lora_request=request.lora_request,
output_kind=output_kind,
prompt=prompt,
prompt_token_ids=request.prompt_token_ids,
prompt_embeds=request.prompt_embeds,
logprobs_processor=logprobs_processor,
detokenizer=detokenizer,
max_tokens_param=max_tokens_param,
...
)
2.8 SyncMPClient.add_request(): sending ADD through the input ROUTER
When LLMEngine calls self.engine_core.add_request(request), the default synchronous multiprocess path enters SyncMPClient.add_request().
def add_request(self, request: EngineCoreRequest) -> None:
if self.is_dp:
self.engines_running = True
self._send_input(EngineCoreRequestType.ADD, request)
_send_input() assembles the message as a ZMQ multipart:
[core_engine_identity, request_type.value, *msgpack_frames]
For ADD, request_type.value is b"\x00".
def _send_input(self, request_type: EngineCoreRequestType, request: Any):
self.ensure_alive()
self.free_pending_messages()
# (Identity, RequestType, SerializedRequest)
msg = (self.core_engine, request_type.value, *self.encoder.encode(request))
if len(msg) <= 3:
# No auxiliary buffers => no tensor backing buffers in request.
self.input_socket.send_multipart(msg, copy=False)
return
tracker = self.input_socket.send_multipart(msg, copy=False, track=True)
self.add_pending_message(tracker, request)
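The Core-side input socket thread acts only as the transport entry point: it reads the request type and data frames from the DEALER socket, deserializes the EngineCoreRequest, runs the Core's entry preprocessing, and puts the result on the Core's input_queue. At this point the request has entered the Core black box.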
def process_input_sockets(
self,
input_addresses: list[str],
coord_input_address: str | None,
identity: bytes,
ready_event: threading.Event,
):
"""Input socket IO thread."""
...
while True:
for input_socket, _ in poller.poll():
# (RequestType, RequestData)
type_frame, *data_frames = input_socket.recv_multipart(copy=False)
...
request_type = EngineCoreRequestType(bytes(type_frame.buffer))
# Deserialize the request data.
request: Any
if request_type == EngineCoreRequestType.ADD:
req: EngineCoreRequest = add_request_decoder.decode(data_frames)
try:
request = self.preprocess_add_request(req)
except Exception:
self._handle_request_preproc_error(req)
continue
else:
request = generic_decoder.decode(data_frames)
...
# Push to input queue for core busy loop.
self.input_queue.put_nowait((request_type, request))
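The framing on both ends can be traced without any sockets. In this sketch json stands in for msgpack, the two-byte engine identity is a made-up value, and router_deliver only models the ZMQ rule that a ROUTER socket consumes the first frame of an outgoing message as the routing identity.

```python
import json

ADD = b"\x00"  # request-type byte for ADD, as stated in the text


def build_frames(identity: bytes, request_type: bytes, request: dict) -> list[bytes]:
    """Client side: [identity, type, payload] as handed to the ROUTER socket."""
    payload = json.dumps(request).encode()  # stand-in for msgpack encoding
    return [identity, request_type, payload]


def router_deliver(frames: list[bytes]) -> tuple[bytes, list[bytes]]:
    """Model ROUTER routing: the first frame picks the peer and is stripped."""
    identity, *rest = frames
    return identity, rest


def parse_frames(frames: list[bytes]) -> tuple[bytes, dict]:
    """Core side: the DEALER sees only (type, payload...)."""
    type_frame, *data_frames = frames
    return type_frame, json.loads(data_frames[0])


sent = build_frames(
    b"\x00\x00",  # hypothetical engine identity
    ADD,
    {"request_id": "0-8f3a1c2d", "prompt_token_ids": [1, 2, 3]},
)
peer, delivered = router_deliver(sent)
req_type, request = parse_frames(delivered)
print(peer, req_type, request)
```

With a single payload frame the message has exactly three frames, which corresponds to the `len(msg) <= 3` fast path in _send_input(); extra frames would carry auxiliary tensor buffers.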
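3. How outputs are fetched from Core and returned as the final inference result
3.1 The output contract from Core to Client
From the Client's point of view, Core returns EngineCoreOutputs, whose most important field is outputs: list[EngineCoreOutput]. Each EngineCoreOutput is one incremental output for one internal request_id.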
class EngineCoreOutput(
    msgspec.Struct,
    array_like=True,
    omit_defaults=True,
    gc=False,
):
    request_id: str
    new_token_ids: list[int]
    new_logprobs: LogprobsLists | None = None
    new_prompt_logprobs_tensors: LogprobsTensors | None = None
    pooling_output: torch.Tensor | None = None
    finish_reason: FinishReason | None = None
    stop_reason: int | str | None = None
    events: list[EngineCoreEvent] | None = None
    kv_transfer_params: dict[str, Any] | None = None
    ...
    @property
    def finished(self) -> bool:
        return self.finish_reason is not None
EngineCoreOutputs is a batch of outputs, and it may also carry scheduler stats, a utility result, DP wave signals, and so on.
class EngineCoreOutputs(
    msgspec.Struct,
    array_like=True,
    omit_defaults=True,
    gc=False,
):
    engine_index: int = 0
    # [num_reqs]
    outputs: list[EngineCoreOutput] = []
    scheduler_stats: SchedulerStats | None = None
    timestamp: float = 0.0
    utility_output: UtilityOutput | None = None
    finished_requests: set[str] | None = None
    ...
3.2 How the Core wrapper pushes outputs back to the Client
The details of a Core step are out of scope. Looking only at the boundary: EngineCoreProc._process_engine_step() obtains a batch of outputs and puts each (client_index, EngineCoreOutputs) pair on output_queue.
def _process_engine_step(self) -> bool:
    outputs, model_executed = self.step_fn()
    # Put EngineCoreOutputs into the output queue.
    for output in outputs.items() if outputs else ():
        self.output_queue.put_nowait(output)
    # Post-step hook.
    self.post_step(model_executed)
    ...
    return model_executed
Core's output socket thread then takes outputs off output_queue, encodes them with msgpack, and sends them through a PUSH socket to the Client's PULL socket.
def process_output_sockets(
    self, output_paths: list[str], coord_output_path: str | None, engine_index: int
):
    """Output socket IO thread."""
    # Msgpack serialization encoding.
    encoder = MsgpackEncoder()
    ...
    with ExitStack() as stack, zmq.Context() as ctx:
        sockets = [
            stack.enter_context(
                make_zmq_socket(ctx, output_path, zmq.PUSH, linger=4000)
            )
            for output_path in output_paths
        ]
        ...
        while True:
            output = self.output_queue.get()
            if output == EngineCoreProc.ENGINE_CORE_DEAD:
                for socket in sockets:
                    socket.send(output)
                break
            assert not isinstance(output, bytes)
            client_index, outputs = output
            outputs.engine_index = engine_index
            ...
            buffers = encoder.encode_into(outputs, buffer)
            tracker = sockets[client_index].send_multipart(
                buffers, copy=False, track=True
            )
            ...
3.3 The SyncMPClient output thread: turning ZMQ frames into a local queue
As seen in the construction section, SyncMPClient owns an EngineCoreOutputQueueThread. Regular outputs go into self.outputs_queue; utility outputs are used to wake up synchronous calls such as call_utility().
The synchronous LLMEngine.step() never reads the ZMQ socket directly. It only calls SyncMPClient.get_output(), which blocks on the local queue.
def get_output(self) -> EngineCoreOutputs:
    # If an exception arises in process_outputs_socket task,
    # it is forwarded to the outputs_queue so we can raise it
    # from this (run_output_handler) task to shut down the server.
    outputs = self.outputs_queue.get()
    if isinstance(outputs, Exception):
        raise self._format_exception(outputs) from None
    if outputs.wave_complete is not None:
        self.engines_running = False
    return outputs
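The "forward the exception through the same queue" pattern can be shown in isolation. The sketch below is an assumption-laden miniature, not the vLLM implementation: OutputPump and failing_source are hypothetical names, a generator stands in for the socket receive and decode step, and a plain RuntimeError stands in for _format_exception().

```python
import queue
import threading


class OutputPump:
    """Hypothetical stand-in for the output thread plus get_output()."""

    def __init__(self, source):
        self.outputs_queue = queue.Queue()
        self._thread = threading.Thread(
            target=self._run, args=(source,), daemon=True
        )
        self._thread.start()

    def _run(self, source) -> None:
        try:
            for item in source:  # stands in for socket recv + decode
                self.outputs_queue.put_nowait(item)
        except Exception as exc:
            # Forward the failure so the consumer thread can raise it.
            self.outputs_queue.put_nowait(exc)

    def get_output(self):
        item = self.outputs_queue.get()  # blocks until something arrives
        if isinstance(item, Exception):
            raise RuntimeError(f"output thread failed: {item}") from None
        return item


def failing_source():
    yield {"outputs": ["tok-1"]}
    raise ConnectionError("socket closed")


pump = OutputPump(failing_source())
first = pump.get_output()
try:
    pump.get_output()
    error_text = ""
except RuntimeError as err:
    error_text = str(err)
print(first, error_text)
```

Because the consumer only ever blocks on the queue, a failure in the background thread surfaces at the next get_output() call instead of leaving the caller waiting forever.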
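3.4 LLMEngine.step(): the shell of one synchronous step
LLMEngine.step() performs one round of "fetch a batch of outputs from Core and process it":
- self.engine_core.get_output() fetches EngineCoreOutputs.
- self.output_processor.process_outputs() produces RequestOutput objects.
- If the Client-side detokenizer found a stop string but Core has not stopped the request yet, an abort is sent.
- Statistics are recorded.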
def step(self) -> list[RequestOutput | PoolingRequestOutput]:
    if self.should_execute_dummy_batch:
        self.should_execute_dummy_batch = False
        self.engine_core.execute_dummy_batch()
        return []
    # 1) Get EngineCoreOutput from the EngineCore.
    with record_function_or_nullcontext("llm_engine step: get_output"):
        outputs = self.engine_core.get_output()
    # 2) Process EngineCoreOutputs.
    with record_function_or_nullcontext("llm_engine step: process_outputs"):
        iteration_stats = IterationStats() if self.log_stats else None
        processed_outputs = self.output_processor.process_outputs(
            outputs.outputs,
            engine_core_timestamp=outputs.timestamp,
            iteration_stats=iteration_stats,
        )
        self.output_processor.update_scheduler_stats(outputs.scheduler_stats)
    # 3) Abort any reqs that finished due to stop strings.
    with record_function_or_nullcontext("llm_engine step: abort_requests"):
        self.engine_core.abort_requests(processed_outputs.reqs_to_abort)
    # 4) Record stats
    ...
    return processed_outputs.request_outputs
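Here outputs.outputs is the list[EngineCoreOutput] that OutputProcessor actually processes.
3.5 OutputProcessor.process_outputs(): the only place that iterates over a batch of Core outputs
OutputProcessor.process_outputs() is the core of Client-side output handling. It looks up the RequestState by internal request_id, then for each output updates the detokenizer, logprobs, and stats, and finally creates the external RequestOutput.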
def process_outputs(
    self,
    engine_core_outputs: list[EngineCoreOutput],
    engine_core_timestamp: float | None = None,
    iteration_stats: IterationStats | None = None,
) -> OutputProcessorOutput:
    """
    Process the EngineCoreOutputs:
    1) Compute stats for logging
    2) Detokenize
    3) Create and handle RequestOutput objects
    """
    request_outputs: list[RequestOutput | PoolingRequestOutput] = []
    reqs_to_abort: list[str] = []
    for engine_core_output in engine_core_outputs:
        req_id = engine_core_output.request_id
        req_state = self.request_states.get(req_id)
        if req_state is None:
            # Ignore output for already-aborted request.
            continue
        # 1) Compute stats for this iteration.
        self._update_stats_from_output(
            req_state, engine_core_output, engine_core_timestamp, iteration_stats
        )
        new_token_ids = engine_core_output.new_token_ids
        pooling_output = engine_core_output.pooling_output
        finish_reason = engine_core_output.finish_reason
        stop_reason = engine_core_output.stop_reason
        kv_transfer_params = engine_core_output.kv_transfer_params
        routed_experts = engine_core_output.routed_experts
        req_state.num_cached_tokens = engine_core_output.num_cached_tokens
        req_state.is_prefilling = False
        ...
For generation outputs, the new tokens first go through the IncrementalDetokenizer. If the Client side detects a stop string, finish_reason is changed to STOP, and LLMEngine is later told to abort the request on the Core side.
def process_outputs(
    self,
    engine_core_outputs: list[EngineCoreOutput],
    engine_core_timestamp: float | None = None,
    iteration_stats: IterationStats | None = None,
) -> OutputProcessorOutput:
    ...
    for engine_core_output in engine_core_outputs:
        ...
        if pooling_output is None:
            assert req_state.detokenizer is not None
            assert req_state.logprobs_processor is not None
            # 2) Detokenize the token ids into text and perform stop checks.
            stop_string = req_state.detokenizer.update(
                new_token_ids, finish_reason == FinishReason.STOP
            )
            if stop_string:
                finish_reason = FinishReason.STOP
                stop_reason = stop_string
            # 3) Compute sample and prompt logprobs for request,
            # if required.
            req_state.logprobs_processor.update_from_output(engine_core_output)
        # 4) Create and handle RequestOutput objects.
        if request_output := req_state.make_request_output(
            new_token_ids,
            pooling_output,
            finish_reason,
            stop_reason,
            kv_transfer_params,
            routed_experts,
        ):
            if req_state.streaming_input:
                request_output.finished = False
            if req_state.queue is not None:
                # AsyncLLM: put into queue for handling by generate().
                req_state.queue.put(request_output)
            else:
                # LLMEngine: return list of RequestOutputs.
                request_outputs.append(request_output)
        ...
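When a request finishes, OutputProcessor cleans up its internal state. If the request finished on the Client side because of a stop string while the Core output itself was not marked finished, the internal ID is added to reqs_to_abort.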
def process_outputs(
    self,
    engine_core_outputs: list[EngineCoreOutput],
    engine_core_timestamp: float | None = None,
    iteration_stats: IterationStats | None = None,
) -> OutputProcessorOutput:
    ...
    for engine_core_output in engine_core_outputs:
        ...
        # Free completed requests.
        if finish_reason is not None:
            if req_state.streaming_input:
                ...
            else:
                self._finish_request(req_state)
                if not engine_core_output.finished:
                    # If req not finished in EngineCore, but Detokenizer
                    # detected stop string, abort needed in EngineCore.
                    reqs_to_abort.append(req_id)
                # Track per-request stats
                self._update_stats_from_finished(
                    req_state, finish_reason, iteration_stats
                )
                if self.tracing_enabled:
                    self.do_tracing(engine_core_output, req_state, iteration_stats)
    return OutputProcessorOutput(
        request_outputs=request_outputs,
        reqs_to_abort=reqs_to_abort,
    )
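The interplay between the Client-side stop check and reqs_to_abort is easier to see in a reduced model. The following is a sketch under simplifying assumptions: CoreOutput and State are hypothetical stand-ins, plain text replaces token ids, and the stop check is a naive substring search rather than the real incremental detokenizer.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CoreOutput:
    """Hypothetical stand-in for EngineCoreOutput."""
    request_id: str
    new_text: str  # stands in for new_token_ids
    finish_reason: Optional[str] = None


@dataclass
class State:
    """Hypothetical stand-in for RequestState with a naive stop check."""
    stop: list
    text: str = ""

    def update(self, new_text: str) -> Optional[str]:
        self.text += new_text
        return next((s for s in self.stop if s in self.text), None)


def process_outputs(states: dict, outputs: list) -> list:
    """Return internal ids that finished on the client but not in the core."""
    reqs_to_abort = []
    for out in outputs:
        state = states.get(out.request_id)
        if state is None:
            continue  # output for a request that was already removed
        finish_reason = out.finish_reason
        if state.update(out.new_text):
            finish_reason = "stop"
        if finish_reason is not None:
            states.pop(out.request_id)
            if out.finish_reason is None:
                # Core still thinks the request is running.
                reqs_to_abort.append(out.request_id)
    return reqs_to_abort


states = {"0-a": State(stop=["\n\n"]), "1-b": State(stop=["END"])}
aborts = process_outputs(states, [
    CoreOutput("0-a", "hello"),
    CoreOutput("1-b", "fooEND bar"),
    CoreOutput("9-z", "ignored"),
])
print(aborts, list(states))
```

Only the request that stopped on the Client side while Core had not finished it ends up in the abort list; a request that Core itself marks finished needs no abort.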
Cleanup removes the internal request_id and maintains the external_req_id -> internal_req_id mapping.
def _finish_request(self, req_state: RequestState) -> None:
    req_id = req_state.request_id
    self.request_states.pop(req_id)
    internal_ids = self.external_req_ids[req_state.external_req_id]
    internal_ids.remove(req_id)
    if not internal_ids:
        del self.external_req_ids[req_state.external_req_id]
    # Remove parent request if applicable.
    parent_req = req_state.parent_req
    if parent_req and not parent_req.child_requests:
        self.parent_requests.pop(parent_req.request_id, None)
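The two-map bookkeeping can be reproduced in a few lines. This is an illustrative sketch only: IdRegistry is a hypothetical class, the stored value is just the external id instead of a full RequestState, and the ids "0-aaaa" and "0-bbbb" are made up to show one external id owning several internal ones.

```python
from collections import defaultdict


class IdRegistry:
    """Hypothetical stand-in for the two maps kept by OutputProcessor."""

    def __init__(self) -> None:
        self.request_states = {}  # internal id -> external id
        self.external_req_ids = defaultdict(list)  # external id -> internal ids

    def add(self, internal_id: str, external_id: str) -> None:
        self.request_states[internal_id] = external_id
        self.external_req_ids[external_id].append(internal_id)

    def finish(self, internal_id: str) -> None:
        external_id = self.request_states.pop(internal_id)
        internal_ids = self.external_req_ids[external_id]
        internal_ids.remove(internal_id)
        if not internal_ids:
            # Last internal request gone: drop the external entry too.
            del self.external_req_ids[external_id]


reg = IdRegistry()
reg.add("0-aaaa", "0")
reg.add("0-bbbb", "0")
reg.finish("0-aaaa")
remaining = list(reg.external_req_ids["0"])
reg.finish("0-bbbb")
print(remaining, dict(reg.external_req_ids))
```

Deleting the external entry only when its list becomes empty keeps lookups by external id valid for as long as any internal request is still alive.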
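3.6 RequestState.make_request_output(): from internal state to external output
RequestState holds all the Client-side context needed to build a RequestOutput:
- External and internal request ids.
- Prompt text and prompt token ids.
- Text accumulated by the detokenizer.
- The logprobs processor.
- output_kind.
- The parent request for parallel sampling.
- Stats, LoRA, stream interval, and so on.
Its make_request_output() first uses output_kind to decide whether an output should be returned at all. The offline LLM.generate() has already set output_kind to FINAL_ONLY, so intermediate tokens of an unfinished request normally do not produce a RequestOutput.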
def make_request_output(
    self,
    new_token_ids: list[int],
    pooling_output: torch.Tensor | None,
    finish_reason: FinishReason | None,
    stop_reason: int | str | None,
    kv_transfer_params: dict[str, Any] | None = None,
    routed_experts: np.ndarray | None = None,
) -> RequestOutput | PoolingRequestOutput | None:
    finished = finish_reason is not None
    final_only = self.output_kind == RequestOutputKind.FINAL_ONLY
    if not finished and final_only:
        # Only the final output is required in FINAL_ONLY mode.
        return None
    ...
Generation outputs go through _new_completion_output() and are then assembled into a RequestOutput. Note that the id returned to the user is external_req_id.
def make_request_output(
    self,
    new_token_ids: list[int],
    pooling_output: torch.Tensor | None,
    finish_reason: FinishReason | None,
    stop_reason: int | str | None,
    kv_transfer_params: dict[str, Any] | None = None,
    routed_experts: np.ndarray | None = None,
) -> RequestOutput | PoolingRequestOutput | None:
    ...
    external_req_id = self.external_req_id
    if pooling_output is not None:
        return self._new_request_output(
            external_req_id,
            [self._new_pooling_output(pooling_output)],
            finished,
        )
    output = self._new_completion_output(
        new_token_ids, finish_reason, stop_reason, routed_experts
    )
    if self.parent_req is None:
        outputs = [output]
    else:
        outputs, finished = self.parent_req.get_outputs(self.request_id, output)
        if not outputs:
            return None
        external_req_id = self.parent_req.external_req_id
    return self._new_request_output(
        external_req_id, outputs, finished, kv_transfer_params
    )
_new_request_output() finally creates the RequestOutput that the external API sees.
def _new_request_output(
    self,
    external_req_id: str,
    outputs: list[CompletionOutput] | list[PoolingOutput],
    finished: bool,
    kv_transfer_params: dict[str, Any] | None = None,
) -> RequestOutput | PoolingRequestOutput:
    ...
    return RequestOutput(
        request_id=external_req_id,  # request_id is what was provided externally
        lora_request=self.lora_request,
        prompt=self.prompt,
        prompt_token_ids=prompt_token_ids,
        prompt_logprobs=prompt_logprobs,
        outputs=cast(list[CompletionOutput], outputs),
        finished=finished,
        kv_transfer_params=kv_transfer_params,
        num_cached_tokens=self.num_cached_tokens,
        metrics=self.stats,
    )
The text, token ids, and finish reason of each completion come from the detokenizer and the current Core output.
def _new_completion_output(
    self,
    token_ids: list[int],
    finish_reason: FinishReason | None,
    stop_reason: int | str | None,
    routed_experts: np.ndarray | None = None,
) -> CompletionOutput:
    assert self.detokenizer is not None
    assert self.logprobs_processor is not None
    finished = finish_reason is not None
    delta = self.output_kind == RequestOutputKind.DELTA
    # Prepare text and token_ids, based on delta mode
    text = self.detokenizer.get_next_output_text(finished, delta)
    if not delta:
        token_ids = self.detokenizer.output_token_ids
    # Prepare logprobs, based on delta mode
    logprobs = self.logprobs_processor.logprobs
    if delta and logprobs:
        logprobs = logprobs[-len(token_ids) :]
    return CompletionOutput(
        index=self.request_index,
        text=text,
        token_ids=token_ids,
        routed_experts=routed_experts,
        logprobs=logprobs,
        cumulative_logprob=self.logprobs_processor.cumulative_logprob,
        finish_reason=str(finish_reason) if finished else None,
        stop_reason=stop_reason if finished else None,
    )
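How output_kind shapes what the caller sees can be sketched with token ids alone. This is a simplified model, not the real class: OutputKind and TinyState are hypothetical stand-ins, and the additional checks hidden behind the ellipsis in make_request_output() (for example anything related to the stream interval) are deliberately left out.

```python
import enum


class OutputKind(enum.Enum):
    """Hypothetical stand-in for RequestOutputKind."""
    CUMULATIVE = 0
    DELTA = 1
    FINAL_ONLY = 2


class TinyState:
    """Accumulates token ids and decides what one step should return."""

    def __init__(self, output_kind: OutputKind) -> None:
        self.output_kind = output_kind
        self.output_token_ids = []

    def make_output(self, new_token_ids, finish_reason):
        self.output_token_ids.extend(new_token_ids)
        finished = finish_reason is not None
        if not finished and self.output_kind is OutputKind.FINAL_ONLY:
            return None  # intermediate step is swallowed
        if self.output_kind is OutputKind.DELTA:
            return list(new_token_ids)  # only what is new in this step
        return list(self.output_token_ids)  # everything generated so far


def run(kind: OutputKind) -> list:
    """Feed the same two steps through a fresh state of the given kind."""
    state = TinyState(kind)
    steps = [([1, 2], None), ([3], "length")]
    return [state.make_output(ids, reason) for ids, reason in steps]


for kind in OutputKind:
    print(kind.name, run(kind))
```

The same two Core steps yield three different views, which is why the offline path with FINAL_ONLY only ever hands one complete result per request to the caller.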
3.7 LLM._run_engine(): pulling synchronously until every request finishes
LLM._run_engine() is the last link of the offline synchronous API. It keeps calling LLMEngine.step() until LLMEngine.has_unfinished_requests() returns false.
def _run_engine(
    self,
    output_type: type[_O] | tuple[type[_O], ...],
    *,
    use_tqdm: bool | Callable[..., tqdm] = True,
) -> list[_O]:
    # Initialize tqdm.
    if use_tqdm:
        num_requests = self.llm_engine.get_num_unfinished_requests()
        ...
    # Run the engine.
    outputs: list[_O] = []
    total_in_toks = 0
    total_out_toks = 0
    while self.llm_engine.has_unfinished_requests():
        step_outputs = self.llm_engine.step()
        for output in step_outputs:
            assert isinstance(output, output_type)
            if output.finished:
                outputs.append(output)
                if use_tqdm:
                    ...
    if use_tqdm:
        pbar.close()
    # Sort the outputs by request ID.
    # This is necessary because some requests may be finished earlier than
    # its previous requests.
    return sorted(outputs, key=lambda x: int(x.request_id))
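Because LLM.generate() uses FINAL_ONLY, what is collected here is normally only final RequestOutput objects. When several requests run concurrently, completion order can differ from submission order, so the results are sorted by external request id at the end.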
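The int() conversion in the sort key matters once there are ten or more requests. A small sketch, where Result and collect_in_submission_order are hypothetical names standing in for finished RequestOutput objects and the final sort:

```python
from dataclasses import dataclass


@dataclass
class Result:
    """Hypothetical stand-in for a finished RequestOutput."""
    request_id: str
    text: str


def collect_in_submission_order(completion_order: list) -> list:
    # int() matters: compared as strings, "10" would sort before "2".
    return sorted(completion_order, key=lambda r: int(r.request_id))


done = [Result("10", "c"), Result("2", "b"), Result("0", "a")]
numeric = [r.request_id for r in collect_in_submission_order(done)]
lexicographic = sorted(r.request_id for r in done)
print(numeric, lexicographic)
```

This relies on the external ids being decimal counters assigned in submission order, as in the "0" example used throughout this walkthrough.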
4. A small example that ties the whole flow together
Example code:
from vllm import LLM, SamplingParams
llm = LLM(model="facebook/opt-125m")
params = SamplingParams(max_tokens=3)
outputs = llm.generate(["Hello"], params, use_tqdm=False)
print(outputs[0].request_id)
print(outputs[0].outputs[0].text)
On the Client side, this code goes through roughly the following state changes.
4.1 Construction phase
LLM(...)
  -> EngineArgs(...)
  -> LLMEngine.from_engine_args(...)
    -> VllmConfig
    -> Executor class
    -> LLMEngine(...)
      -> renderer
      -> InputProcessor
      -> OutputProcessor
      -> EngineCoreClient.make_client(...)
        -> SyncMPClient(...)
          -> bind input ROUTER
          -> bind output PULL
          -> launch EngineCoreProc
          -> handshake HELLO / READY
          -> start output queue thread
Once construction is done, LLM also issues one utility RPC via get_supported_tasks() to confirm that the model supports generation.
4.2 Adding the request
LLM.generate(["Hello"], SamplingParams(max_tokens=3))
  -> _run_completion()
    -> _add_completion_requests()
      -> _preprocess_cmpl_one("Hello")
        -> renderer.render_cmpl(...)
        -> EngineInput(type=..., prompt_token_ids=[...])
      -> _add_request(...)
         external request_id = "0"
         params.output_kind = FINAL_ONLY
        -> LLMEngine.add_request("0", EngineInput, params)
          -> InputProcessor.process_inputs(...)
             EngineCoreRequest(request_id="0", prompt_token_ids=[...])
          -> InputProcessor.assign_request_id(...)
             external_req_id = "0"
             request_id = "0-<random8>"
          -> OutputProcessor.add_request(...)
             request_states["0-<random8>"] = RequestState(...)
          -> SyncMPClient.add_request(...)
             send [identity, ADD, msgpack(EngineCoreRequest)] to Core
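At this point the Client side holds a RequestState, and the Core side has received an ADD request. The RequestState stores the context needed later to assemble the RequestOutput; Core does not need to know what the final API output looks like.
4.3 Returning the output
Suppose Core returns tokens over several rounds:
Core -> EngineCoreOutputs([
    EngineCoreOutput(
        request_id="0-<random8>",
        new_token_ids=[...],
        finish_reason=None,
    )
])
The SyncMPClient output thread decodes it and puts it on outputs_queue. LLM._run_engine() calls LLMEngine.step(), which fetches this batch through get_output() and hands it to OutputProcessor.process_outputs().
Because the request is FINAL_ONLY, when finish_reason is None:
RequestState.make_request_output(...)
  -> not finished and final_only
  -> return None
Intermediate tokens are accumulated by the detokenizer but are not returned to LLM._run_engine().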
In the last round Core returns:
Core -> EngineCoreOutputs([
    EngineCoreOutput(
        request_id="0-<random8>",
        new_token_ids=[...],
        finish_reason=STOP or LENGTH,
        stop_reason=...,
    )
])
This time RequestState.make_request_output() creates the external output:
RequestOutput(
    request_id="0",
    prompt="Hello",
    prompt_token_ids=[...],
    outputs=[
        CompletionOutput(
            index=0,
            text="<detokenized final text>",
            token_ids=[all generated token ids],
            finish_reason="stop" or "length",
            ...
        )
    ],
    finished=True,
)
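OutputProcessor._finish_request() then deletes request_states["0-<random8>"]. Once all requests have been cleaned up, LLMEngine.has_unfinished_requests() returns false, LLM._run_engine() leaves the loop, sorts by external request id, and returns the final list of RequestOutput objects.
5. Summary
The whole Client-side path can be condensed into one sentence:
LLM owns the user API and the synchronous drive loop,
LLMEngine converts inputs and outputs between the API form and the Core form,
InputProcessor turns EngineInput into EngineCoreRequest,
OutputProcessor + RequestState turn EngineCoreOutput back into RequestOutput,
and SyncMPClient uses ZMQ to carry EngineCoreRequest/EngineCoreOutputs across the process boundary.
In this design the boundary between the Client side and the Core side is very clear:
- Downstream, a request carries only an EngineCoreRequest and a request type.
- Upstream, the Client receives only EngineCoreOutputs.
- API semantics such as prompt text, the detokenizer, logprobs, external request ids, and parallel sampling aggregation all stay on the Client side, in OutputProcessor/RequestState.
- Core can focus on scheduling and execution; the Client side turns the black-box results into a stable user-facing API output.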