Complete source code architecture analysis of LMCache v1 -- the multi-tier KV cache engine for LLM inference. Every code snippet is extracted directly from the repository.
LMCacheEngine is the main entry point for all KV cache operations.
It owns the StorageManager, a TokenDatabase,
and a GPUConnectorInterface. The three core APIs -- lookup,
retrieve, and store -- form the entire external interface.
```python
class LMCacheEngine:
    """The main class for the cache engine. When storing the KV caches into
    the cache engine, it takes GPU KV caches from the serving engine and
    converts them into MemoryObjs that reside in the CPU. The MemoryObjs are
    then stored into the StorageBackends in an asynchronous manner.
    """

    def __init__(
        self,
        config: LMCacheEngineConfig,
        metadata: LMCacheMetadata,
        token_database: TokenDatabase,
        gpu_connector: Optional[GPUConnectorInterface],
        broadcast_fn: Callable[[torch.Tensor, int], None],
        broadcast_object_fn: Callable[[Any, int], Any],
    ):
        self.config = config
        self.token_database = token_database
        self.gpu_connector = gpu_connector
        self.storage_manager: Optional[StorageManager] = None
        self.use_layerwise = config.use_layerwise
        self.num_layers = metadata.kv_shape[0]
        self.remove_after_retrieve = config.enable_pd and config.pd_role == "receiver"
```
The lookup() method checks how many prefix tokens already exist in the cache.
It chunks tokens, generates content-addressable keys, and calls batched_contains()
on the storage backends. Returns the count of cached prefix tokens.
```python
@_lmcache_nvtx_annotate
def lookup(
    self,
    tokens: Optional[Union[torch.Tensor, list[int]]] = None,
    hashes: Optional[List[int]] = None,
    offsets: Optional[List[int]] = None,
    search_range: Optional[List[str]] = None,
    lookup_id: Optional[str] = None,
    pin: bool = False,
    request_configs: Optional[dict] = None,
) -> int:
    """Checks the existence of KV cache prefix tokens.

    :return: An int indicating how many prefix tokens exist inside LMCache.
    """
    # Health check: block operation if LMCache is unhealthy
    if not self.is_healthy():
        return 0

    chunk_info_iterator = self.token_database.process_tokens(
        tokens=tokens,
        hashes=hashes,
        offsets=offsets,
        request_configs=request_configs,
    )
    # Materialize the chunks so they can be iterated twice, then batch-check keys
    chunk_info_list = list(chunk_info_iterator)  # [(start, end, key), ...]
    keys = [key for _, _, key in chunk_info_list]

    # Prefix-match: hit_chunks is the count of contiguous prefix hits
    hit_chunks, block_mapping = self.storage_manager.batched_contains(
        keys, search_range, pin
    )

    res = 0
    for idx, (start, end, key) in enumerate(chunk_info_list):
        if idx < hit_chunks:
            res = end
        else:
            break
    return res
```
The store() method chunks tokens, allocates MemoryObjs from the allocator backend,
copies KV data from GPU via the GPUConnector, then submits to storage backends asynchronously.
```python
@_lmcache_nvtx_annotate
@torch.inference_mode()
def store(self, tokens, hashes, offsets, mask, **kwargs) -> None:
    keys, memory_objs, starts, ends = [], [], [], []
    # Step 1: Chunk tokens and generate content-addressable keys
    for start, end, key in self.token_database.process_tokens(
        tokens, hashes, offsets, mask
    ):
        # Step 2: Allocate CPU memory from the allocator backend
        memory_obj = self.storage_manager.allocate(kv_shapes, kv_dtypes, fmt=self.fmt)
        if memory_obj is None:
            logger.warning("Local cpu memory under pressure, partial store")
            break
        keys.append(key)
        memory_objs.append(memory_obj)
        starts.append(start)
        ends.append(end)

    # Step 3: Batch copy from GPU to CPU memory objects
    self.gpu_connector.batched_from_gpu(memory_objs, starts, ends, **kwargs)

    # Step 4: Submit to all storage backends (async for remote/disk)
    self.storage_manager.batched_put(keys, memory_objs, location=self.store_location)
```
The retrieve() method fetches MemoryObjs from storage backends and pushes them to GPU
via the GPUConnector. Returns a boolean mask of which tokens were successfully retrieved.
```python
@_lmcache_nvtx_annotate
@torch.inference_mode()
def retrieve(self, tokens, mask, **kwargs) -> torch.Tensor:
    """Retrieve KV caches and put them into the serving engine.

    :return: boolean mask -- True where tokens are retrieved.
    """
    ret_mask = torch.zeros(len(tokens), dtype=torch.bool, device="cpu")
    reordered_chunks = self._process_tokens_internal(tokens, mask, ret_mask, **kwargs)

    # Batch copy from CPU memory objects to GPU KV cache
    _, memory_objs, starts, ends = zip(*reordered_chunks)
    self.gpu_connector.batched_to_gpu(list(memory_objs), list(starts), list(ends), **kwargs)

    # Optionally remove after retrieve (PD disaggregation receiver)
    for key, memory_obj, _, _ in reordered_chunks:
        if self.remove_after_retrieve:
            self.storage_manager.remove(key, self.retrieve_locations)
        memory_obj.ref_count_down()

    return ret_mask
```
```python
class ChunkedTokenDatabase(TokenDatabase):
    def __init__(self, config, metadata):
        self.chunk_size = config.chunk_size  # default: 256

    def _chunk_tokens(self, tokens):
        """Chunk tokens into blocks of size chunk_size."""
        save_unfull_chunk = self.config.save_unfull_chunk if self.config else True
        end = (
            len(tokens)
            if save_unfull_chunk
            else (len(tokens) - len(tokens) % self.chunk_size)
        )
        for i in range(0, end, self.chunk_size):
            yield tokens[i : i + self.chunk_size]

    def _prefix_hash(self, token_chunks):
        """Rolling prefix hash: each chunk hash depends on the previous."""
        prefix_hash = self._get_init_hash()  # NONE_HASH (typically 0)
        for token_chunk in token_chunks:
            prefix_hash = self._hash_tokens(token_chunk, prefix_hash)
            yield prefix_hash
```
Each chunk's hash incorporates the previous chunk's hash as a prefix. This means the hash for chunk N encodes the entire prefix [0..N*256], enabling content-addressable deduplication -- two requests sharing the same system prompt will have identical chunk hashes for overlapping prefixes.
```python
# The canonical hash computation from the TokenDatabase base class
def _hash_tokens(self, tokens, prefix_hash=None, extra_keys=None):
    tokens_tuple = (
        tuple(tokens.cpu().tolist())
        if isinstance(tokens, torch.Tensor)
        else tuple(tokens)
    )
    # Canonicalize inputs for consistent cross-process hashing
    canon_prefix = prefix_hash if prefix_hash is not None else NONE_HASH
    canon_tokens = tokens_tuple
    canon_extra = tuple(extra_keys) if extra_keys else ()
    return self.hash_func((canon_prefix, canon_tokens, canon_extra))
```
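The rolling scheme can be exercised with a minimal standalone sketch. Everything here is illustrative: the chunk size is shrunk to 4 for readability (LMCache defaults to 256), and Python's built-in `hash()` stands in for the repository's actual hash function.

```python
NONE_HASH = 0
CHUNK_SIZE = 4  # small for demonstration; LMCache defaults to 256

def chunk_tokens(tokens, chunk_size=CHUNK_SIZE):
    for i in range(0, len(tokens), chunk_size):
        yield tuple(tokens[i:i + chunk_size])

def prefix_hashes(tokens):
    # Each chunk hash folds in the previous chunk's hash
    prefix = NONE_HASH
    for chunk in chunk_tokens(tokens):
        prefix = hash((prefix, chunk))
        yield prefix

# Two requests sharing an 8-token prefix but diverging afterwards
req_a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
req_b = [1, 2, 3, 4, 5, 6, 7, 8, 99, 98, 97, 96]

hashes_a = list(prefix_hashes(req_a))
hashes_b = list(prefix_hashes(req_b))

# The first two chunk hashes (the shared prefix) are identical, so those
# KV chunks deduplicate; the third chunk's hash diverges.
print(hashes_a[:2] == hashes_b[:2])  # True
print(hashes_a[2] == hashes_b[2])    # False
```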
```python
def process_tokens(self, tokens, hashes, offsets, mask, make_key=True, request_configs=None):
    # Mask validation: the number of False entries must be a multiple of chunk_size
    num_falses = mask.numel() - mask.long().sum().item() if mask is not None else 0
    if num_falses % self.chunk_size != 0:
        raise ValueError("Falses in mask must be multiple of chunk_size")

    if tokens is not None:
        token_chunks = self._chunk_tokens(tokens)
        prefix_hashes = self._prefix_hash(token_chunks)
        for chunk_id, hash_val in enumerate(prefix_hashes):
            start_idx = chunk_id * self.chunk_size
            end_idx = min(start_idx + self.chunk_size, len(tokens))
            if start_idx < num_falses:
                continue  # Skip masked prefix chunks
            yield (start_idx, end_idx, self._make_key_by_hash(hash_val, request_configs))
```
```python
@dataclass
class CacheEngineKey:
    model_name: str                  # e.g., "meta-llama/Llama-3-8B"
    world_size: int                  # tensor parallel size
    worker_id: int                   # which TP rank
    chunk_hash: int                  # rolling prefix hash of this chunk
    dtype: torch.dtype               # e.g., torch.float16
    request_configs: Optional[dict]  # per-request tags (LoRA ID, etc.)
    tags: Optional[tuple]            # derived from request_configs["lmcache.tag.*"]

    def __hash__(self):
        return hash((
            self.model_name,
            self.world_size,
            self.worker_id,
            self.chunk_hash,
            self._dtype_str,
            self.tags,
        ))

    def to_string(self):
        # Format: "model@worldsize@workerid@chunkhash_hex@dtype[@tag1%val1@...]"
        s = (
            f"{self.model_name}@{self.world_size}@{self.worker_id}"
            f"@{self.chunk_hash_hex}@{self._dtype_str}"
        )
        if self.tags:
            tags = [f"{k}%{v}" for k, v in self.tags]
            s += "@" + "@".join(tags)
        return s
```
| Field | Purpose | Example Value |
|---|---|---|
| model_name | Prevents cross-model cache collisions | meta-llama/Llama-3-8B |
| world_size | TP-aware: different shard counts produce different KVs | 4 |
| worker_id | Which TP rank's KV shard this belongs to | 0 |
| chunk_hash | Rolling prefix hash -- content-addressable | 0x3a7f... (64-bit) |
| dtype | Prevents mixing float16/bfloat16 caches | torch.bfloat16 |
| tags | Per-request metadata (LoRA, custom tags) | (("lora", "adapter1"),) |
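A simplified sketch of the serialization format can make the field layout concrete. The class below is a stand-in, not the repository's `CacheEngineKey`: the 16-hex-digit zero-padding of the chunk hash and the `bf16` dtype string are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class SimpleKey:
    """Illustrative stand-in for CacheEngineKey's to_string() format."""
    model_name: str
    world_size: int
    worker_id: int
    chunk_hash: int
    dtype_str: str
    tags: Optional[tuple] = None

    def to_string(self) -> str:
        # "model@worldsize@workerid@chunkhash_hex@dtype[@tag1%val1@...]"
        s = (f"{self.model_name}@{self.world_size}@{self.worker_id}"
             f"@{self.chunk_hash:016x}@{self.dtype_str}")
        if self.tags:
            s += "@" + "@".join(f"{k}%{v}" for k, v in self.tags)
        return s

key = SimpleKey("meta-llama/Llama-3-8B", 4, 0, 0x3A7F, "bf16",
                tags=(("lora", "adapter1"),))
print(key.to_string())
# meta-llama/Llama-3-8B@4@0@0000000000003a7f@bf16@lora%adapter1
```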
When use_layerwise=True, each chunk key is split into per-layer keys via
key.split_layers(num_layers), producing LayerCacheEngineKey
objects with an additional layer_id field. This allows independent per-layer storage and retrieval.
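The per-layer split can be sketched with simplified key classes. Both classes here are hypothetical stand-ins; the real `LayerCacheEngineKey` carries the full set of fields shown earlier, not just the chunk hash.

```python
from dataclasses import dataclass
from typing import List

@dataclass(frozen=True)
class LayerKey:
    """Stand-in for LayerCacheEngineKey: chunk key plus a layer_id."""
    chunk_hash: int
    layer_id: int

@dataclass(frozen=True)
class ChunkKey:
    """Stand-in for CacheEngineKey with only the hash field."""
    chunk_hash: int

    def split_layers(self, num_layers: int) -> List[LayerKey]:
        # One key per transformer layer, all sharing the chunk hash
        return [LayerKey(self.chunk_hash, layer_id)
                for layer_id in range(num_layers)]

layer_keys = ChunkKey(0x3A7F).split_layers(num_layers=32)
print(len(layer_keys), layer_keys[0].layer_id, layer_keys[-1].layer_id)  # 32 0 31
```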
```python
class LocalCPUBackend(AllocatorBackendInterface):
    """Even if local_cpu is False, contains(), insert_key(), remove(),
    get_blocking(), get_keys(), and clear() are still callable."""

    def __init__(self, config, metadata, dst_device="cuda", ...):
        self.cache_policy = get_cache_policy(config.cache_policy)  # LRU default
        self.hot_cache = self.cache_policy.init_mutable_mapping()
        self.use_hot = config.local_cpu  # True by default
        self.memory_allocator = self.initialize_allocator(config, metadata)
        self.cpu_lock = threading.Lock()

    def contains(self, key, pin=False):
        with self.cpu_lock:
            if key not in self.hot_cache:
                return False
            if pin:
                self.hot_cache[key].pin()  # Pin to prevent eviction during retrieve
            return True

    def submit_put_task(self, key, memory_obj, on_complete_callback=None):
        """Synchronous put into the hot cache."""
        with self.cpu_lock:
            if key in self.hot_cache:
                return None
            memory_obj.ref_count_up()
            self.hot_cache[key] = memory_obj
            self.cache_policy.update_on_put(key)
```
```python
class LocalDiskBackend(StorageBackendInterface):
    def __init__(self, config, loop, local_cpu_backend, ...):
        self.path = config.local_disk  # e.g., "/tmp/lmcache_disk/"
        self.max_cache_size = int(config.max_local_disk_size * 1024**3)
        self.disk_worker = LocalDiskWorker(loop)  # AsyncPQThreadPoolExecutor
        self.use_odirect = config.extra_config.get("use_odirect", False)

    def _key_to_path(self, key):
        return os.path.join(self.path, key.to_string().replace("/", "-") + ".pt")
```
The disk backend uses a priority queue executor: prefetch tasks (priority 0) run before delete (priority 1) and put (priority 2). This ensures that reads are not blocked by writes.
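The priority ordering can be shown with a minimal single-threaded sketch using `heapq`; the real `LocalDiskWorker` runs tasks on an async thread-pool executor, and the class below is only a model of its ordering behavior.

```python
import heapq
import itertools

PREFETCH, DELETE, PUT = 0, 1, 2  # lower number = higher priority

class PriorityTaskQueue:
    """Toy model of priority-ordered task dispatch (prefetch > delete > put)."""
    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # FIFO tie-break within a priority

    def submit(self, priority, name):
        heapq.heappush(self._heap, (priority, next(self._seq), name))

    def drain(self):
        order = []
        while self._heap:
            _, _, name = heapq.heappop(self._heap)
            order.append(name)
        return order

q = PriorityTaskQueue()
q.submit(PUT, "put-A")
q.submit(DELETE, "del-B")
q.submit(PREFETCH, "prefetch-C")
q.submit(PUT, "put-D")
order = q.drain()
print(order)  # ['prefetch-C', 'del-B', 'put-A', 'put-D']
```

Even though the two puts were submitted first and last, the prefetch jumps the queue, which is exactly why reads are not blocked by writes.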
```python
class RemoteBackend(StorageBackendInterface):
    def __init__(self, config, metadata, loop, local_cpu_backend, ...):
        self.remote_url = config.remote_url  # e.g., "redis://host:6379"
        self.connection = CreateConnector(config.remote_url, loop, ...)
        self.serializer, self.deserializer = CreateSerde(
            config.remote_serde, metadata, config
        )
        self.min_reconnect_interval = 10  # seconds
        self._mla_worker_id_as0_mode = ...  # MLA key normalization

    def contains(self, key, pin=False):
        if self.connection is None:
            return False
        if self._mla_worker_id_as0_mode:
            key = key.with_new_worker_id(0)  # normalize for MLA sharing
        return self.connection.exists_sync(key)
```
Supported remote connectors (each a plugin):
| Connector | URL Prefix | Notes |
|---|---|---|
| Redis | redis:// | Most common; async I/O via aioredis |
| Valkey | valkey:// | Redis-compatible fork |
| S3 | s3:// | Object storage; high latency, high capacity |
| InfiniStore | infinistore:// | RDMA-based key-value store |
| Mooncake | mooncake:// | Distributed cache system |
| Filesystem | file:// | Shared filesystem (NFS/Lustre) |
| LMConnector | lm:// | LMCache-native remote protocol |
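Dispatch by URL prefix can be sketched as a scheme-to-class lookup. The registry values below are placeholder class names, and the factory shape is an assumption about how `CreateConnector` selects a plugin, not its actual implementation.

```python
from urllib.parse import urlparse

# Hypothetical registry keyed by URL scheme (connector names from the table)
CONNECTOR_REGISTRY = {
    "redis": "RedisConnector",
    "valkey": "ValkeyConnector",
    "s3": "S3Connector",
    "infinistore": "InfiniStoreConnector",
    "mooncake": "MooncakeConnector",
    "file": "FSConnector",
    "lm": "LMConnector",
}

def resolve_connector(remote_url: str) -> str:
    """Pick a connector by the URL scheme, erroring on unknown prefixes."""
    scheme = urlparse(remote_url).scheme
    try:
        return CONNECTOR_REGISTRY[scheme]
    except KeyError:
        raise ValueError(f"Unsupported remote_url scheme: {scheme!r}")

print(resolve_connector("redis://localhost:6379"))  # RedisConnector
print(resolve_connector("file:///mnt/nfs/cache"))   # FSConnector
```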
```python
# P2P uses ZMQ for the control plane and TransferChannel for the data plane
class BatchedLookupAndGetMsg(P2PMsgBase):
    lookup_id: str
    receiver_id: str
    keys: list[str]         # CacheEngineKey in string form
    mem_indexes: list[int]  # Remote memory object indexes (to be written)

class PeerInfo:
    peer_init_url: str      # ZMQ endpoint for peer initialization
    peer_lookup_url: str    # ZMQ endpoint for lookup requests
    lookup_lock: asyncio.Lock
    lookup_socket: zmq.asyncio.Socket
```
P2P enables direct GPU memory transfer between inference instances via RDMA or NVLink,
coordinated by a central controller. The PDBackend extends this for
Prefill/Decode disaggregation where the prefill node pushes KV caches directly to the decode node.
```python
class StorageManager:
    """Responsible for managing all storage backends."""

    def __init__(self, config, metadata, event_manager, ...):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(
            target=start_loop_in_thread_with_exceptions,
            args=(self.loop,),
            name="storage-manger-event-loop",
        )
        self.thread.start()
        self.storage_backends: OrderedDict[str, StorageBackendInterface] = OrderedDict()
        self.create_backends()
        self.allocator_backend = self._get_allocator_backend(config)
        self._freeze = False             # Freeze mode: protect hot cache
        self._bypassed_backends = set()  # Skip unhealthy backends
```
```python
class WeightedSemaphore:
    def __init__(self, chunk_budget):
        # Physically impossible to have >50% fragmentation with uniform chunks
        self._concurrent_budget_cap = chunk_budget // 2
        self._chunk_budget_cap = chunk_budget
        self._current_chunks = self._concurrent_budget_cap
        self._cond = asyncio.Condition()

    async def acquire(self, n=1):
        async with self._cond:
            if n <= self._concurrent_budget_cap:
                await self._cond.wait_for(lambda: self._current_chunks >= n)
                self._current_chunks -= n
            else:
                # Oversized: require exclusive access to all budgets
                await self._cond.wait_for(
                    lambda: self._current_chunks == self._concurrent_budget_cap
                )
                self._current_chunks = 0
```
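A runnable usage sketch of the same acquire logic is below. The `release()` counterpart is an assumption inferred from `acquire()`; the repository's actual method name and accounting may differ.

```python
import asyncio

class MiniWeightedSemaphore:
    """Trimmed re-implementation of the acquire() semantics above,
    plus a hypothetical release()."""
    def __init__(self, chunk_budget):
        self._cap = chunk_budget // 2      # concurrent budget cap
        self._current = self._cap
        self._cond = asyncio.Condition()

    async def acquire(self, n=1):
        async with self._cond:
            if n <= self._cap:
                await self._cond.wait_for(lambda: self._current >= n)
                self._current -= n
            else:
                # Oversized request takes the whole budget exclusively
                await self._cond.wait_for(lambda: self._current == self._cap)
                self._current = 0

    async def release(self, n=1):
        async with self._cond:
            self._current = min(self._cap, self._current + n)
            self._cond.notify_all()

async def main():
    sem = MiniWeightedSemaphore(chunk_budget=8)  # concurrent cap = 4
    await sem.acquire(3)    # 1 chunk of budget left
    await sem.release(3)    # back to 4
    await sem.acquire(100)  # oversized: waits for the full budget, takes it all
    return True

ok = asyncio.run(main())
print(ok)  # True
```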
When a get() fetches data from L2/L3/L4, the StorageManager automatically
promotes it to L1 (CPU DRAM) for faster subsequent access:
```python
def get(self, key, location=None):
    """Blocking get with automatic tier promotion."""
    for backend_name, backend in self.get_active_storage_backends(location):
        memory_obj = backend.get_blocking(key)
        if memory_obj:
            # Auto-promote to L1 if retrieved from L2/L3/L4
            if (
                backend_name not in ["LocalCPUBackend", "PDBackend"]
                and "LocalCPUBackend" in self.storage_backends
            ):
                local_cpu = self.storage_backends["LocalCPUBackend"]
                local_cpu.submit_put_task(key, memory_obj)
            return memory_obj
    return None
```
```python
def batched_put(self, keys, memory_objs, transfer_spec=None, location=None):
    """Non-blocking fan-out to all active storage backends."""
    obj_dict = {}
    obj_dict[get_backend_cname(self.allocator_backend)] = (keys, memory_objs)
    for backend_name, backend in self.storage_backends.items():
        if location and backend_name != location:
            continue
        if backend_name in self._bypassed_backends:
            continue
        # Allocate and copy for backends needing their own memory pool
        allocator_backend = backend.get_allocator_backend()
        cname = get_backend_cname(allocator_backend)
        if cname not in obj_dict:
            new_keys, new_objs = allocate_and_copy_objects(
                allocator_backend, keys, memory_objs, self.internal_copy_stream
            )
            obj_dict[cname] = (new_keys, new_objs)
        ks, objs = obj_dict[cname]
        backend.batched_submit_put_task(ks, objs, transfer_spec=transfer_spec)
```
The prefetch_all_done_callback handles multi-tier prefetch completion. It enforces
prefix continuity: if tier N retrieves fewer chunks than expected, all subsequent tier results are discarded.
```python
def prefetch_all_done_callback(self, task, lookup_id, cum_chunk_lengths, tier_expected):
    res = task.result()  # [[tier0_objs], [tier1_objs], [tier2_objs]]
    total_retrieved = 0
    for tier_idx, tier_result in enumerate(res):
        actual = len(tier_result)
        expected = tier_expected[tier_idx]
        total_retrieved += actual
        if actual < expected:
            # Gap detected: release all subsequent tier chunks
            for subsequent_tier in res[tier_idx + 1:]:
                for mem_obj in subsequent_tier:
                    mem_obj.ref_count_down()
            break
    retrieved_length = cum_chunk_lengths[total_retrieved]
```
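The continuity rule can be isolated into a standalone sketch: once a tier comes up short, chunks from every later tier are non-contiguous with the prefix and must be discarded. Chunk names and counts here are arbitrary illustration values.

```python
def continuous_prefix(tier_results, tier_expected):
    """Return (kept_chunks, discarded_chunks) under the prefix-continuity rule."""
    kept, discarded = [], []
    for tier_idx, chunks in enumerate(tier_results):
        kept.extend(chunks)
        if len(chunks) < tier_expected[tier_idx]:
            # Gap: everything from subsequent tiers would be non-contiguous
            for later in tier_results[tier_idx + 1:]:
                discarded.extend(later)
            break
    return kept, discarded

tiers = [["c0", "c1"], ["c2"], ["c4", "c5"]]  # tier 1 retrieved 1 of 2 chunks
expected = [2, 2, 2]
kept, discarded = continuous_prefix(tiers, expected)
print(kept)       # ['c0', 'c1', 'c2']
print(discarded)  # ['c4', 'c5']
```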
The vLLM adapter implements KVConnectorBase_V1. The scheduler calls
build_connector_meta() (which calls lookup() to determine cache hits),
and the worker calls start_load_kv() / save_kv_layer().
```python
class LMCacheConnectorV1Impl:
    def __init__(self, vllm_config, role, parent):
        config = lmcache_get_or_create_config()
        service_factory = VllmServiceFactory(config, vllm_config, role.name.lower())
        self._manager = LMCacheManager(config, service_factory, connector=self)
        self._manager.start_services()
```
```python
def start_load_kv(self, forward_context, **kwargs):
    """Start loading KV cache from the connector buffer to vLLM's paged KV buffer."""
    metadata = self._parent._get_connector_metadata()
    kvcaches = list(self.kv_caches.values())
    for request in metadata.requests:
        if request.load_spec is None or not request.load_spec.can_load:
            continue
        tokens = request.token_ids
        slot_mapping = request.slot_mapping.to(self.device)
        # Build mask: skip tokens already cached in vLLM (chunk-aligned)
        token_mask = torch.ones(len(tokens), dtype=torch.bool)
        masked_count = (
            request.load_spec.vllm_cached_tokens
            // self._lmcache_chunk_size
            * self._lmcache_chunk_size
        )
        token_mask[:masked_count] = False
        if self.use_layerwise:
            # Layer-by-layer pipeline: retrieve 2 layers ahead
            retriever = self.lmcache_engine.retrieve_layer(tokens, token_mask, ...)
            next(retriever)
            next(retriever)  # Pre-fetch the first 2 layers
            self.layerwise_retrievers.append(retriever)
        else:
            ret_mask = self.lmcache_engine.retrieve(
                tokens, token_mask, kvcaches=kvcaches, ...
            )
```
```python
def save_kv_layer(self, layer_name, kv_layer, attn_metadata, **kwargs):
    """Save one layer of KV cache from vLLM's paged buffer."""
    for request in connector_metadata.requests:
        layerwise_storer = self._layerwise_save_storers.get(request.req_id)
        if layerwise_storer is None:
            # Initialize the generator on the first layer
            skip_leading = (
                save_spec.skip_leading_tokens
                // self._lmcache_chunk_size
                * self._lmcache_chunk_size
            )
            store_mask = torch.ones(len(token_ids), dtype=torch.bool)
            store_mask[:skip_leading] = False
            layerwise_storer = self.lmcache_engine.store_layer(
                token_ids,
                mask=store_mask,
                kvcaches=kvcaches,
                slot_mapping=slot_mapping,
                ...
            )
            self._layerwise_save_storers[request.req_id] = layerwise_storer
        next(layerwise_storer)  # Process one layer per call
```
| Spec Type | Field | Meaning |
|---|---|---|
| LoadSpec | vllm_cached_tokens | Tokens already in vLLM's page cache |
| | lmcache_cached_tokens | Tokens found in LMCache (from lookup) |
| | can_load | Whether scheduler permits loading |
| SaveSpec | skip_leading_tokens | Already-stored prefix to skip |
| | can_save | Whether scheduler permits saving |
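The chunk-aligned mask construction from `start_load_kv()` is easy to verify in isolation: the already-cached token count is rounded down to a chunk boundary so that retrieval never starts mid-chunk. This sketch uses plain lists in place of torch tensors; the numbers are arbitrary.

```python
CHUNK_SIZE = 256  # LMCache's default chunk size

def build_load_mask(num_tokens, vllm_cached_tokens, chunk_size=CHUNK_SIZE):
    """True = retrieve from LMCache; False = already in vLLM's page cache."""
    masked_count = vllm_cached_tokens // chunk_size * chunk_size  # round DOWN
    mask = [i >= masked_count for i in range(num_tokens)]
    return mask, masked_count

mask, masked_count = build_load_mask(num_tokens=600, vllm_cached_tokens=300)
print(masked_count)       # 256 (300 rounded down to a chunk boundary)
print(mask.count(False))  # 256
```

Rounding down means the tail of a partially cached chunk (tokens 256-299 here) is re-retrieved from LMCache rather than leaving a mid-chunk gap.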
Unlike vLLM's multi-process scheduler/worker split, SGLang integration is single-process and direct.
The connector wraps the LMCacheEngine and exposes simple load_kv() / store_kv() methods.
```python
class LMCacheConnector:
    def __init__(self, sgl_config, tp_size, rank, k_pool, v_pool):
        kv_dtype = k_pool[0].dtype
        local_rank = k_pool[0].device.index  # GPU device from the buffer
        self.lmcache_engine = init_lmcache_engine(
            sgl_config, tp_size, local_rank, rank, kv_dtype
        )
        self.kvcaches = k_pool + v_pool  # Concatenate K and V pools
        self.lmcache_engine.post_init(kvcaches=self.kvcaches)

    def load_kv(self, load_metadata):
        """Direct retrieve -- no scheduler coordination needed."""
        token_ids = torch.tensor(load_metadata.token_ids, dtype=torch.int64).cuda()
        slot_mapping = load_metadata.slot_mapping.cuda()
        load_mask = torch.ones_like(token_ids, dtype=torch.bool)
        load_mask[:load_metadata.offset] = False
        ret_mask = self.lmcache_engine.retrieve(
            token_ids,
            mask=load_mask,
            kvcaches=self.kvcaches,
            slot_mapping=slot_mapping,
            offset=load_metadata.offset,
        )
        return ret_mask.sum().item()

    def store_kv(self, store_metadata):
        """Direct store -- synchronous single-process call."""
        token_ids = torch.tensor(store_metadata.token_ids, dtype=torch.int64).cuda()
        slot_mapping = store_metadata.kv_indices.to(torch.int64).cuda()
        store_mask = torch.ones_like(token_ids, dtype=torch.bool)
        self.lmcache_engine.store(
            token_ids,
            mask=store_mask,
            kvcaches=self.kvcaches,
            slot_mapping=slot_mapping,
            offset=store_metadata.offset,
        )
```
In other words, the SGLang path replaces vLLM's lookup() plus start_load_kv() / save_kv_layer() flow with the load_kv() / store_kv() wrappers above, and uses a mock_up_broadcast_fn since no TP broadcast is needed.

```python
# SGLang initialization: constructing metadata from ModelConfig
def init_lmcache_engine(model_config, tp_size, local_rank, global_rank, kv_dtype):
    config = lmcache_get_config()
    kv_shape = (
        model_config.num_hidden_layers,         # num_layer
        2,                                      # K and V
        config.chunk_size,                      # 256 tokens
        model_config.get_num_kv_heads(tp_size),
        model_config.head_dim,
    )
    metadata = LMCacheMetadata(
        model_name=model_config.model_path,
        world_size=tp_size,
        worker_id=global_rank,
        kv_dtype=kv_dtype,
        kv_shape=kv_shape,
    )
    gpu_connector = CreateGPUConnector(config, metadata, EngineType.SGLANG)
    return LMCacheEngineBuilder.get_or_create(
        ENGINE_NAME, config, metadata, gpu_connector, ...
    )
```
CacheBlend enables reusing KV cache chunks that may not form a contiguous prefix.
When enable_blending=True, the engine uses SegmentTokenDatabase to split tokens
at special separator strings, and the LMCBlender performs selective recomputation
to maintain attention quality.
```python
# Config definitions for CacheBlend (from v1/config.py)
"enable_blending":        {"type": bool,        "default": False},
"blend_recompute_ratios": {"type": list[float], "default": None},
"blend_thresholds":       {"type": list[float], "default": None},
"blend_check_layers":     {"type": list[int],   "default": None},
"blend_min_tokens":       {"type": int,         "default": 256},
"blend_special_str":      {"type": str,         "default": " # # "},
```
```python
class SegmentTokenDatabase(TokenDatabase):
    """Splits tokens at special separator patterns instead of fixed-size chunks."""

    def __init__(self, config, metadata):
        self.tokenizer = AutoTokenizer.from_pretrained(metadata.model_name)
        self.sep_tokens = self.tokenizer.encode(config.blend_special_str)[1:]
        self.sep_tokens = torch.tensor(self.sep_tokens, device="cpu")

    def _fast_split_by_subtensor(self, tokens):
        """Match sep_tokens with sliding windows."""
        windows = tokens.unfold(0, self.sep_len, 1)
        # Find positions where the window matches the separator
        ...
```
The LMCBlender selectively recomputes a fraction (blend_recompute_ratios) of tokens at segment boundaries to maintain attention quality between cached and fresh segments. blend_check_layers specifies the transformer layers at which attention divergence is checked; if the divergence exceeds the corresponding blend_thresholds value, more tokens are recomputed.
When enable_blending=True, the config validator automatically sets
save_unfull_chunk=True (required for variable-length segments) and
the memory format switches to KV_2TD for layerwise mode.
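Separator-based segmentation can be sketched in pure Python (the tensor version above uses `torch.unfold` for the sliding-window match). The token IDs and separator below are arbitrary illustration values, standing in for the encoded blend_special_str.

```python
def split_by_separator(tokens, sep):
    """Split `tokens` into segments at each occurrence of subsequence `sep`."""
    segments, start, i = [], 0, 0
    n, m = len(tokens), len(sep)
    while i <= n - m:
        if tokens[i:i + m] == sep:        # sliding-window subsequence match
            segments.append(tokens[start:i])
            i += m                        # skip over the separator itself
            start = i
        else:
            i += 1
    segments.append(tokens[start:])       # trailing segment
    return segments

sep = [7, 7]                              # stand-in for encoded blend_special_str
tokens = [1, 2, 3, 7, 7, 4, 5, 7, 7, 6]
segments = split_by_separator(tokens, sep)
print(segments)  # [[1, 2, 3], [4, 5], [6]]
```

Each resulting segment is then hashed and cached independently, which is what allows non-prefix reuse.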
```python
POLICY_MAPPING = {
    "LRU": LRUCachePolicy,
    "LFU": LFUCachePolicy,
    "FIFO": FIFOCachePolicy,
    "MRU": MRUCachePolicy,
}

def get_cache_policy(policy_name: str) -> BaseCachePolicy:
    return POLICY_MAPPING[policy_name.upper()]()
```
Uses OrderedDict. On hit, move_to_end(key).
Evicts from the front (oldest access).
```python
def update_on_hit(self, key, cache_dict):
    cache_dict.move_to_end(key)

def get_evict_candidates(self, cache_dict, n=1):
    evict_keys = []
    for key, cache in cache_dict.items():
        if cache.can_evict:
            evict_keys.append(key)
            if len(evict_keys) == n:
                break
    return evict_keys
```
Uses plain dict (insertion-ordered in Python 3.7+).
update_on_hit() is a no-op -- access order does not matter.
```python
def update_on_hit(self, key, cache_dict):
    pass  # No reordering -- access order does not matter

def get_evict_candidates(self, cache_dict, n=1):
    evict_keys = []
    # Iterate from oldest insertion
    for key, cache in cache_dict.items():
        if cache.can_evict:
            evict_keys.append(key)
            if len(evict_keys) == n:
                break
    return evict_keys
```
Uses SortedDict of frequency buckets. Each bucket is a FIFO dict.
Evicts from lowest-frequency bucket first.
```python
def update_on_hit(self, key, cache_dict):
    curr_freq = self.key_to_freq[key]
    self.freq_to_keys[curr_freq].pop(key)
    curr_freq += 1
    self.key_to_freq[key] = curr_freq
    self.freq_to_keys[curr_freq][key] = None

def update_on_put(self, key):
    self.key_to_freq[key] = 1
    self.freq_to_keys[1][key] = None
```
Opposite of LRU: evicts the most recently accessed item. Uses OrderedDict
but evicts from the end.
```python
def get_evict_candidates(self, cache_dict, n=1):
    evict_keys = []
    # Iterate in reverse (most recent first)
    for key in reversed(cache_dict):
        if cache_dict[key].can_evict:
            evict_keys.append(key)
            if len(evict_keys) == n:
                break
    return evict_keys
```
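A toy comparison makes the four eviction orders concrete. This sketch ignores pinning (can_evict) and LFU's frequency buckets, modeling each policy with a plain dict/OrderedDict; the access trace is arbitrary.

```python
from collections import OrderedDict

def first_eviction(policy, accesses):
    """Return the key each policy would evict first for a given access trace."""
    cache = OrderedDict()
    freq = {}
    for key in accesses:
        freq[key] = freq.get(key, 0) + 1
        if key in cache:
            if policy in ("LRU", "MRU"):
                cache.move_to_end(key)  # these two track access recency
        else:
            cache[key] = None
    if policy == "LFU":
        return min(cache, key=lambda k: freq[k])  # lowest frequency
    if policy == "MRU":
        return next(reversed(cache))              # most recently accessed
    return next(iter(cache))                      # LRU / FIFO: front of dict

accesses = ["a", "b", "c", "a", "a", "b"]
results = {p: first_eviction(p, accesses) for p in ("LRU", "LFU", "FIFO", "MRU")}
print(results)  # {'LRU': 'c', 'LFU': 'c', 'FIFO': 'a', 'MRU': 'b'}
```

Note how FIFO evicts "a" (first inserted) even though it is the hottest key, while LRU and LFU both pick the cold "c".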
Memory objects are pinned during lookup to prevent eviction between the lookup and retrieve steps.
The PinMonitor (singleton) runs a background thread that forcibly unpins objects that exceed the timeout.
```python
class PinMonitor(PeriodicThread):
    """Global singleton: monitors pinned MemoryObj instances for timeout."""

    def __init__(self, config):
        self._pinned_objects = {}  # {obj_id: (memory_obj, register_time)}
        self._pin_timeout_sec = config.pin_timeout_sec  # default: 300s
        self._check_interval = config.pin_check_interval_sec  # default: 30s

    def on_pin(self, memory_obj):
        """Register a pinned object for timeout monitoring."""
        obj_id = id(memory_obj)
        self._pinned_objects[obj_id] = (memory_obj, time.time())

    def on_unpin(self, memory_obj):
        """Unregister from timeout monitoring."""
        self._pinned_objects.pop(id(memory_obj), None)
```
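The periodic timeout scan implied above can be sketched as follows. The `check_timeouts()` method and its injectable `now` parameter are assumptions made so the example is testable without a background thread; the real PinMonitor's scan method may be named and structured differently.

```python
import time

class MiniPinMonitor:
    """Sketch of the timeout scan: drop registrations pinned past the timeout."""
    def __init__(self, pin_timeout_sec=300):
        self._pinned = {}  # {obj_id: (obj, register_time)}
        self._timeout = pin_timeout_sec

    def on_pin(self, obj, now=None):
        self._pinned[id(obj)] = (obj, now if now is not None else time.time())

    def check_timeouts(self, now=None):
        """Forcibly unregister objects pinned longer than the timeout."""
        now = now if now is not None else time.time()
        expired = [oid for oid, (_, t) in self._pinned.items()
                   if now - t > self._timeout]
        for oid in expired:
            self._pinned.pop(oid)
        return len(expired)

monitor = MiniPinMonitor(pin_timeout_sec=300)
monitor.on_pin(object(), now=0.0)    # pinned long ago
monitor.on_pin(object(), now=250.0)  # pinned recently
n_expired = monitor.check_timeouts(now=310.0)
print(n_expired)  # 1: only the first pin exceeded the 300s timeout
```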
| Config Parameter | Default | Description |
|---|---|---|
| cache_policy | LRU | Eviction policy: LRU, LFU, FIFO, MRU |
| pin_timeout_sec | 300 | Max seconds a MemoryObj can remain pinned |
| pin_check_interval_sec | 30 | How often PinMonitor scans for timeouts |
| max_local_cpu_size | 5.0 | GB of CPU DRAM allocated for L1 cache |
| max_local_disk_size | 0.0 | GB of disk space for L2 cache (0 = disabled) |
| blocking_timeout_secs | 10 | Max wait time for blocking get operations |
All eviction policies check cache.can_evict before evicting. A MemoryObj is not evictable
when it is pinned (pin_count > 0). This prevents evicting cache entries that are in the middle of being retrieved.
```python
# From DiskCacheMetadata (a similar pattern exists in MemoryObj)
@property
def can_evict(self) -> bool:
    return not self.is_pinned  # i.e., pin_count == 0
```
Configuration is loaded from YAML files, environment variables (prefix LMCACHE_),
or programmatic overrides. The system supports aliases for deprecated config names.
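A sketch of LMCACHE_-prefixed env-var loading with alias support is below. The alias name and the exact boolean-coercion rules are illustrative assumptions, not the real loader's behavior; the schema keys and defaults come from the table that follows.

```python
# Hypothetical alias table for deprecated env-var names
ALIASES = {"LMCACHE_OLD_CHUNK_SIZE": "LMCACHE_CHUNK_SIZE"}

def load_env_config(env, schema):
    """schema: {config_key: (type, default)}; env keys are LMCACHE_<KEY>."""
    # Rewrite deprecated names to their canonical form first
    resolved = {ALIASES.get(k, k): v for k, v in env.items()}
    config = {}
    for key, (typ, default) in schema.items():
        raw = resolved.get("LMCACHE_" + key.upper())
        if raw is None:
            config[key] = default
        elif typ is bool:
            config[key] = raw.lower() in ("1", "true", "yes")
        else:
            config[key] = typ(raw)
    return config

schema = {
    "chunk_size": (int, 256),
    "local_cpu": (bool, True),
    "max_local_cpu_size": (float, 5.0),
}
env = {"LMCACHE_CHUNK_SIZE": "512", "LMCACHE_LOCAL_CPU": "false"}
cfg = load_env_config(env, schema)
print(cfg)  # {'chunk_size': 512, 'local_cpu': False, 'max_local_cpu_size': 5.0}
```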
| Category | Parameter | Default | Description |
|---|---|---|---|
| Basic | chunk_size | 256 | Tokens per chunk for KV cache segmentation |
| | local_cpu | True | Enable L1 CPU DRAM hot cache |
| | max_local_cpu_size | 5.0 | GB allocated for L1 CPU cache |
| | local_disk | None | Path for L2 disk cache (None = disabled) |
| | remote_url | None | URL for L3 remote storage (redis://...) |
| Features | use_layerwise | False | Enable per-layer KV cache storage |
| | save_decode_cache | False | Store KV cache during decode phase |
| | save_unfull_chunk | False | Store partial chunks (required for blending/PD) |
| Blending | enable_blending | False | Enable CacheBlend non-contiguous reuse |
| | blend_recompute_ratios | None | Fraction of tokens to recompute per layer |
| | blend_special_str | " # # " | Separator string for segment splitting |
| P2P | enable_p2p | False | Enable peer-to-peer GPU transfer |
| | enable_pd | False | Enable Prefill/Decode disaggregation |
| | pd_role | None | "producer" (prefill) or "receiver" (decode) |
| Memory | enable_lazy_memory_allocator | False | Allocate memory on demand (reduces footprint) |
| | lazy_memory_initial_ratio | 0.2 | Initial allocation as fraction of target |
| Policy | cache_policy | LRU | Eviction policy: LRU, LFU, FIFO, MRU |
| | min_retrieve_tokens | 0 | Min hit tokens required to actually retrieve |
```bash
# Environment variable configuration example
export LMCACHE_CHUNK_SIZE=256
export LMCACHE_LOCAL_CPU=true
export LMCACHE_MAX_LOCAL_CPU_SIZE=8.0
export LMCACHE_LOCAL_DISK=/tmp/lmcache_disk/
export LMCACHE_MAX_LOCAL_DISK_SIZE=20.0
export LMCACHE_REMOTE_URL=redis://localhost:6379
export LMCACHE_CACHE_POLICY=LRU
export LMCACHE_ENABLE_BLENDING=false
export LMCACHE_USE_LAYERWISE=true
```
LMCache Source Code Architecture Deep Dive | All code from lmcache/v1/