vLLM 核心架构与调用链深度剖析

一、整体架构概览

graph TB
    subgraph Frontend["前端层 (Frontend)"]
        API[API Server]
        AsyncLLM[AsyncLLM]
    end
    
    subgraph Client["客户端层 (Client)"]
        EngineCoreClient[EngineCoreClient]
        MPClient[MPClient/AsyncMPClient]
    end
    
    subgraph EngineLayer["引擎核心层 (EngineCore)"]
        EngineCore[EngineCore]
        EngineCoreProc[EngineCoreProc]
        InputProcessor[InputProcessor]
        OutputProcessor[OutputProcessor]
    end
    
    subgraph Scheduling["调度层 (Scheduling)"]
        Scheduler[Scheduler]
        KVCacheManager[KVCacheManager]
    end
    
    subgraph Execution["执行层 (Execution)"]
        Executor[Executor]
        Worker[Worker]
        ModelRunner[ModelRunner]
    end
    
    subgraph Model["模型层 (Model)"]
        Model_Impl[Model Implementation]
        Attention[Attention Layer]
        KVCache[KV Cache]
    end
    
    API --> AsyncLLM
    AsyncLLM --> InputProcessor
    AsyncLLM --> OutputProcessor
    AsyncLLM --> EngineCoreClient
    EngineCoreClient --> MPClient
    MPClient -->|ZMQ IPC| EngineCoreProc
    EngineCoreProc --> EngineCore
    EngineCore --> Scheduler
    Scheduler --> KVCacheManager
    EngineCore --> Executor
    Executor --> Worker
    Worker --> ModelRunner
    ModelRunner --> Model_Impl
    Model_Impl --> Attention
    Attention --> KVCache

二、核心组件详解

2.1 整体调用流程

vLLM V1 的完整调用链从 CLI 命令行启动开始:

  1. API Server 创建:在 vllm/entrypoints/openai/api_server.py 中创建 API 服务器进程时,会实例化 AsyncLLM 对象
  2. 注册路由: 在openai请求格式的路由注册:vllm/entrypoints/openai/chat_completion/api_router.py中注册"/v1/chat/completions"的路由,负责处理chat请求
  3. 请求处理:当收到 OpenAI 格式的 HTTP 请求时,调用 vllm/entrypoints/openai/chat_completion/serving.py中的OpenAIServingChat::create_chat_completion 函数
  4. 异步生成:通过 AsyncLLM.generate() 方法处理请求,该方法实现了异步流式输出
  5. 多进程通信:通过 EngineCoreClient 与后台的 EngineCore 进程进行 ZMQ IPC 通信

关键设计特点:

  • 异步化设计:所有操作都是异步的,支持高并发请求处理
  • 进程隔离:前端 API 服务与后端推理引擎运行在不同进程,通过 ZMQ 解耦
  • 流式输出:支持实时流式输出,减少用户等待时间

2.2 AsyncLLM - 异步推理引擎入口

AsyncLLM 是 vLLM V1 最核心的入口类,位于 vllm/v1/engine/async_llm.py

核心职责:

  1. 请求处理:接收并预处理用户请求
  2. 输出流管理:管理异步输出流
  3. 生命周期管理:管理 EngineCore 的生命周期

关键组件初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class AsyncLLM(EngineClient):
def __init__(self, ...):
# 1. 输入处理器 - 处理 prompt 的 tokenization 和多模态数据
self.input_processor = InputProcessor(self.vllm_config, tokenizer)

# 2. 输出处理器 - 将 EngineCoreOutputs 转换为 RequestOutput
self.output_processor = OutputProcessor(
self.tokenizer,
log_stats=self.log_stats,
stream_interval=self.vllm_config.scheduler_config.stream_interval,
)

# 3. 引擎核心客户端 - 通过 ZMQ 与后台进程通信
self.engine_core = EngineCoreClient.make_async_mp_client(...)

2.3 请求异步化流程

sequenceDiagram
    participant API as API Server
    participant AsyncLLM as AsyncLLM
    participant InputProcessor as InputProcessor
    participant OutputProcessor as OutputProcessor
    participant EngineCore as EngineCore (Backend Process)
    participant Queue as RequestOutputCollector

    API->>AsyncLLM: generate(prompt, params)
    AsyncLLM->>InputProcessor: process_inputs()
    InputProcessor-->>AsyncLLM: EngineCoreRequest
    AsyncLLM->>Queue: 创建 RequestOutputCollector
    AsyncLLM->>OutputProcessor: add_request()
    AsyncLLM->>EngineCore: add_request_async() [ZMQ]
    
    loop 输出循环
        EngineCore-->>OutputProcessor: EngineCoreOutputs [ZMQ]
        OutputProcessor->>Queue: put(RequestOutput)
        AsyncLLM->>Queue: get_nowait() or await get()
        AsyncLLM-->>API: yield RequestOutput
    end

关键代码路径 generate 方法:

1
2
3
4
5
6
7
8
9
10
11
async def generate(self, prompt, sampling_params, request_id, ...):
# 1. 添加请求到处理队列
q = await self.add_request(request_id, prompt, sampling_params, ...)

# 2. 循环等待输出
finished = False
while not finished:
# 非阻塞获取,避免任务切换开销
out = q.get_nowait() or await q.get()
finished = out.finished
yield out

2.4 InputProcessor、OutputProcessor、RequestOutputCollector 关键作用介绍

这三个组件是 vLLM V1 异步流水线的核心,负责请求的输入处理、输出转换和流式输出收集。

2.4.1 InputProcessor - 输入处理器

InputProcessor 位于 vllm/v1/engine/processor.py,负责将用户输入转换为引擎可处理的格式。

核心职责:

功能 描述
Prompt 处理 将文本 prompt 进行 tokenization
多模态处理 处理图像、音频等多模态输入数据
参数验证 验证和规范化 SamplingParams
请求封装 生成 EngineCoreRequest 对象

关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class InputProcessor:
def __init__(self, vllm_config, tokenizer, ...):
self.tokenizer = tokenizer
self.mm_processor = MultiModalProcessor(...) # 多模态处理器

def process_inputs(
self,
request_id: str,
prompt: PromptType,
params: Union[SamplingParams, PoolingParams],
...
) -> EngineCoreRequest:
# 1. 预处理 prompt (处理 chat template 等)
preprocessed = self.tokenizer.preprocess(prompt)

# 2. 处理多模态数据 (如有)
if mm_data := preprocessed.multi_modal_data:
mm_inputs = self.mm_processor.process(mm_data)

# 3. Tokenization
prompt_token_ids = self.tokenizer.encode(preprocessed.prompt)

# 4. 封装为 EngineCoreRequest
return EngineCoreRequest(
request_id=request_id,
prompt_token_ids=prompt_token_ids,
mm_inputs=mm_inputs,
sampling_params=params,
...
)

2.4.2 OutputProcessor - 输出处理器

OutputProcessor 位于 vllm/v1/engine/output_processor.py,负责将引擎核心的输出转换为用户友好的格式。

核心职责:

功能 描述
Detokenization 将 token IDs 转换为文本
Logprobs 计算 计算和格式化 logprobs
统计信息更新 更新请求的 metrics 统计
流式输出管理 stream_interval 控制输出频率

关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class OutputProcessor:
def __init__(self, tokenizer, log_stats, stream_interval, ...):
self.tokenizer = tokenizer
self.stream_interval = stream_interval # 流式输出间隔
self.request_states: dict[str, RequestState] = {}

def add_request(self, request: EngineCoreRequest, queue: RequestOutputCollector):
"""注册新请求,关联输出队列"""
self.request_states[request.request_id] = RequestState(
request=request,
queue=queue,
detokenizer=Detokenizer(self.tokenizer),
logprobs_processor=LogprobsProcessor() if request.logprobs else None,
)

def process_outputs(self, engine_core_outputs: list[EngineCoreOutput]):
"""处理引擎输出,生成 RequestOutput"""
for output in engine_core_outputs:
req_state = self.request_states[output.request_id]

# 1. Detokenize 新生成的 tokens
new_text = req_state.detokenizer.decode(output.new_token_ids)

# 2. 处理 logprobs (如需要)
logprobs = None
if req_state.logprobs_processor:
logprobs = req_state.logprobs_processor.process(output)

# 3. 检查是否满足流式输出条件
if self._should_output(req_state):
request_output = RequestOutput(
request_id=output.request_id,
outputs=[CompletionOutput(text=new_text, logprobs=logprobs)],
finished=output.finished,
)
# 4. 放入请求的输出队列
req_state.queue.put(request_output)

2.4.3 RequestOutputCollector - 请求输出收集器

RequestOutputCollector 位于 vllm/v1/engine/output_processor.py,实现了高效的生产者-消费者模式,用于收集和传递流式输出。

核心设计:

1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────────────────────────────────────┐
│ RequestOutputCollector │
│ │
│ OutputProcessor ────► put() ────► output ────► get() ────► AsyncLLM.generate()
│ (Producer) │ (Buffer) │ (Consumer)
│ │ │ │
│ └── ready.set() ◄───────┘ │
│ (Event Signal) │
│ │
│ 特性: 增量合并、非阻塞获取、异步等待 │
└─────────────────────────────────────────────────────────────┘

关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class RequestOutputCollector:
"""收集流式 RequestOutput,实现生产者-消费者模式"""

def __init__(self, aggregate: bool = False):
self.output: Optional[RequestOutput] = None
self.ready = asyncio.Event() # 通知消费者有新数据
self.aggregate = aggregate # 是否聚合所有输出

def put(self, output: RequestOutput):
"""生产者:放入输出 (非阻塞)"""
if self.output is None:
self.output = output
self.ready.set() # 通知消费者
elif isinstance(self.output, RequestOutput):
# 增量合并:将新输出合并到已有输出
self.output.add(output, aggregate=self.aggregate)

def get_nowait(self) -> Optional[RequestOutput]:
"""消费者:非阻塞获取"""
if self.output is None:
return None
output = self.output
self.output = None
self.ready.clear()
return output

async def get(self) -> RequestOutput:
"""消费者:阻塞等待获取"""
while self.output is None:
await self.ready.wait() # 异步等待信号
output = self.output
self.output = None
self.ready.clear()
return output

架构如下:

flowchart TD
    A[模型调用 put] --> B{self.output 是否为 None?}
    B -->|是| C[直接赋值, 设置 ready 事件]
    B -->|否| D{输出类型?}
    D -->|RequestOutput| E[调用 output.add 合并]
    D -->|PoolingRequestOutput| F[直接覆盖]
    D -->|Exception| G[直接覆盖, 设置 ready]
    
    E --> H{output_kind?}
    H -->|DELTA| I[增量合并: text/tokens 追加]
    H -->|其他| J[替换模式: 整体替换 completion]
    
    K[消费者调用 get] --> L[获取 self.output]
    L --> M[self.output = None]
    M --> N[清除 ready 事件]

设计亮点:

特性 作用
增量合并 多次 put() 的输出可合并,减少数据传递次数
非阻塞优先 get_nowait() 避免协程切换开销,提升吞吐量
Event 信号 使用 asyncio.Event 实现高效的异步等待
单缓冲设计 只保留最新聚合结果,内存占用低

三者协作流程:

sequenceDiagram
    participant User as 用户请求
    participant IP as InputProcessor
    participant EC as EngineCore
    participant OP as OutputProcessor
    participant ROC as RequestOutputCollector
    participant Gen as generate()
    
    User->>IP: prompt + params
    IP->>IP: tokenize + 多模态处理
    IP-->>EC: EngineCoreRequest
    
    Note over OP,ROC: 注册请求时关联 Collector
    IP->>OP: add_request(request, collector)
    
    loop 推理循环
        EC-->>OP: EngineCoreOutput (new tokens)
        OP->>OP: detokenize + logprobs
        OP->>ROC: put(RequestOutput)
        ROC->>ROC: ready.set()
        Gen->>ROC: get_nowait() / await get()
        ROC-->>Gen: RequestOutput
        Gen-->>User: yield output (流式)
    end

2.5 EngineCoreClient - 多进程通信

EngineCoreClient通过 ZMQ 实现前后端进程间通信,支持多种模式:

类型 场景 特点
InprocClient 同进程 直接调用,无 IPC 开销
SyncMPClient LLM 同步接口 ZMQ + 后台进程
AsyncMPClient AsyncLLM 异步接口 ZMQ + 异步事件循环
DPAsyncMPClient 数据并行 (外部 LB) 多引擎,外部负载均衡
DPLBAsyncMPClient 数据并行 (内部 LB) 多引擎,内部负载均衡

异步输出处理循环:

1
2
3
4
5
6
7
8
9
10
11
12
# AsyncMPClient 中的 output_queue_task
async def process_outputs_socket():
while True:
frames = await output_socket.recv_multipart(copy=False)
outputs: EngineCoreOutputs = decoder.decode(frames)

if outputs.utility_output:
# 处理工具方法的返回结果
_process_utility_output(outputs.utility_output, utility_results)
else:
# 将输出放入队列
outputs_queue.put_nowait(outputs)

2.5.1 AsyncMPClient 与 EngineCore 的解耦架构

AsyncMPClient 是前端进程与后端 EngineCore 进程通信的桥梁,采用 ZMQ 实现高效的进程间通信。

通信架构图

flowchart LR
    subgraph Frontend["前端进程 (AsyncLLM)"]
        Client[AsyncMPClient]
        IS[input_socket
ROUTER] OS[output_socket
PULL] OQ[outputs_queue
asyncio.Queue] OQT[output_queue_task
后台协程] end subgraph Backend["后台进程 (EngineCoreProc)"] EP[EngineCoreProc] end Client -->|add_request_async| IS IS -->|ZMQ IPC| EP EP -->|ZMQ IPC| OS OS --> OQT OQT --> OQ OQ -->|get_output_async| Client

前端请求发送

1
2
3
4
5
6
7
8
9
# AsyncMPClient.add_request_async (前端进程)
async def add_request_async(self, request: EngineCoreRequest) -> None:
request.client_index = self.client_index
await self._send_input(EngineCoreRequestType.ADD, request)
self._ensure_output_queue_task()

# 消息格式: (engine_identity, request_type, msgpack_serialized_request)
msg = (engine,) + (request_type.value, *self.encoder.encode(request))
await self.input_socket.send_multipart(msg, copy=False)

前端输出接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 后台协程接收输出
async def process_outputs_socket():
while True:
frames = await output_socket.recv_multipart(copy=False)
outputs: EngineCoreOutputs = decoder.decode(frames)

if outputs.utility_output:
_process_utility_output(outputs.utility_output, utility_results)
continue

# 放入 outputs_queue 供 generate() 消费
if outputs.outputs or outputs.scheduler_stats:
outputs_queue.put_nowait(outputs)

# 用户通过 get_output_async 获取
async def get_output_async(self) -> EngineCoreOutputs:
outputs = await self.outputs_queue.get()
return outputs

解耦的关键优势

特性 说明
异步非阻塞 前端使用 asyncio,不阻塞用户请求处理
进程隔离 GPU 推理在独立进程,避免 GIL 影响
ZMQ 高效传输 使用 msgpack 序列化 + ZMQ IPC,零拷贝传输
背压控制 通过队列实现生产者-消费者模式

后端 EngineCoreProc 的详细实现(三线程模型、主循环处理等)请参见 3.2 EngineCoreProc - 后台进程

三、EngineCore - 核心推理引擎

3.1 EngineCore 初始化流程

flowchart TB
    subgraph Init["EngineCore 初始化"]
        A[创建 Executor] --> B[初始化 KV Cache]
        B --> C[创建 Scheduler]
        C --> D[创建 StructuredOutputManager]
        D --> E[启动批处理队列]
    end

关键初始化代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# vllm/v1/engine/core.py
class EngineCore:
def __init__(self, vllm_config, executor_class, ...):
# 1. 创建模型执行器
self.model_executor = executor_class(vllm_config)

# 2. 初始化 KV Cache
num_gpu_blocks, num_cpu_blocks, kv_cache_config = self._initialize_kv_caches(vllm_config)
self.collective_rpc("initialize_cache", args=(num_gpu_blocks, num_cpu_blocks))

# 3. 创建调度器
Scheduler = vllm_config.scheduler_config.get_scheduler_cls()
self.scheduler = Scheduler(vllm_config=vllm_config, kv_cache_config=kv_cache_config, ...)

# 4. 批处理队列 (用于 Pipeline 并行)
if self.batch_queue_size > 1:
self.batch_queue = deque(maxlen=self.batch_queue_size)

3.2 EngineCoreProc - 后台进程

EngineCoreProc 继承 EngineCore,在独立进程中运行,通过 ZMQ 与前端通信。它采用三线程模型来解耦 IO 与计算。

3.2.1 三线程架构

flowchart TB
    subgraph Backend["后台进程 (EngineCoreProc)"]
        IQ[input_queue
queue.Queue] OutQ[output_queue
queue.Queue] subgraph Threads["三个独立线程"] IT[input_thread
输入IO线程] OT[output_thread
输出IO线程] MT[main_thread
主循环线程] end EC[EngineCore
核心推理引擎] Scheduler[Scheduler
调度器] Model[Model Runner
模型执行] end IT -->|ZMQ recv| IQ IQ -->|取出处理| MT MT --> Scheduler Scheduler --> EC EC --> Model Model --> EC EC -->|放入| OutQ OutQ -->|取出| OT OT -->|ZMQ send| Output[输出到前端]

coreClient启动engineCoreProc后台进程流程,三线程职责分工:

flowchart TB
    subgraph CoreClient["core_client.py (前端进程)"]
        A[MPClient.__init__]
    end
    
    subgraph Utils["utils.py"]
        B[launch_core_engines]
        C[CoreEngineProcManager]
        C1[context.Process创建子进程]
    end
    
    subgraph CoreProcess["后台进程 - EngineCoreProc"]
        D[run_engine_core
静态方法作为 target_fn] subgraph Init["EngineCoreProc.__init__"] E1[_perform_handshakes
与前端握手获取 ZMQ 地址] E2[_init_data_parallel
初始化 DP 配置] E3[super.__init__
初始化 EngineCore 基类] subgraph ThreadInit["IO 线程初始化"] T1[创建 ready_event] T2[创建 input_thread
target=process_input_sockets] T3[input_thread.start] T4[创建 output_thread
target=process_output_sockets] T5[output_thread.start] T6[等待 ready_event
确保 DP Coordinator 就绪] end end F[main_thread.run_busy_loop
主循环开始运行] end subgraph Threads["后台 IO 线程"] IT[input_thread
process_input_sockets] OT[output_thread
process_output_sockets] IT1[创建 ZMQ DEALER sockets
连接到前端 input_addresses] IT2[创建 ZMQ XSUB socket
连接到 DP Coordinator] IT3[发送订阅消息到 Coordinator] IT4[等待 Coordinator READY] IT5[ready_event.set 通知主线程] IT6[循环接收消息
放入 input_queue] OT1[创建 ZMQ PUSH sockets
连接到前端 output_addresses] OT2[创建 ZMQ XPUB socket
连接到 DP Coordinator] OT3[循环从 output_queue 取出
发送到前端] end A -->|1. 调用上下文管理器| B B -->|2. 创建进程管理器| C C -->|3. 创建子进程| C1 C1 -->|4. proc.start| D D -->|5. 实例化| E1 E1 -->|6. 握手完成获取地址| E2 E2 --> E3 E3 -->|7. 创建线程| T1 T1 --> T2 T2 --> T3 T3 -->|8. 启动 input_thread| IT T3 --> T4 T4 --> T5 T5 -->|9. 启动 output_thread| OT T5 --> T6 IT --> IT1 IT1 --> IT2 IT2 --> IT3 IT3 --> IT4 IT4 --> IT5 IT5 -.->|通知| T6 IT5 --> IT6 OT --> OT1 OT1 --> OT2 OT2 --> OT3 T6 -->|10. 线程就绪后| F
线程 职责 特点
input_thread 接收 ZMQ 消息,放入 input_queue IO 密集,非阻塞
main_thread 核心忙循环,调度与推理 计算密集,GPU 操作
output_thread 从 output_queue 取出,通过 ZMQ 发送 IO 密集,非阻塞

3.2.2 请求接收流程(input_thread)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 位置: core.py
# input_thread 接收并放入 input_queue
while True:
for input_socket, _ in poller.poll():
type_frame, *data_frames = input_socket.recv_multipart(copy=False)
request_type = EngineCoreRequestType(bytes(type_frame.buffer))

if request_type == EngineCoreRequestType.ADD:
req: EngineCoreRequest = add_request_decoder.decode(data_frames)
request = self.preprocess_add_request(req)
else:
request = generic_decoder.decode(data_frames)

# 放入 input_queue 供主循环处理
self.input_queue.put_nowait((request_type, request))

3.2.3 核心忙循环(main_thread)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def run_busy_loop(self):
"""核心忙循环"""
while True:
# 1. 处理输入队列
self._process_input_queue()
# 2. 执行引擎步骤并返回输出
self._process_engine_step()

def _process_input_queue(self):
# 从队列取出请求并处理
while not self.input_queue.empty():
req = self.input_queue.get_nowait()
self._handle_client_request(*req) # 添加到 scheduler

def _process_engine_step(self) -> bool:
# 执行一步调度和推理
outputs, model_executed = self.step_fn()
# 将输出放入输出队列
for output in outputs.items() if outputs else ():
self.output_queue.put_nowait(output)
# 后处理钩子
self.post_step(model_executed)

3.2.4 输出发送流程(output_thread)

1
2
3
4
5
6
7
8
9
10
# 位置: core.py
# output_thread 从 output_queue 取出并发送
while True:
output = self.output_queue.get()
client_index, outputs = output
outputs.engine_index = engine_index

# msgpack 编码并通过 ZMQ 发送
buffers = encoder.encode_into(outputs, buffer)
tracker = sockets[client_index].send_multipart(buffers, copy=False, track=True)

3.2.5 端到端数据流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
AsyncMPClient (前端进程)                    EngineCoreProc (后台进程)
│ │
add_request_async() │
│ [msgpack 编码] │
▼ │
input_socket.send_multipart() ──[ZMQ IPC]──► input_socket.recv_multipart()


input_queue.put()


main_thread: input_queue.get()


EngineCore.add_request() → scheduler.add_request()


EngineCore.step() → scheduler.schedule() → model.execute()


output_queue.put(EngineCoreOutputs)


output_socket.recv_multipart() ◄──[ZMQ IPC]── output_socket.send_multipart()


outputs_queue.put()


get_output_async() → 返回给用户

这种三线程设计实现了 IO 与计算的完全解耦

  • input_thread 和 output_thread 负责网络 IO,不占用 GPU 计算资源
  • main_thread 专注于调度和模型推理,最大化 GPU 利用率
  • 队列缓冲消除了 IO 与计算之间的等待

3.3 step() - 单步推理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
"""调度、执行、生成输出"""

if not self.scheduler.has_requests():
return {}, False

# 1. 调度 - 决定哪些请求参与本次推理
scheduler_output = self.scheduler.schedule()

# 2. 执行模型前向传播 (异步)
future = self.model_executor.execute_model(scheduler_output, non_block=True)

# 3. 获取语法约束 bitmask (结构化输出)
grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)

# 4. 等待模型执行完成
model_output = future.result()
if model_output is None:
model_output = self.model_executor.sample_tokens(grammar_output)

# 5. 更新调度器状态
engine_core_outputs = self.scheduler.update_from_output(scheduler_output, model_output)

return engine_core_outputs, scheduler_output.total_num_scheduled_tokens > 0

四、Scheduler - 调度器

4.1 调度算法核心思想

vLLM V1 的调度器采用统一调度策略,没有明确的 “prefill 阶段” 和 “decode 阶段”:

每个请求只有 num_computed_tokens (已计算) 和 num_tokens_with_spec (需计算) 两个状态。调度器的目标是让每个请求的 num_computed_tokens 追上 num_tokens_with_spec

核心调度逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def schedule(self) -> SchedulerOutput:
# 1. 先调度 RUNNING 请求 (decode 阶段)
for request in self.running:
num_new_tokens = request.num_tokens_with_spec - request.num_computed_tokens
num_new_tokens = min(num_new_tokens, token_budget)

# 分配 KV Cache blocks
new_blocks = self.kv_cache_manager.allocate_slots(request, num_new_tokens, ...)

if new_blocks is None:
# KV Cache 不足,需要抢占低优先级请求
preempted_req = self.running.pop()
self._preempt_request(preempted_req)

# 2. 再调度 WAITING 请求 (prefill 阶段)
while self.waiting and token_budget > 0:
request = self.waiting.peek_request()
# 分配 KV Cache 并添加到 running 队列
...

4.2 KV Cache 管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class KVCacheManager:
def allocate_slots(self, request, num_tokens, num_lookahead_tokens=0):
"""为请求分配 KV Cache blocks"""
# 计算需要的 block 数量
num_required_blocks = ceil((num_computed + num_tokens) / block_size)

# 尝试从前缀缓存命中
if self.enable_caching:
cached_blocks = self._get_cached_blocks(request)

# 分配新 blocks
new_blocks = self._allocate_new_blocks(num_required_blocks - len(cached_blocks))

return new_blocks

五、Executor - 模型执行器

5.1 Executor 类型

类型 场景 特点
UniProcExecutor 单 GPU 直接执行
MultiprocExecutor 多进程 (TP/PP) 多进程协调
RayDistributedExecutor Ray 分布式 Ray Actor 管理

5.2 execute_model 调用链

sequenceDiagram
    participant Scheduler as Scheduler
    participant Executor as Executor
    participant Worker as Worker
    participant ModelRunner as ModelRunner
    participant Model as Model

    Scheduler->>Executor: execute_model(scheduler_output)
    Executor->>Worker: collective_rpc("execute_model", args)
    Worker->>ModelRunner: execute_model(scheduler_output)
    ModelRunner->>Model: forward(input_ids, positions, kv_caches, ...)
    Model-->>ModelRunner: hidden_states
    ModelRunner->>ModelRunner: sample_tokens(hidden_states)
    ModelRunner-->>Worker: ModelRunnerOutput
    Worker-->>Executor: ModelRunnerOutput
    Executor-->>Scheduler: ModelRunnerOutput

六、Worker - GPU 工作进程

6.1 Worker 初始化流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Worker(WorkerBase):
def init_device(self):
# 1. 设置 CUDA 设备
self.device = torch.device(f"cuda:{self.local_rank}")
current_platform.set_device(self.device)

# 2. 初始化分布式环境
init_worker_distributed_environment(...)

# 3. 创建 ModelRunner
self.model_runner = GPUModelRunner(self.vllm_config, self.device)

def load_model(self):
# 加载模型权重
with self._maybe_get_memory_pool_context(tag="weights"):
self.model_runner.load_model()

def compile_or_warm_up_model(self):
# 编译/预热模型 (CUDA Graph 捕获)
if not self.model_config.enforce_eager:
self.model_runner.capture_model()

6.2 execute_model 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@torch.inference_mode()
def execute_model(self, scheduler_output):
intermediate_tensors = None

# Pipeline 并行:接收上一阶段的中间张量
if not get_pp_group().is_first_rank:
tensor_dict = get_pp_group().recv_tensor_dict(...)
intermediate_tensors = IntermediateTensors(tensor_dict)

# 执行模型前向传播
output = self.model_runner.execute_model(scheduler_output, intermediate_tensors)

# 如果不是最后一个 PP rank,发送中间张量
if not get_pp_group().is_last_rank:
get_pp_group().send_tensor_dict(output.tensors, ...)
return None

return output

七、模型加载流程

flowchart TB
    subgraph Loading["模型加载流程"]
        A[Executor._init_executor] --> B[Worker.init_device]
        B --> C[Worker.load_model]
        C --> D[ModelRunner.load_model]
        D --> E[determine_available_memory]
        E --> F[profile_run]
        F --> G[initialize_kv_cache]
        G --> H[compile_or_warm_up_model]
        H --> I[capture_model - CUDA Graph]
    end

关键步骤:

  1. 初始化设备:设置 CUDA 设备,初始化 NCCL 分布式环境
  2. 加载模型:使用 safetensors/HuggingFace 格式加载权重
  3. 内存 Profiling:执行 profile_run 确定可用于 KV Cache 的内存
  4. 初始化 KV Cache:根据可用内存分配 KV Cache blocks
  5. 模型编译/预热:捕获 CUDA Graph 优化推理性能

八、输出处理流程

8.1 OutputProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class OutputProcessor:
def process_outputs(self, engine_core_outputs, ...):
"""处理 EngineCoreOutputs 为 RequestOutput"""

for engine_core_output in engine_core_outputs:
req_state = self.request_states.get(req_id)

# 1. 更新统计信息
self._update_stats_from_output(req_state, engine_core_output, ...)

# 2. Detokenize
if req_state.detokenizer:
stop_string = req_state.detokenizer.update(new_token_ids, ...)

# 3. 计算 logprobs
if req_state.logprobs_processor:
req_state.logprobs_processor.update_from_output(engine_core_output)

# 4. 创建 RequestOutput 并放入队列
if request_output := req_state.make_request_output(...):
if req_state.queue is not None:
# AsyncLLM: 放入队列供 generate() 消费
req_state.queue.put(request_output)

8.2 RequestOutputCollector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class RequestOutputCollector:
"""收集流式 RequestOutput,实现生产者-消费者模式"""

def put(self, output):
"""非阻塞 put"""
if self.output is None:
self.output = output
self.ready.set()
elif isinstance(self.output, RequestOutput):
# 增量合并输出
self.output.add(output, aggregate=self.aggregate)

async def get(self):
"""阻塞 get"""
while self.output is None:
await self.ready.wait()
output = self.output
self.output = None
self.ready.clear()
return output

九、关键性能优化

9.1 异步化设计

  1. ZMQ 异步通信:前后端进程通过 ZMQ 非阻塞通信
  2. 批处理队列:Pipeline 并行时使用批处理队列消除 bubble
  3. 非阻塞输出获取: 优先,避免任务切换开销

9.2 内存优化

  1. Paged Attention:KV Cache 分块管理,动态分配
  2. Prefix Caching:前缀缓存复用,减少重复计算
  3. CUDA Graph:捕获执行图,减少 kernel launch 开销
  4. Sleep Mode:支持模型权重卸载,节省 GPU 内存

9.3 调度优化

  1. Continuous Batching:连续批处理,最大化 GPU 利用率
  2. Chunked Prefill:大 prompt 分块处理,避免 decode 请求饥饿
  3. Speculative Decoding:投机解码,加速生成

十、数据流总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
HTTP Request


┌─────────────────────────────────────────────────────────────────┐
│ API Server Process │
│ ┌────────────┐ ┌────────────────┐ ┌────────────────────────┐ │
│ │ AsyncLLM │─▶│ InputProcessor │─▶│ EngineCoreClient(ZMQ) │ │
│ └────────────┘ └────────────────┘ └──────────┬─────────────┘ │
│ ▲ │ │
│ │ ┌────────────────┐ │ │
│ └─────────│OutputProcessor │◀─────────────┼───────────────│
│ └────────────────┘ │ │
└──────────────────────────────────────────────────┼───────────────┘
│ ZMQ IPC
┌──────────────────────────────────────────────────┼───────────────┐
│ EngineCore Process │ │
│ ┌─────────────┐ ┌───────────┐ ┌─────────────┐│ │
│ │ EngineCoreProc│◀─┤ Scheduler │◀─┤ Executor ││◀──────────────│
│ └─────────────┘ └───────────┘ └─────┬───────┘│ │
│ │ │ │
│ ▼ │ │
│ ┌────────────────────────────┐ │ │
│ │ Worker (GPU Process) │ │ │
│ │ ┌────────────────────────┐│ │ │
│ │ │ ModelRunner ││ │ │
│ │ │ ┌──────────────────┐ ││ │ │
│ │ │ │ Model (forward) │ ││ │ │
│ │ │ └──────────────────┘ ││ │ │
│ │ └────────────────────────┘│ │ │
│ └────────────────────────────┘ │ │
└──────────────────────────────────────────────────────────────────┘

这就是 vLLM V1 从 AsyncLLM开始的完整调用链分析。核心设计思想是异步化解耦

  • 前端 AsyncLLM 负责请求处理和输出流管理
  • 后端 EngineCore 负责调度和模型执行
  • 两者通过 ZMQ 解耦,实现高效的多进程协作