LMCache Deep Dive

Complete source code architecture analysis of LMCache v1 -- the multi-tier KV cache engine for LLM inference. Every code snippet is extracted directly from the repository.

Source: lmcache/v1/ | chunk_size = 256 tokens | 4-tier storage | vLLM + SGLang integrations | LRU / LFU / FIFO / MRU eviction

1 LMCacheEngine: The Central Orchestrator

Class Hierarchy v1/cache_engine.py

LMCacheEngine is the main entry point for all KV cache operations. It owns the StorageManager, a TokenDatabase, and a GPUConnectorInterface. The three core APIs -- lookup, retrieve, and store -- form the entire external interface.

class LMCacheEngine:
    """The main class for the cache engine.
    When storing the KV caches into the cache engine, it takes GPU KV
    caches from the serving engine and convert them into MemoryObjs that
    resides in the CPU. The MemoryObjs are then being stored into the
    StorageBackends in an asynchronous manner.
    """

    def __init__(
        self,
        config: LMCacheEngineConfig,
        metadata: LMCacheMetadata,
        token_database: TokenDatabase,
        gpu_connector: Optional[GPUConnectorInterface],
        broadcast_fn: Callable[[torch.Tensor, int], None],
        broadcast_object_fn: Callable[[Any, int], Any],
    ):
        self.config = config
        self.token_database = token_database
        self.gpu_connector = gpu_connector
        self.storage_manager: Optional[StorageManager] = None
        self.use_layerwise = config.use_layerwise
        self.num_layers = metadata.kv_shape[0]
        self.remove_after_retrieve = config.enable_pd and config.pd_role == "receiver"

1.1 lookup() -- Prefix-Match Cache Probe

The lookup() method checks how many prefix tokens already exist in the cache. It chunks tokens, generates content-addressable keys, and calls batched_contains() on the storage backends. Returns the count of cached prefix tokens.

@_lmcache_nvtx_annotate
def lookup(
    self,
    tokens: Optional[Union[torch.Tensor, list[int]]] = None,
    hashes: Optional[List[int]] = None,
    offsets: Optional[List[int]] = None,
    search_range: Optional[List[str]] = None,
    lookup_id: Optional[str] = None,
    pin: bool = False,
    request_configs: Optional[dict] = None,
) -> int:
    """Checks the existence of KV cache prefix tokens.
    :return: An int indicating how many prefix tokens exist inside LMCache."""

    # Health check: block operation if LMCache is unhealthy
    if not self.is_healthy():
        return 0

    chunk_info_iterator = self.token_database.process_tokens(
        tokens=tokens, hashes=hashes, offsets=offsets,
        request_configs=request_configs,
    )

    # Collect chunk infos, then batch-check their keys against storage backends
    chunk_info_list = list(chunk_info_iterator)
    keys = [chunk_info[2] for chunk_info in chunk_info_list]  # (start, end, key)

    # Prefix-match: hit_chunks is the count of contiguous prefix hits
    hit_chunks, block_mapping = self.storage_manager.batched_contains(
        keys, search_range, pin
    )

    res = 0
    for idx, (start, end, key) in enumerate(chunk_info_list):
        if idx >= hit_chunks:
            break
        res = end
    return res

1.2 store() -- GPU-to-CPU KV Offload

The store() method chunks tokens, allocates MemoryObjs from the allocator backend, copies KV data from GPU via the GPUConnector, then submits to storage backends asynchronously.

@_lmcache_nvtx_annotate
@torch.inference_mode()
def store(self, tokens, hashes, offsets, mask, **kwargs) -> None:
    keys, memory_objs, starts, ends = [], [], [], []

    # Step 1: Chunk tokens and generate content-addressable keys
    for start, end, key in self.token_database.process_tokens(tokens, hashes, offsets, mask):
        # Step 2: Allocate CPU memory from the allocator backend
        memory_obj = self.storage_manager.allocate(kv_shapes, kv_dtypes, fmt=self.fmt)
        if memory_obj is None:
            logger.warning("Local cpu memory under pressure, partial store")
            break
        keys.append(key)
        memory_objs.append(memory_obj)
        starts.append(start)
        ends.append(end)

    # Step 3: Batch copy from GPU to CPU memory objects
    self.gpu_connector.batched_from_gpu(memory_objs, starts, ends, **kwargs)

    # Step 4: Submit to all storage backends (async for remote/disk)
    self.storage_manager.batched_put(keys, memory_objs, location=self.store_location)

1.3 retrieve() -- Cache-to-GPU KV Onload

The retrieve() method fetches MemoryObjs from storage backends and pushes them to GPU via the GPUConnector. Returns a boolean mask of which tokens were successfully retrieved.

@_lmcache_nvtx_annotate
@torch.inference_mode()
def retrieve(self, tokens, mask, **kwargs) -> torch.Tensor:
    """Retrieve KV caches and put them into the serving engine.
    :return: boolean mask -- True where tokens are retrieved."""

    ret_mask = torch.zeros(len(tokens), dtype=torch.bool, device="cpu")
    reordered_chunks = self._process_tokens_internal(tokens, mask, ret_mask, **kwargs)

    # Batch copy from CPU memory objects to GPU KV cache
    _, memory_objs, starts, ends = zip(*reordered_chunks)
    self.gpu_connector.batched_to_gpu(list(memory_objs), list(starts), list(ends), **kwargs)

    # Optionally remove after retrieve (PD disaggregation receiver)
    for key, memory_obj, _, _ in reordered_chunks:
        if self.remove_after_retrieve:
            self.storage_manager.remove(key, self.retrieve_locations)
        memory_obj.ref_count_down()

    return ret_mask

2 Token Chunking: Sequences to 256-Token Chunks

Token Chunking and Rolling Hash Flow
Input token sequence: [t0, t1, t2, ... t511, t512, ... t700]
_chunk_tokens() splits it into chunk_size=256 blocks:
  Chunk 0: [t0..t255]             H0 = hash(NONE, [t0..t255], ())
  Chunk 1: [t256..t511]           H1 = hash(H0, [t256..t511], ())
  Chunk 2: [t512..t700] (partial) H2 = hash(H1, [t512..t700], ())
Each hash is a rolling prefix hash: hash(prev_hash, token_chunk, extra_keys)
ChunkedTokenDatabase v1/token_database.py:269
class ChunkedTokenDatabase(TokenDatabase):
    def __init__(self, config, metadata):
        self.chunk_size = config.chunk_size  # default: 256

    def _chunk_tokens(self, tokens):
        """Chunk tokens into blocks of size chunk_size."""
        save_unfull_chunk = self.config.save_unfull_chunk if self.config else True
        end = len(tokens) if save_unfull_chunk else (len(tokens) - len(tokens) % self.chunk_size)
        for i in range(0, end, self.chunk_size):
            yield tokens[i : i + self.chunk_size]

    def _prefix_hash(self, token_chunks):
        """Rolling prefix hash: each chunk hash depends on the previous."""
        prefix_hash = self._get_init_hash()  # NONE_HASH (typically 0)
        for token_chunk in token_chunks:
            prefix_hash = self._hash_tokens(token_chunk, prefix_hash)
            yield prefix_hash
Rolling Hash Design

Each chunk's hash incorporates the previous chunk's hash as a prefix. This means the hash for chunk N encodes the entire prefix [0..N*256], enabling content-addressable deduplication -- two requests sharing the same system prompt will have identical chunk hashes for overlapping prefixes.

# The canonical hash computation from TokenDatabase base class
def _hash_tokens(self, tokens, prefix_hash=None, extra_keys=None):
    tokens_tuple = tuple(tokens.cpu().tolist()) if isinstance(tokens, torch.Tensor) else tuple(tokens)

    # Canonicalize inputs for consistent cross-process hashing
    canon_prefix = prefix_hash if prefix_hash is not None else NONE_HASH
    canon_tokens = tokens_tuple
    canon_extra = tuple(extra_keys) if extra_keys else ()

    return self.hash_func((canon_prefix, canon_tokens, canon_extra))
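The deduplication property follows directly from this construction. A minimal runnable sketch (using sha256 as a stand-in for LMCache's configurable hash_func, and a toy chunk_size of 4):

```python
import hashlib

NONE_HASH = 0  # assumed initial value; LMCache's actual constant may differ

def hash_chunk(prev_hash, token_chunk, extra_keys=()):
    """Rolling prefix hash: digest over (prev_hash, tokens, extras)."""
    payload = repr((prev_hash, tuple(token_chunk), tuple(extra_keys))).encode()
    return int.from_bytes(hashlib.sha256(payload).digest()[:8], "big")

def prefix_hashes(tokens, chunk_size=4):
    h, out = NONE_HASH, []
    for i in range(0, len(tokens), chunk_size):
        h = hash_chunk(h, tokens[i : i + chunk_size])
        out.append(h)
    return out

# Two requests sharing the first 8 tokens (think: a common system prompt):
a = prefix_hashes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
b = prefix_hashes([1, 2, 3, 4, 5, 6, 7, 8, 42, 43])
assert a[:2] == b[:2]   # identical hashes for the shared prefix chunks
assert a[2] != b[2]     # diverges once the content differs
```

Because chunk N's hash commits to the whole prefix, a single hash comparison suffices to detect a shared prefix of any length.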

2.1 process_tokens() -- Full Pipeline

def process_tokens(self, tokens, hashes, offsets, mask, make_key=True, request_configs=None):
    # Mask validation: false count must be multiple of chunk_size
    num_falses = mask.numel() - mask.long().sum().item() if mask is not None else 0
    if num_falses % self.chunk_size != 0:
        raise ValueError("Falses in mask must be multiple of chunk_size")

    if tokens is not None:
        token_chunks = self._chunk_tokens(tokens)
        prefix_hashes = self._prefix_hash(token_chunks)
        for chunk_id, hash_val in enumerate(prefix_hashes):
            start_idx = chunk_id * self.chunk_size
            end_idx = min(start_idx + self.chunk_size, len(tokens))
            if start_idx < num_falses:
                continue  # Skip masked prefix chunks
            yield (start_idx, end_idx, self._make_key_by_hash(hash_val, request_configs))

3 Content-Addressable Keys: CacheEngineKey

CacheEngineKey Structure lmcache/utils.py:340
@dataclass
class CacheEngineKey:
    model_name: str         # e.g., "meta-llama/Llama-3-8B"
    world_size: int         # tensor parallel size
    worker_id: int          # which TP rank
    chunk_hash: int         # rolling prefix hash of this chunk
    dtype: torch.dtype       # e.g., torch.float16
    request_configs: Optional[dict]  # per-request tags (LoRA ID, etc.)
    tags: Optional[tuple]    # derived from request_configs["lmcache.tag.*"]

    def __hash__(self):
        return hash((
            self.model_name, self.world_size, self.worker_id,
            self.chunk_hash, self._dtype_str, self.tags,
        ))

    def to_string(self):
        # Format: "model@worldsize@workerid@chunkhash_hex@dtype[@tag1%val1@...]"
        s = f"{self.model_name}@{self.world_size}@{self.worker_id}@{self.chunk_hash_hex}@{self._dtype_str}"
        if self.tags:
            tags = [f"{k}%{v}" for k, v in self.tags]
            s += "@" + "@".join(tags)
        return s
Field        | Purpose                                                | Example Value
model_name   | Prevents cross-model cache collisions                  | meta-llama/Llama-3-8B
world_size   | TP-aware: different shard counts produce different KVs | 4
worker_id    | Which TP rank's KV shard this belongs to               | 0
chunk_hash   | Rolling prefix hash -- content-addressable             | 0x3a7f... (64-bit)
dtype        | Prevents mixing float16/bfloat16 caches                | torch.bfloat16
tags         | Per-request metadata (LoRA, custom tags)               | (("lora", "adapter1"),)
LayerCacheEngineKey for Layerwise Mode

When use_layerwise=True, each chunk key is split into per-layer keys via key.split_layers(num_layers), producing LayerCacheEngineKey objects with an additional layer_id field. This allows independent per-layer storage and retrieval.
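A hypothetical minimal mirror of this split (only chunk_hash and layer_id are shown; the real classes carry model_name, world_size, and the other fields as well):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ChunkKey:
    """Toy stand-in for CacheEngineKey."""
    chunk_hash: int

    def split_layers(self, num_layers):
        """Fan one chunk key out into num_layers per-layer keys."""
        return [LayerKey(self.chunk_hash, layer_id)
                for layer_id in range(num_layers)]

@dataclass(frozen=True)
class LayerKey:
    """Toy stand-in for LayerCacheEngineKey."""
    chunk_hash: int
    layer_id: int

layer_keys = ChunkKey(chunk_hash=0x3A7F).split_layers(num_layers=32)
assert len(layer_keys) == 32 and layer_keys[5].layer_id == 5
```

Each per-layer key can then be stored, looked up, and evicted independently, which is what makes the layerwise load/save pipeline possible.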

4 Multi-Tier Storage Architecture

Multi-Tier Storage Architecture: L1 CPU DRAM / L2 Disk / L3 Remote / L4 P2P
The GPU KV cache (vLLM / SGLang paged buffer) connects to every tier through the GPUConnector. Tiers, fastest to slowest:

  L1: LocalCPUBackend -- pinned CPU DRAM via PagedTensorMemoryAllocator; default 5 GB; LRU eviction; synchronous put; ~microsecond latency
  L2: LocalDiskBackend -- NVMe/SSD with optional O_DIRECT; AsyncPQThreadPoolExecutor priority queue (prefetch > put); async spill from L1; ~100us-1ms latency
  L3: RemoteBackend -- Redis / Valkey / S3 / InfiniStore / Mooncake / LMDB; serializers: naive / CacheGen / KiVi; auto-reconnect with backoff; ~1-10ms latency
  L4: P2PBackend -- GPU-to-GPU via ZMQ + TransferChannel; RDMA / NVLink direct memory access; ~10-100us latency (RDMA)
  PDBackend -- Prefill/Decode disaggregation

The StorageManager orchestrates all tiers: WeightedSemaphore concurrency control, AsyncMultiSerializer, tier promotion on get(), freeze mode, backend bypass, and event-based prefetch.

4.1 L1: LocalCPUBackend -- CPU DRAM Hot Cache

v1/storage_backend/local_cpu_backend.py
class LocalCPUBackend(AllocatorBackendInterface):
    """Even if local_cpu is False, contains(), insert_key(), remove(),
    get_blocking(), get_keys(), and clear() are still callable."""

    def __init__(self, config, metadata, dst_device="cuda", ...):
        self.cache_policy = get_cache_policy(config.cache_policy)  # LRU default
        self.hot_cache = self.cache_policy.init_mutable_mapping()
        self.use_hot = config.local_cpu  # True by default
        self.memory_allocator = self.initialize_allocator(config, metadata)
        self.cpu_lock = threading.Lock()

    def contains(self, key, pin=False):
        with self.cpu_lock:
            if key not in self.hot_cache: return False
            if pin: self.hot_cache[key].pin()  # Pin to prevent eviction during retrieve
            return True

    def submit_put_task(self, key, memory_obj, on_complete_callback=None):
        """Synchronous put into the hot cache."""
        with self.cpu_lock:
            if key in self.hot_cache: return None
            memory_obj.ref_count_up()
            self.hot_cache[key] = memory_obj
            self.cache_policy.update_on_put(key)

4.2 L2: LocalDiskBackend -- NVMe/SSD

v1/storage_backend/local_disk_backend.py
class LocalDiskBackend(StorageBackendInterface):
    def __init__(self, config, loop, local_cpu_backend, ...):
        self.path = config.local_disk           # e.g., "/tmp/lmcache_disk/"
        self.max_cache_size = int(config.max_local_disk_size * 1024**3)
        self.disk_worker = LocalDiskWorker(loop)  # AsyncPQThreadPoolExecutor
        self.use_odirect = config.extra_config.get("use_odirect", False)

    def _key_to_path(self, key):
        return os.path.join(self.path, key.to_string().replace("/", "-") + ".pt")

The disk backend uses a priority queue executor: prefetch tasks (priority 0) run before delete (priority 1) and put (priority 2). This ensures that reads are not blocked by writes.
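The ordering can be sketched with Python's queue.PriorityQueue. This is a single-threaded simplification of AsyncPQThreadPoolExecutor; class and method names are illustrative:

```python
import queue

PRIORITY = {"prefetch": 0, "delete": 1, "put": 2}  # lower value runs first

class PQExecutor:
    """Minimal sketch of a priority-ordered task executor."""
    def __init__(self):
        self._q = queue.PriorityQueue()
        self._seq = 0          # tie-breaker: FIFO within the same priority
        self.processed = []

    def submit(self, kind, payload):
        self._q.put((PRIORITY[kind], self._seq, kind, payload))
        self._seq += 1

    def run_pending(self):
        while not self._q.empty():
            _, _, kind, payload = self._q.get()
            self.processed.append((kind, payload))

ex = PQExecutor()
ex.submit("put", "chunk-A")       # submitted first...
ex.submit("delete", "chunk-B")
ex.submit("prefetch", "chunk-C")  # ...but prefetch drains first
ex.run_pending()
assert [kind for kind, _ in ex.processed] == ["prefetch", "delete", "put"]
```

The sequence counter matters: without it, two tasks at the same priority would be compared by payload, which may not be orderable.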

4.3 L3: RemoteBackend -- Redis/Valkey/S3/InfiniStore

v1/storage_backend/remote_backend.py
class RemoteBackend(StorageBackendInterface):
    def __init__(self, config, metadata, loop, local_cpu_backend, ...):
        self.remote_url = config.remote_url  # e.g., "redis://host:6379"
        self.connection = CreateConnector(config.remote_url, loop, ...)
        self.serializer, self.deserializer = CreateSerde(config.remote_serde, metadata, config)
        self.min_reconnect_interval = 10  # seconds
        self._mla_worker_id_as0_mode = ...  # MLA key normalization

    def contains(self, key, pin=False):
        if self.connection is None: return False
        if self._mla_worker_id_as0_mode:
            key = key.with_new_worker_id(0)  # normalize for MLA sharing
        return self.connection.exists_sync(key)

Supported remote connectors (each a plugin):

Connector   | URL Prefix     | Notes
Redis       | redis://       | Most common; async I/O via aioredis
Valkey      | valkey://      | Redis-compatible fork
S3          | s3://          | Object storage; high latency, high capacity
InfiniStore | infinistore:// | RDMA-based key-value store
Mooncake    | mooncake://    | Distributed cache system
Filesystem  | file://        | Shared filesystem (NFS/Lustre)
LMConnector | lm://          | LMCache-native remote protocol
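Connector selection by URL scheme can be sketched as follows. The registry and function names here are illustrative, not the actual CreateConnector implementation:

```python
from urllib.parse import urlparse

# Hypothetical registry mirroring the plugin table above
CONNECTOR_REGISTRY = {
    "redis": "RedisConnector",
    "valkey": "ValkeyConnector",
    "s3": "S3Connector",
    "infinistore": "InfiniStoreConnector",
    "mooncake": "MooncakeConnector",
    "file": "FSConnector",
    "lm": "LMConnector",
}

def resolve_connector(remote_url):
    """Pick a connector class name from the URL scheme."""
    scheme = urlparse(remote_url).scheme
    try:
        return CONNECTOR_REGISTRY[scheme]
    except KeyError:
        raise ValueError(f"Unsupported remote_url scheme: {scheme!r}")

assert resolve_connector("redis://localhost:6379") == "RedisConnector"
assert resolve_connector("s3://bucket/prefix") == "S3Connector"
```

Keying the dispatch on the URL scheme is what makes each backend a drop-in plugin: adding a new remote store only requires registering a new prefix.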

4.4 L4: P2PBackend -- GPU-to-GPU Transfer

v1/storage_backend/p2p_backend.py
# P2P uses ZMQ for control plane and TransferChannel for data plane
class BatchedLookupAndGetMsg(P2PMsgBase):
    lookup_id: str
    receiver_id: str
    keys: list[str]           # CacheEngineKey in string form
    mem_indexes: list[int]    # Remote memory object indexes (to be written)

class PeerInfo:
    peer_init_url: str       # ZMQ endpoint for peer initialization
    peer_lookup_url: str     # ZMQ endpoint for lookup requests
    lookup_lock: asyncio.Lock
    lookup_socket: zmq.asyncio.Socket

P2P enables direct GPU memory transfer between inference instances via RDMA or NVLink, coordinated by a central controller. The PDBackend extends this for Prefill/Decode disaggregation where the prefill node pushes KV caches directly to the decode node.

5 StorageManager: Async Prefetching and Tier Orchestration

StorageManager Core v1/storage_backend/storage_manager.py
class StorageManager:
    """Responsible for managing all storage backends."""

    def __init__(self, config, metadata, event_manager, ...):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(
            target=start_loop_in_thread_with_exceptions,
            args=(self.loop,), name="storage-manger-event-loop"
        )
        self.thread.start()

        self.storage_backends: OrderedDict[str, StorageBackendInterface] = OrderedDict()
        self.create_backends()

        self.allocator_backend = self._get_allocator_backend(config)
        self._freeze = False          # Freeze mode: protect hot cache
        self._bypassed_backends = set()  # Skip unhealthy backends

5.1 WeightedSemaphore -- Concurrency Control

class WeightedSemaphore:
    def __init__(self, chunk_budget):
        # Physically impossible to have >50% fragmentation with uniform chunks
        self._concurrent_budget_cap = chunk_budget // 2
        self._chunk_budget_cap = chunk_budget
        self._current_chunks = self._concurrent_budget_cap
        self._cond = asyncio.Condition()

    async def acquire(self, n=1):
        async with self._cond:
            if n <= self._concurrent_budget_cap:
                await self._cond.wait_for(lambda: self._current_chunks >= n)
                self._current_chunks -= n
            else:
                # Oversized: require exclusive access to all budgets
                await self._cond.wait_for(
                    lambda: self._current_chunks == self._concurrent_budget_cap
                )
                self._current_chunks = 0

5.2 Tier Promotion on get()

When a get() fetches data from L2/L3/L4, the StorageManager automatically promotes it to L1 (CPU DRAM) for faster subsequent access:

def get(self, key, location=None):
    """Blocking get with automatic tier promotion."""
    for backend_name, backend in self.get_active_storage_backends(location):
        memory_obj = backend.get_blocking(key)
        if memory_obj:
            # Auto-promote to L1 if retrieved from L2/L3/L4
            if backend_name not in ["LocalCPUBackend", "PDBackend"] \
               and "LocalCPUBackend" in self.storage_backends:
                local_cpu = self.storage_backends["LocalCPUBackend"]
                local_cpu.submit_put_task(key, memory_obj)
            return memory_obj
    return None

5.3 batched_put() -- Fan-Out to All Tiers

def batched_put(self, keys, memory_objs, transfer_spec=None, location=None):
    """Non-blocking fan-out to all active storage backends."""
    obj_dict = {}
    obj_dict[get_backend_cname(self.allocator_backend)] = (keys, memory_objs)

    for backend_name, backend in self.storage_backends.items():
        if location and backend_name != location: continue
        if backend_name in self._bypassed_backends: continue

        # Allocate and copy for backends needing their own memory pool
        allocator_backend = backend.get_allocator_backend()
        cname = get_backend_cname(allocator_backend)
        if cname not in obj_dict:
            new_keys, new_objs = allocate_and_copy_objects(
                allocator_backend, keys, memory_objs, self.internal_copy_stream
            )
            obj_dict[cname] = (new_keys, new_objs)

        ks, objs = obj_dict[cname]
        backend.batched_submit_put_task(ks, objs, transfer_spec=transfer_spec)

5.4 Async Prefetch with Event Callbacks

The prefetch_all_done_callback handles multi-tier prefetch completion. It enforces prefix continuity: if tier N retrieves fewer chunks than expected, all subsequent tier results are discarded.

def prefetch_all_done_callback(self, task, lookup_id, cum_chunk_lengths, tier_expected):
    res = task.result()  # [[tier0_objs], [tier1_objs], [tier2_objs]]

    total_retrieved = 0
    for tier_idx, tier_result in enumerate(res):
        actual = len(tier_result)
        expected = tier_expected[tier_idx]
        total_retrieved += actual
        if actual < expected:
            # Gap detected: release all subsequent tier chunks
            for subsequent_tier in res[tier_idx + 1:]:
                for mem_obj in subsequent_tier:
                    mem_obj.ref_count_down()
            break

    retrieved_length = cum_chunk_lengths[total_retrieved]
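The prefix-continuity rule can be illustrated with plain lists. This is an illustrative helper, not the actual implementation:

```python
def merge_tier_results(tier_results, tier_expected):
    """Keep only a contiguous prefix of retrieved chunks across tiers.

    tier_results: per-tier chunk lists, ordered fastest tier first.
    Returns (usable_chunks, discarded_chunks).
    """
    usable, discarded = [], []
    gap_found = False
    for actual, expected in zip(tier_results, tier_expected):
        if gap_found:
            discarded.extend(actual)   # behind a gap: unusable, must be released
            continue
        usable.extend(actual)
        if len(actual) < expected:
            gap_found = True           # this tier came up short
    return usable, discarded

# Tier 1 expected 2 chunks but returned 1, so tier 2's chunks are discarded:
usable, dropped = merge_tier_results(
    [["c0", "c1"], ["c2"], ["c4", "c5"]],
    tier_expected=[2, 2, 2],
)
assert usable == ["c0", "c1", "c2"]
assert dropped == ["c4", "c5"]
```

Discarded chunks correspond to the ref_count_down() calls in the callback: a later chunk without its preceding chunk is useless for prefix-based KV reuse.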

6 vLLM Integration: KVConnectorBase_V1

vLLM Integration Architecture: Scheduler and Worker Paths
On the scheduler side, build_connector_meta() calls lookup() for each new request and update_state_after_alloc(), then emits LMCacheConnectorMetadata alongside the SchedulerOutput: a List[ReqMeta] carrying token_ids, slot_mapping, LoadSpec, SaveSpec, and DisaggSpec per request, serialized to the workers. On the worker side, start_load_kv(), wait_for_layer_load(), save_kv_layer(), and wait_for_save() drive a per-layer pipeline against the LMCacheEngine, which supplies lookup() (prefix match), retrieve()/retrieve_layer(), store()/store_layer(), and async_lookup_and_prefetch() on top of token chunking and the storage tiers.
LMCacheConnectorV1Impl integration/vllm/vllm_v1_adapter.py

The vLLM adapter implements KVConnectorBase_V1. The scheduler calls build_connector_meta() (which calls lookup() to determine cache hits), and the worker calls start_load_kv() / save_kv_layer().

class LMCacheConnectorV1Impl:
    def __init__(self, vllm_config, role, parent):
        config = lmcache_get_or_create_config()
        service_factory = VllmServiceFactory(config, vllm_config, role.name.lower())
        self._manager = LMCacheManager(config, service_factory, connector=self)
        self._manager.start_services()

6.1 start_load_kv() -- Load from Cache to GPU

def start_load_kv(self, forward_context, **kwargs):
    """Start loading KV cache from connector buffer to vLLM's paged KV buffer."""
    metadata = self._parent._get_connector_metadata()
    kvcaches = list(self.kv_caches.values())

    for request in metadata.requests:
        if request.load_spec is None or not request.load_spec.can_load:
            continue

        tokens = request.token_ids
        slot_mapping = request.slot_mapping.to(self.device)

        # Build mask: skip tokens already cached in vLLM
        token_mask = torch.ones(len(tokens), dtype=torch.bool)
        masked_count = request.load_spec.vllm_cached_tokens // self._lmcache_chunk_size * self._lmcache_chunk_size
        token_mask[:masked_count] = False

        if self.use_layerwise:
            # Layer-by-layer pipeline: retrieve 2 layers ahead
            retriever = self.lmcache_engine.retrieve_layer(tokens, token_mask, ...)
            next(retriever); next(retriever)  # Pre-fetch first 2 layers
            self.layerwise_retrievers.append(retriever)
        else:
            ret_mask = self.lmcache_engine.retrieve(tokens, token_mask, kvcaches=kvcaches, ...)

6.2 save_kv_layer() -- Layerwise Save Pipeline

def save_kv_layer(self, layer_name, kv_layer, attn_metadata, **kwargs):
    """Save one layer of KV cache from vLLM's paged buffer."""
    for request in connector_metadata.requests:
        layerwise_storer = self._layerwise_save_storers.get(request.req_id)
        if layerwise_storer is None:
            # Initialize the generator on first layer
            skip_leading = save_spec.skip_leading_tokens // self._lmcache_chunk_size * self._lmcache_chunk_size
            store_mask = torch.ones(len(token_ids), dtype=torch.bool)
            store_mask[:skip_leading] = False

            layerwise_storer = self.lmcache_engine.store_layer(
                token_ids, mask=store_mask, kvcaches=kvcaches, slot_mapping=slot_mapping, ...
            )
            self._layerwise_save_storers[request.req_id] = layerwise_storer

        next(layerwise_storer)  # Process one layer per call

6.3 LoadSpec / SaveSpec -- Scheduler Decisions

Spec Type | Field                 | Meaning
LoadSpec  | vllm_cached_tokens    | Tokens already in vLLM's page cache
LoadSpec  | lmcache_cached_tokens | Tokens found in LMCache (from lookup)
LoadSpec  | can_load              | Whether the scheduler permits loading
SaveSpec  | skip_leading_tokens   | Already-stored prefix to skip
SaveSpec  | can_save              | Whether the scheduler permits saving
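These fields drive the mask construction in start_load_kv(): tokens vLLM already holds are skipped, floored to a chunk boundary because LMCache retrieves whole chunks only. A minimal sketch, with a plain list standing in for the torch.bool mask:

```python
def build_load_mask(num_tokens, vllm_cached_tokens, chunk_size=256):
    """Mask out tokens already in vLLM's page cache, rounded DOWN to a
    chunk boundary (LMCache operates on whole 256-token chunks)."""
    masked = vllm_cached_tokens // chunk_size * chunk_size
    return [i >= masked for i in range(num_tokens)]  # True = load from LMCache

mask = build_load_mask(num_tokens=1000, vllm_cached_tokens=700)
assert mask.count(False) == 512   # 700 floors to 2 full chunks (512 tokens)
assert mask[512] is True          # partial third chunk is re-loaded
```

Flooring rather than rounding is deliberate: a partially-covered chunk must be fetched in full, so its leading tokens stay unmasked.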

7 SGLang Integration: Direct Single-Process Calls

LMCacheConnector for SGLang integration/sglang/sglang_adapter.py

Unlike vLLM's multi-process scheduler/worker split, SGLang integration is single-process and direct. The connector wraps the LMCacheEngine and exposes simple load_kv() / store_kv() methods.

class LMCacheConnector:
    def __init__(self, sgl_config, tp_size, rank, k_pool, v_pool):
        kv_dtype = k_pool[0].dtype
        local_rank = k_pool[0].device.index  # GPU device from buffer
        self.lmcache_engine = init_lmcache_engine(sgl_config, tp_size, local_rank, rank, kv_dtype)
        self.kvcaches = k_pool + v_pool  # Concatenate K and V pools
        self.lmcache_engine.post_init(kvcaches=self.kvcaches)

    def load_kv(self, load_metadata):
        """Direct retrieve -- no scheduler coordination needed."""
        token_ids = torch.tensor(load_metadata.token_ids, dtype=torch.int64).cuda()
        slot_mapping = load_metadata.slot_mapping.cuda()
        load_mask = torch.ones_like(token_ids, dtype=torch.bool)
        load_mask[:load_metadata.offset] = False

        ret_mask = self.lmcache_engine.retrieve(
            token_ids, mask=load_mask,
            kvcaches=self.kvcaches, slot_mapping=slot_mapping,
            offset=load_metadata.offset,
        )
        return ret_mask.sum().item()

    def store_kv(self, store_metadata):
        """Direct store -- synchronous single-process call."""
        token_ids = torch.tensor(store_metadata.token_ids, dtype=torch.int64).cuda()
        slot_mapping = store_metadata.kv_indices.to(torch.int64).cuda()
        store_mask = torch.ones_like(token_ids, dtype=torch.bool)

        self.lmcache_engine.store(
            token_ids, mask=store_mask,
            kvcaches=self.kvcaches, slot_mapping=slot_mapping,
            offset=store_metadata.offset,
        )
vLLM Integration
  • Multi-process: separate scheduler + worker(s)
  • Scheduler calls lookup()
  • Worker calls start_load_kv() / save_kv_layer()
  • KVConnectorMetadata serialized between processes
  • Layerwise pipeline with generators
  • LMCacheManager handles service lifecycle
SGLang Integration
  • Single-process: direct engine calls
  • No separate scheduler coordination
  • load_kv() / store_kv() wrappers
  • k_pool + v_pool passed directly at init
  • Uses mock_up_broadcast_fn (no TP broadcast)
  • Simpler metadata: StoreMetadata / LoadMetadata
# SGLang initialization: constructing metadata from ModelConfig
def init_lmcache_engine(model_config, tp_size, local_rank, global_rank, kv_dtype):
    config = lmcache_get_config()
    kv_shape = (
        model_config.num_hidden_layers,  # num_layer
        2,                               # K and V
        config.chunk_size,               # 256 tokens
        model_config.get_num_kv_heads(tp_size),
        model_config.head_dim,
    )
    metadata = LMCacheMetadata(
        model_name=model_config.model_path,
        world_size=tp_size, worker_id=global_rank,
        kv_dtype=kv_dtype, kv_shape=kv_shape,
    )
    gpu_connector = CreateGPUConnector(config, metadata, EngineType.SGLANG)
    return LMCacheEngineBuilder.get_or_create(ENGINE_NAME, config, metadata, gpu_connector, ...)

8 CacheBlend: Non-Contiguous KV Chunk Reuse

CacheBlend Configuration

CacheBlend enables reusing KV cache chunks that may not form a contiguous prefix. When enable_blending=True, the engine uses SegmentTokenDatabase to split tokens at special separator strings, and the LMCBlender performs selective recomputation to maintain attention quality.

# Config definitions for CacheBlend (from v1/config.py)
"enable_blending":       { "type": bool,  "default": False },
"blend_recompute_ratios": { "type": list[float], "default": None },
"blend_thresholds":       { "type": list[float], "default": None },
"blend_check_layers":     { "type": list[int],   "default": None },
"blend_min_tokens":       { "type": int,   "default": 256 },
"blend_special_str":      { "type": str,   "default": " # # " },

8.1 SegmentTokenDatabase

class SegmentTokenDatabase(TokenDatabase):
    """Splits tokens at special separator patterns instead of fixed-size chunks."""

    def __init__(self, config, metadata):
        self.tokenizer = AutoTokenizer.from_pretrained(metadata.model_name)
        self.sep_tokens = self.tokenizer.encode(config.blend_special_str)[1:]
        self.sep_tokens = torch.tensor(self.sep_tokens, device="cpu")

    def _fast_split_by_subtensor(self, tokens):
        """Match sep_tokens with sliding windows."""
        windows = tokens.unfold(0, self.sep_len, 1)
        # Find positions where the window matches the separator
        ...
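A plain-Python sketch of the separator split (the real implementation matches sep_tokens against sliding windows of a torch tensor via unfold; this version uses list slices):

```python
def split_by_separator(tokens, sep):
    """Split a token list at every occurrence of the separator
    subsequence, dropping the separator tokens themselves."""
    segments, start, i = [], 0, 0
    n, m = len(tokens), len(sep)
    while i <= n - m:
        if tokens[i : i + m] == sep:   # sliding-window match
            segments.append(tokens[start:i])
            i += m
            start = i
        else:
            i += 1
    segments.append(tokens[start:])
    return segments

# Suppose " # # " encodes to [9, 9] in a toy vocabulary:
segs = split_by_separator([1, 2, 3, 9, 9, 4, 5, 9, 9, 6], sep=[9, 9])
assert segs == [[1, 2, 3], [4, 5], [6]]
```

Each resulting segment is then hashed and looked up independently, which is what allows non-contiguous cache hits.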

8.2 How CacheBlend Works

1. Segment Detection: Input tokens are split at separator boundaries (e.g., " # # ") into variable-length segments rather than fixed 256-token chunks.
2. Non-Contiguous Lookup: Each segment is independently hashed and looked up in the cache. A segment can be a cache hit even if the surrounding segments are misses.
3. Selective Recomputation: For segments with cache hits, the LMCBlender selectively recomputes a fraction (blend_recompute_ratios) of tokens at segment boundaries to maintain attention quality between cached and fresh segments.
4. Quality Check: blend_check_layers specifies which transformer layers to check for attention divergence. If the measured divergence exceeds the corresponding blend_thresholds value, more tokens are recomputed.
Auto-Config When Blending Is Enabled

When enable_blending=True, the config validator automatically sets save_unfull_chunk=True (required for variable-length segments) and the memory format switches to KV_2TD for layerwise mode.

9 Eviction Policies: LRU, LFU, FIFO, MRU + Pin/Unpin

Policy Factory v1/storage_backend/cache_policy/__init__.py
POLICY_MAPPING = {
    "LRU":  LRUCachePolicy,
    "LFU":  LFUCachePolicy,
    "FIFO": FIFOCachePolicy,
    "MRU":  MRUCachePolicy,
}

def get_cache_policy(policy_name: str) -> BaseCachePolicy:
    return POLICY_MAPPING[policy_name.upper()]()
LRU (Least Recently Used), the default

Uses OrderedDict. On hit, move_to_end(key). Evicts from the front (oldest access).

def update_on_hit(self, key, cache_dict):
    cache_dict.move_to_end(key)

def get_evict_candidates(self, cache_dict, n=1):
    for key, cache in cache_dict.items():
        if cache.can_evict:
            evict_keys.append(key)
        if len(evict_keys) == n: break
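The interaction of move_to_end() with can_evict can be demonstrated directly; here Entry is a toy stand-in for a MemoryObj's pin state:

```python
from collections import OrderedDict

class Entry:
    """Toy stand-in for MemoryObj: only tracks pin state."""
    def __init__(self, pinned=False):
        self.pinned = pinned

    @property
    def can_evict(self):
        return not self.pinned

cache = OrderedDict()
cache["A"] = Entry(pinned=True)   # pinned: mid-retrieve, must not be evicted
cache["B"] = Entry()
cache["C"] = Entry()

cache.move_to_end("B")  # hit on B: order is now A, C, B (oldest first)

# Evict candidates walk from the oldest access, skipping pinned entries:
candidates = [k for k, v in cache.items() if v.can_evict][:1]
assert candidates == ["C"]  # A is pinned, so C is the LRU victim
```

The same iteration skeleton serves all four policies; only the ordering maintenance (move_to_end, frequency buckets, reversed iteration) differs.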

FIFO (First In First Out)

Uses plain dict (insertion-ordered in Python 3.7+). update_on_hit() is a no-op -- access order does not matter.

def update_on_hit(self, key, cache_dict):
    pass  # No reordering

def get_evict_candidates(self, cache_dict, n=1):
    # Iterate from oldest insertion
    for key, cache in cache_dict.items():
        if cache.can_evict:
            evict_keys.append(key)

LFU (Least Frequently Used)

Uses SortedDict of frequency buckets. Each bucket is a FIFO dict. Evicts from lowest-frequency bucket first.

def update_on_hit(self, key, cache_dict):
    curr_freq = self.key_to_freq[key]
    self.freq_to_keys[curr_freq].pop(key)
    curr_freq += 1
    self.key_to_freq[key] = curr_freq
    self.freq_to_keys[curr_freq][key] = None

def update_on_put(self, key):
    self.key_to_freq[key] = 1
    self.freq_to_keys[1][key] = None

MRU (Most Recently Used)

Opposite of LRU: evicts the most recently accessed item. Uses OrderedDict but evicts from the end.

def get_evict_candidates(self, cache_dict, n=1):
    # Iterate in reverse (most recent first)
    for key in reversed(cache_dict):
        if cache_dict[key].can_evict:
            evict_keys.append(key)

9.1 Pin/Unpin Mechanism

PinMonitor -- Timeout-Based Pin Protection v1/pin_monitor.py

Memory objects are pinned during lookup to prevent eviction between the lookup and retrieve steps. The PinMonitor (singleton) runs a background thread that forcibly unpins objects that exceed the timeout.

class PinMonitor(PeriodicThread):
    """Global singleton: monitors pinned MemoryObj instances for timeout."""

    def __init__(self, config):
        self._pinned_objects = {}  # {obj_id: (memory_obj, register_time)}
        self._pin_timeout_sec = config.pin_timeout_sec     # default: 300s
        self._check_interval = config.pin_check_interval_sec # default: 30s

    def on_pin(self, memory_obj):
        """Register a pinned object for timeout monitoring."""
        obj_id = id(memory_obj)
        self._pinned_objects[obj_id] = (memory_obj, time.time())

    def on_unpin(self, memory_obj):
        """Unregister from timeout monitoring."""
        self._pinned_objects.pop(id(memory_obj), None)
Config Parameter       | Default | Description
cache_policy           | LRU     | Eviction policy: LRU, LFU, FIFO, MRU
pin_timeout_sec        | 300     | Max seconds a MemoryObj can remain pinned
pin_check_interval_sec | 30      | How often PinMonitor scans for timeouts
max_local_cpu_size     | 5.0     | GB of CPU DRAM allocated for the L1 cache
max_local_disk_size    | 0.0     | GB of disk space for the L2 cache (0 = disabled)
blocking_timeout_secs  | 10      | Max wait time for blocking get operations

9.2 Eviction Safety: can_evict Checks

All eviction policies check cache.can_evict before evicting. A MemoryObj is not evictable when it is pinned (pin_count > 0). This prevents evicting cache entries that are in the middle of being retrieved.

# From DiskCacheMetadata (similar pattern in MemoryObj)
@property
def can_evict(self) -> bool:
    return not self.is_pinned  # pin_count == 0

10 End-to-End Cache Lookup and Store Flow

Complete Cache Lookup / Store Data Flow
A new request first passes through ChunkedTokenDatabase.process_tokens(), where _prefix_hash() yields a CacheEngineKey per chunk, then follows one of two paths:

Lookup path: batched_contains() across backends counts contiguous prefix hits; the hit token count is returned to the scheduler; retrieve() then moves the data to the GPU via the GPUConnector.
Store path: allocate() a MemoryObj from the L1 pool; batched_from_gpu() copies via the GPUConnector; batched_put() fans out to all tiers (L1 synchronous; L2/L3/L4 asynchronous).

11 LMCacheEngineConfig -- Complete Configuration Reference

Configuration System v1/config.py

Configuration is loaded from YAML files, environment variables (prefix LMCACHE_), or programmatic overrides. The system supports aliases for deprecated config names.
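A hypothetical minimal loader showing the LMCACHE_ env-var prefix convention (the real config system additionally supports YAML files, programmatic overrides, and deprecated-name aliases):

```python
import os

# Illustrative subset of defaults; names mirror the reference table below
DEFAULTS = {"chunk_size": 256, "local_cpu": True, "max_local_cpu_size": 5.0}
CASTS = {
    "chunk_size": int,
    "local_cpu": lambda v: v.lower() == "true",
    "max_local_cpu_size": float,
}

def load_config(environ=None):
    """Env vars with the LMCACHE_ prefix override defaults; the suffix is
    lowercased to the config key and cast to the right type."""
    environ = os.environ if environ is None else environ
    config = dict(DEFAULTS)
    for env_name, raw in environ.items():
        if not env_name.startswith("LMCACHE_"):
            continue
        key = env_name[len("LMCACHE_"):].lower()
        if key in config:
            config[key] = CASTS[key](raw)
    return config

cfg = load_config({"LMCACHE_CHUNK_SIZE": "512", "LMCACHE_LOCAL_CPU": "false"})
assert cfg == {"chunk_size": 512, "local_cpu": False, "max_local_cpu_size": 5.0}
```

Typed casts matter here: environment variables are always strings, so booleans and floats must be parsed explicitly rather than truth-tested.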

Category | Parameter                    | Default | Description
Basic    | chunk_size                   | 256     | Tokens per chunk for KV cache segmentation
Basic    | local_cpu                    | True    | Enable L1 CPU DRAM hot cache
Basic    | max_local_cpu_size           | 5.0     | GB allocated for the L1 CPU cache
Basic    | local_disk                   | None    | Path for the L2 disk cache (None = disabled)
Basic    | remote_url                   | None    | URL for L3 remote storage (redis://...)
Features | use_layerwise                | False   | Enable per-layer KV cache storage
Features | save_decode_cache            | False   | Store KV cache during the decode phase
Features | save_unfull_chunk            | False   | Store partial chunks (required for blending/PD)
Blending | enable_blending              | False   | Enable CacheBlend non-contiguous reuse
Blending | blend_recompute_ratios       | None    | Fraction of tokens to recompute per layer
Blending | blend_special_str            | " # # " | Separator string for segment splitting
P2P      | enable_p2p                   | False   | Enable peer-to-peer GPU transfer
P2P      | enable_pd                    | False   | Enable Prefill/Decode disaggregation
P2P      | pd_role                      | None    | "producer" (prefill) or "receiver" (decode)
Memory   | enable_lazy_memory_allocator | False   | Allocate memory on demand (reduces footprint)
Memory   | lazy_memory_initial_ratio    | 0.2     | Initial allocation as a fraction of the target
Policy   | cache_policy                 | LRU     | Eviction policy: LRU, LFU, FIFO, MRU
Policy   | min_retrieve_tokens          | 0       | Min hit tokens required to actually retrieve
# Environment variable configuration example
export LMCACHE_CHUNK_SIZE=256
export LMCACHE_LOCAL_CPU=true
export LMCACHE_MAX_LOCAL_CPU_SIZE=8.0
export LMCACHE_LOCAL_DISK=/tmp/lmcache_disk/
export LMCACHE_MAX_LOCAL_DISK_SIZE=20.0
export LMCACHE_REMOTE_URL=redis://localhost:6379
export LMCACHE_CACHE_POLICY=LRU
export LMCACHE_ENABLE_BLENDING=false
export LMCACHE_USE_LAYERWISE=true
