vllm分析(五)——pd分离kv cache的处理过程

张开发
2026/6/15 2:43:43 15 分钟阅读

分享文章

vllm分析(五)——pd分离kv cache的处理过程
get_num_new_matched_tokensScheduler 通过 get_num_new_matched_tokens 这一查询接口获知远程 KV 缓存的存在整个过程涉及调度决策、状态管理与异步加载。┌─────────────────────────────────────────────────────────────────┐ │ Scheduler.schedule() │ │ │ │ 遍历 waiting / skipped_waiting 队列 │ │ for each request: │ │ if request.num_computed_tokens 0: │ │ # 1. 获取本地缓存命中 │ │ new_computed_blocks, num_local │ │ kv_cache_manager.get_computed_blocks(request) │ │ # 2. 查询远程缓存 │ │ ext_tokens, async_flag │ │ connector.get_num_new_matched_tokens( │ │ request, num_local) │ │ │ │ if ext_tokens is None: │ │ # 连接器暂无法确定 → 跳过此请求下次再查 │ │ queue.pop(); step_skipped.prepend(request) │ │ continue │ │ │ │ if ext_tokens 0: │ │ # 远程存在 KV 缓存 │ │ num_external_computed_tokens ext_tokens │ │ load_kv_async True │ │ else: │ │ # 远程无缓存 │ │ num_external_computed_tokens 0 │ │ load_kv_async False │ └─────────────────────────────────────────────────────────────────┘ │ ┌─────────────────────┴─────────────────────┐ │ │ ▼ (存在远程 KV) ▼ (不存在) ┌───────────────────────────────┐ ┌───────────────────────────┐ │ 异步加载路径 │ │ 正常调度本地计算 │ │ │ │ │ │ # 分配 KV blocks占位 │ │ 分配 slots计算 token │ │ new_blocks allocate_slots( │ │ 请求进入 RUNNING │ │ delay_cache_blocksTrue) │ └───────────────────────────┘ │ │ │ # 通知连接器分配信息 │ │ connector.update_state_after_alloc(...) │ │ │ # 请求状态转为等待远程 KV │ │ request.status WAITING_FOR_REMOTE_KVS │ │ │ # 记录已分配 blocks 但暂不缓存 │ │ request.num_computed_tokens │ │ num_local ext_tokens │ │ │ │ # 当前 step 不分配计算 token │ │ num_new_tokens 0 │ └───────────────────────────────┘delay_cache_blocksTrue 使得分配出的 blocks 虽然已从空闲池中取出、引用计数增加但不会被写入前缀缓存哈希表即未标记为全局可复用的缓存。这些 blocks 处于“已预留、但未正式缓存”的状态。Worker和Scheduler的交互当 Worker 侧完成异步加载会通过 KVConnectorOutput.finished_recving 上报。Worker 侧 Scheduler 侧 KVCacheManager │ │ │ │ KV加载完成 │ │ │ (finished_recving) │ │ │──────────────────────────│ │ │ │ _update_from_kv_xfer_finished│ │ │ └─ 加入 finished_recving │ │ │ _kv_req_ids │ │ │ │ │ │ 下一轮 schedule() │ │ │ └─ _try_promote... │ │ │ ├─ 检查 ID 是否在集合中 │ │ │ └─ _update_waiting... │ │ │ │ │ │ │ └─ cache_blocks() ──│ │ │ │ 注册到前缀缓存 │ │ 状态变为 WAITING │ │ │ │ │ │ 继续分配新 tokens │ │ │ └─ allocate_slots() ─────────│ 返回已有 blocks │ │ │ │ │ 生成 SchedulerOutput │ │──────────────────────────│ │ │ 执行推理使用填充好的 KV│ │接收 Worker 侧完成信号┌─────────────────────────────────────────────────────────────┐ │ GPUModelRunner.execute_model() │ │ │ │ │ ▼ │ │ 返回 ModelRunnerOutput │ │ └─ kv_connector_output (KVConnectorOutput) │ │ │ │ │ ▼ │ │ Scheduler.update_from_output() │ │ │ │ │ ▼ │ │ _update_from_kv_xfer_finished(kv_connector_output) │ └─────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ _update_from_kv_xfer_finished 处理逻辑 │ ├─────────────────────────────────────────────────────────────┤ │ for req_id in kv_connector_output.finished_recving: │ │ req self.requests[req_id] │ │ if req.status WAITING_FOR_REMOTE_KVS: │ │ self.finished_recving_kv_req_ids.add(req_id) │ │ else: │ │ # 请求已结束直接释放 blocks │ │ self._free_blocks(req) │ └─────────────────────────────────────────────────────────────┘ │ ▼ ┌──────────────────────┐ │ finished_recving_kv │ │ _req_ids 集合 │ │ (存储已完成接收的ID) │ └──────────────────────┘v1定义的GPUModelRunner.execute_modelScheduler.update_from_output_try_promote_blocked_waiting_request┌─────────────────────────────────────────────────────────────┐ │ Scheduler.schedule() │ │ │ │ 遍历等待队列 (waiting / skipped_waiting) │ │ while token_budget0 and 有空闲槽位: │ │ request queue.peek_request() │ │ │ │ if _is_blocked_waiting_status(request.status): │ │ # 尝试提拔阻塞状态的请求 │ │ if not _try_promote_blocked_waiting_request(): │ │ # 提拔失败跳过本次调度 │ │ continue │ └─────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ _try_promote_blocked_waiting_request(request) │ ├─────────────────────────────────────────────────────────────┤ │ if request.status WAITING_FOR_REMOTE_KVS: │ │ # 检查是否已完成接收 │ │ if req_id not in finished_recving_kv_req_ids: │ │ return False # 未完成不可调度 │ │ │ │ # 完成接收后的处理 │ │ _update_waiting_for_remote_kv(request) │ │ │ │ # 状态转换 │ │ if request.num_preemptions: │ │ request.status PREEMPTED │ │ else: │ │ request.status WAITING │ │ return True │ └─────────────────────────────────────────────────────────────┘_update_waiting_for_remote_kv_update_waiting_for_remote_kv┌─────────────────────────────────────────────────────────────┐ │ _update_waiting_for_remote_kv(request) │ ├─────────────────────────────────────────────────────────────┤ │ if req_id in failed_recving_kv_req_ids: │ │ # 部分块加载失败 │ │ if request.num_computed_tokens 0: │ │ # 缓存有效的 tokens │ │ kv_cache_manager.cache_blocks(request, │ │ request.num_computed_tokens) │ │ else: │ │ # 完全失败释放所有分配块 │ │ kv_cache_manager.free(request) │ │ failed_recving_kv_req_ids.remove(req_id) │ │ else: │ │ # 全部成功 │ │ # 正式将 blocks 注册到前缀缓存若启用 │ │ kv_cache_manager.cache_blocks(request, │ │ request.num_computed_tokens) │ │ │ │ # 全提示命中的特殊处理需重算最后一个 token │ │ if request.num_computed_tokens request.num_tokens: │ │ request.num_computed_tokens request.num_tokens-1 │ │ │ │ finished_recving_kv_req_ids.remove(req_id) │ └─────────────────────────────────────────────────────────────┘请求重新进入正常调度流程┌─────────────────────────────────────────────────────────────┐ │ 请求状态变为 WAITING或 PREEMPTED后继续当前 schedule() │ │ 循环 │ ├─────────────────────────────────────────────────────────────┤ │ # 请求现在可以被正常调度 │ │ # 分配新的 tokens如果需要 │ │ num_new_tokens min(request.remaining_tokens, token_budget)│ │ │ │ # KVCacheManager 会复用之前已分配的 blocks │ │ # 因为请求已持有 blocks │ │ new_blocks kv_cache_manager.allocate_slots(...) │ │ # 返回的是已存在的 KVCacheBlocks 对象 │ │ │ │ # 将请求移入 running 队列 │ │ self.running.append(request) │ │ request.status RUNNING │ │ │ │ # 记录到 scheduler_output发送给 Worker 执行 │ │ req_to_new_blocks[req_id] new_blocks │ │ num_scheduled_tokens[req_id] num_new_tokens │ └─────────────────────────────────────────────────────────────┘GPUModelRunner中kv cache的加载流程┌────────────────────────────────────────────────────────────────────┐ │ GPUModelRunner.execute_model │ └────────────────────────────────────────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────────┐ │ pre_forward(): │ │ - bind_connector_metadata(meta) │ │ - start_load_kv(forward_context) │ │ │ │ │ └─── 为每个请求创建异步加载任务但不等待 │ │ 数据传输在后台进行 │ └────────────────────────────────────────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────────┐ │ 模型执行 (逐层) │ │ for each layer: │ │ ┌─────────────────────────────────────────┐ │ │ │ maybe_transfer_kv_layer │ │ │ │ - wait_for_layer_load(layer_name) ◄───┼─ ─ ─ 阻塞等待该层数据│ │ │ - attention_forward(...) │ │ │ │ - save_kv_layer(...) │ │ │ └─────────────────────────────────────────┘ │ └────────────────────────────────────────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────────┐ │ post_forward(): │ │ - wait_for_save() # 等待所有保存完成 │ │ - get_finished(finished_req_ids) │ │ │ │ │ └─── 返回 (finished_sending, finished_recving) │ └────────────────────────────────────────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────────┐ │ 返回 ModelRunnerOutput 给 Scheduler │ │ - kv_connector_output.finished_recving │ │ - kv_connector_output.finished_sending │ └────────────────────────────────────────────────────────────────────┘在很多connector的实现中wait_for_layer_load为空。kv cache是否加载完成通过get_finished判断。maybe_transfer_kv_layer# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/model_executor/layers/attention/kv_transfer_utils.py#L15defmaybe_transfer_kv_layer(func:Callable)-Callable:Decorator that handles KV layer transfer prior and after execution of an attention layer, if enabled. Otherwise, the wrapper is a no-op. On entry: waits for the KV layer from the connector. On exit: saves the KV layer to the connector. # Import at runtime to avoid circular dependencyfromvllm.model_executor.layers.attention.attentionimportget_attention_context# Inspect the signature ONCE when the decorator is applied.siginspect.signature(func)param_nameslist(sig.parameters.keys())# Find the index of layer_name parameter.try:layer_name_indexparam_names.index(layer_name)exceptValueErrorase:raiseTypeError(fFunction{func.__name__}must have a layer_name parameter)fromewraps(func)defwrapper(*args,**kwargs):ifnothas_kv_transfer_group()ornotis_v1_kv_transfer_group():returnfunc(*args,**kwargs)layer_name_resolve_layer_name(args[layer_name_index])# Extract attention context (metadata, layer, kv_cache, layer_slot_mapping)attn_metadata,_,kv_cache,_get_attention_context(layer_name)connectorget_kv_transfer_group()ifattn_metadataisNoneornotconnector.has_connector_metadata():returnfunc(*args,**kwargs)# Wait for KV layer on entryconnector.wait_for_layer_load(layer_name)# Execute the functionresultfunc(*args,**kwargs)# Save KV cache layer on exitconnector.save_kv_layer(layer_name,kv_cache,attn_metadata)returnresultreturnwrapperpre_forward和post_forward的代码# https://github.com/vllm-project/vllm/blob/v0.20.1/vllm/v1/worker/gpu/kv_connector.py#L48classActiveKVConnector(KVConnector):def__init__(self,vllm_config:VllmConfig,kv_caches_dict:dict[str,torch.Tensor]):self.vllm_configvllm_config self.kv_connectorget_kv_transfer_group()# Register kv caches with KV Connector if applicable.# TODO: support cross_layers_kv_cache# (see https://github.com/vllm-project/vllm/pull/27743)self.kv_connector.register_kv_caches(kv_caches_dict)self.kv_connector.set_host_xfer_buffer_ops(copy_kv_blocks)self._disabledFalsedefpre_forward(self,scheduler_output:SchedulerOutput)-None:ifself._disabled:returnkv_connector_metadatascheduler_output.kv_connector_metadataassertkv_connector_metadataisnotNoneself.kv_connector.bind_connector_metadata(kv_connector_metadata)self.kv_connector.handle_preemptions(kv_connector_metadata)# TODO: sort out KV Connectors use of forward_contextifis_forward_context_available():self.kv_connector.start_load_kv(get_forward_context())else:withset_forward_context(None,self.vllm_config):self.kv_connector.start_load_kv(get_forward_context())defpost_forward(self,scheduler_output:SchedulerOutput,wait_for_save:boolTrue,clear_metadata:boolTrue,)-KVConnectorOutput|None:ifself._disabled:returnNoneoutputKVConnectorOutput()ifwait_for_save:self.kv_connector.wait_for_save()output.finished_sending,output.finished_recving(self.kv_connector.get_finished(scheduler_output.finished_req_ids))output.invalid_block_idsself.kv_connector.get_block_ids_with_load_errors()output.kv_connector_statsself.kv_connector.get_kv_connector_stats()output.kv_cache_eventsself.kv_connector.get_kv_connector_kv_cache_events()output.kv_connector_worker_meta(self.kv_connector.build_connector_worker_meta())ifclear_metadata:self.kv_connector.clear_connector_metadata()returnoutputdefno_forward(self,scheduler_output:SchedulerOutput)-ModelRunnerOutput:ifself._disabled:returnEMPTY_MODEL_RUNNER_OUTPUT self.pre_forward(scheduler_output)kv_connector_outputself.post_forward(scheduler_output,wait_for_saveFalse)ifkv_connector_outputisNoneorkv_connector_output.is_empty():returnEMPTY_MODEL_RUNNER_OUTPUT outputcopy.copy(EMPTY_MODEL_RUNNER_OUTPUT)output.kv_connector_outputkv_connector_outputreturnoutputdefset_disabled(self,disabled:bool)-None:# Ensure that layer-wise connector hooks arent called when disabled.kv_transfer_state._KV_CONNECTOR_AGENTNoneifdisabledelseself.kv_connector self._disableddisabled NO_OP_KV_CONNECTORKVConnector()defget_kv_connector(vllm_config:VllmConfig,kv_caches_dict:dict[str,torch.Tensor])-KVConnector:ifnothas_kv_transfer_group():# No-op connector.returnNO_OP_KV_CONNECTORreturnActiveKVConnector(vllm_config,kv_caches_dict)kv cache传输模式vllm定义可各种connector可以链接不同cache后端比如LMCache Mooncakekv_connector/v1文件夹下有各种connector的实现。LMCacheConnectorV1LMCacheMPConnectorFlexKVConnectorV1MooncakeConnector目前KVc cache的传输主要有两种模式中心存储和分布式。LMCache和Mooncake就是一种中心式存储的模式。分布式使用P2P传递数据。各个实例分别管理自己的存储比如一个P实例计算完成后向目标D实例建立通信完成KV值传递图片来源vLLM PD分离KV cache传递机制详解与演进分析[1]。p2p模式P2pNcclConnectorP2pNcclEnginePrefill 端生产者KV 发送流程┌─────────────────────────────────────────────────────────────────────┐ │ Scheduler (调度器) │ │ - 为请求分配 KV blocks (allocate_slots) │ │ - 生成 P2pNcclConnectorMetadata (含 request_id, block_ids) │ │ - 通过 SchedulerOutput 发送给 Worker │ └─────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────┐ │ GPUModelRunner (Prefill 实例) │ │ execute_model(): │ │ 1. pre_forward → bind_connector_metadata │ │ 2. 逐层执行 Attention │ └─────────────────────────────────────────────────────────────────────┘ │ ┌───────────────┴───────────────┐ │ 每个 Attention 层计算完成后 │ ▼ │ ┌─────────────────────────────────────────────────────────────────────┐ │ maybe_transfer_kv_layer 装饰器调用 save_kv_layer(layer_name, ...) │ └─────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────┐ │ P2pNcclConnector.save_kv_layer() │ │ - 仅当 is_producer True 时执行 │ │ - 遍历 connector_metadata.requests │ │ - 对每个请求: │ │ 1. extract_kv_from_layer(kv_layer, block_ids) → kv_cache切片 │ │ 2. 解析 request_id → 获取 Decode 端 IP 和端口 │ │ 3. 构造 tensor_id request_id # layer_name │ │ 4. p2p_nccl_engine.send_tensor(tensor_id, kv_cache, remote_addr)│ └─────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────┐ │ P2pNcclEngine.send_tensor()默认 PUT_ASYNC │ │ - 将 (tensor_id, remote_address, tensor) 放入 send_queue │ │ - 立即返回 (不阻塞) │ └─────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────┐ │ 后台线程 _send_thread 消费队列 │ │ for each item in send_queue: │ │ 1. 通过 ZMQ DEALER 向 remote_address 发送 PUT 命令 │ │ (包含 shape, dtype, tensor_id) │ │ 2. 等待对端回复 b0 (表示已准备好接收) │ │ 3. 调用 ncclSend() 启动 NCCL 异步传输 │ │ 4. 记录 tensor_id 到 send_request_id_to_tensor_ids │ │ 5. 继续下一个 item │ └─────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────┐ │ 请求完成时 │ │ Scheduler 调用 connector.request_finished(request, block_ids) │ │ → P2pNcclConnector.request_finished() 返回 (False, None) │ │ → 表示不需要延迟释放 blocks │ │ ⚠️ 风险: 异步 NCCL 传输可能尚未完成但 blocks 会被立即释放 │ │ 可能导致 use-after-free │ └─────────────────────────────────────────────────────────────────────┘Decode 端消费者KV 接收流程┌─────────────────────────────────────────────────────────────────────────┐ │ Decode Worker 后台线程listen_for_requests │ │ (持续监听 ZMQ ROUTER 套接字接收 Prefill 发来的 PUT 命令) │ └─────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 收到 PUT 命令包含 tensor_id, shape, dtype │ │ - remote_address 为 Prefill 端地址 │ │ - tensor_id 格式: request_id#layer_name │ └─────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 1. 分配临时接收缓冲区 │ │ tensor torch.empty( │ │ data[shape], │ │ dtypegetattr(torch, data[dtype]), │ │ deviceself.device) # 在当前 GPU 上分配 │ └─────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 2. 发送确认信号表示已准备好接收 │ │ self.router_socket.send_multipart([remote_address, b0]) │ └─────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 3. 使用 NCCL 接收数据 │ │ comm, rank self.comms[remote_address.decode()] │ │ self.recv(comm, tensor, rank ^ 1, self.recv_stream) │ │ - 阻塞直到 tensor 被完全填充 │ │ - 数据直接从远端 GPU 内存传输到本地的 tensor 中 │ └─────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 4. 将接收到的 tensor 存入 recv_store并通知等待者 │ │ with self.recv_store_cv: │ │ self.recv_store[tensor_id] tensor │ │ self.recv_store_cv.notify() # 唤醒 recv_tensor │ └─────────────────────────────────────────────────────────────────────────┘接收缓冲区后台线程 listen_for_requests 收到 PUT 命令后会调用 torch.empty() 分配一个临时的接收缓冲区与发送端 shape/dtype 一致位于当前 GPU。然后通过 ncclRecv 将远程的 KV 数据直接接收到这个临时缓冲区中。缓冲区填充完成后将其存入 recv_store并通知等待的 recv_tensor。start_load_kv 中的 inject_kv_into_layer 再将这个临时缓冲区的内容 复制到预分配的 KV blocks 中即 vLLM 的 paged KV cache。start_load_kv的处理过程start_load_kv┌─────────────────────────────────────────────────────────────────────────┐ │ Decode Workerstart_load_kv │ │ (在模型执行前的 pre_forward 中调用同步等待每一层数据) │ └─────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 1. 调用 p2p_nccl_engine.recv_tensor(tensor_id, remote_address) │ │ → 阻塞等待 recv_store 中出现该 tensor_id由后台线程填充 │ │ → 获取到临时 tensor 后返回 │ └─────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 2. 调用 inject_kv_into_layer(layer, kv_cache, block_ids, request_id) │ │ - 根据 Attention 后端布局MLA/FlashInfer/FlashAttention │ │ - 使用 block_ids 将临时 tensor 中的 KV 数据写入本地预分配的 blocks │ │ │ │ 示例FlashAttention 布局 │ │ layer[:, block_ids, ...] kv_cache │ │ 示例MLA / FlashInfer 布局 │ │ layer[block_ids, ...] kv_cache │ │ │ │ - 此时 KV 数据已正式进入 vLLM 的分页 KV 缓存 │ │ - 临时 tensor 后续会被释放引用计数减少或 pool.free │ └─────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 3. 该层接收完成继续下一层或结束若所有层已处理 │ └─────────────────────────────────────────────────────────────────────────┘reference[1] vLLM PD分离KV cache传递机制详解与演进分析[2] vLLM PD分离方案浅析[3] P2P NCCL 连接器

更多文章