Deep Dive — Source Code Analysis

Execution Time Prediction & Discrete Event System

How Vidur models GPU kernel latency without any GPUs, drives simulation through a priority-queue event loop, and generates realistic request traffic from synthetic distributions or production traces.

vidur/execution_time_predictor/ • vidur/events/ • vidur/simulator.py • vidur/request_generator/

Architectural Overview

Vidur's simulation engine rests on three pillars that work in concert: an execution time predictor that estimates how long each GPU operation takes for a given batch, a discrete event system that advances simulated time through a priority queue, and a request generator that produces realistic workload patterns. Together, these components allow Vidur to simulate an entire LLM serving cluster on a single CPU in seconds.

[Figure: Architecture overview. The request generator (synthetic Poisson/Gamma arrivals or Azure CSV trace replay, with Uniform/Zipf/trace lengths) emits a list of requests that become events in a heapq priority queue ordered by event time. The Simulator.run() main loop pops each event, sets the simulated time, calls handle_event(scheduler, metrics), pushes the resulting events back onto the heap, and optionally appends to the event trace. The execution predictor (sklearn GridSearchCV over linear regression or random forest models trained on profiled data) maps a Batch to an ExecutionTime. The metrics store uses CDFSketch (DDSketch), DataSeries, SeriesAverageMeter, and WandB + Plotly.]

The Execution Time Predictor Hierarchy

At the heart of Vidur's ability to simulate without GPUs is the execution time predictor. The base class BaseExecutionTimePredictor defines a 19-component breakdown of every batch's execution time. This is not a single monolithic prediction -- every sub-operation of a transformer layer is modeled independently.

# vidur/execution_time_predictor/base_execution_time_predictor.py
class BaseExecutionTimePredictor(ABC):
    def __init__(self, predictor_config, replica_config,
                 replica_scheduler_config, metrics_config):
        self._config = predictor_config
        self._replica_config = replica_config
        self._model_config = replica_config.model_config

        self._replica_scheduler_provider = str(replica_scheduler_config.get_type())
        self._block_size = replica_scheduler_config.block_size
        self._num_layers_per_pipeline_stage = (
            self._model_config.num_layers // self._replica_config.num_pipeline_stages
        )

    def get_execution_time(self, batch: Batch, pipeline_stage: int) -> ExecutionTime:
        # Compute communication overheads conditionally
        if pipeline_stage == self._replica_config.num_pipeline_stages - 1:
            pipeline_parallel_communication_time = 0
        else:
            pipeline_parallel_communication_time = (
                self._get_pipeline_parallel_communication_time(batch)
            )

        if self._replica_config.tensor_parallel_size == 1:
            tensor_parallel_communication_time = 0
        else:
            tensor_parallel_communication_time = (
                self._get_tensor_parallel_communication_time(batch)
            )

        return ExecutionTime(
            self._num_layers_per_pipeline_stage,
            self._get_attention_rope_execution_time(batch),
            self._get_attention_kv_cache_save_execution_time(batch),
            self._get_attention_decode_execution_time(batch),
            self._get_attention_prefill_execution_time(batch),
            self._get_attention_layer_pre_proj_execution_time(batch),
            self._get_attention_layer_post_proj_execution_time(batch),
            self._get_mlp_layer_up_proj_execution_time(batch),
            self._get_mlp_layer_down_proj_execution_time(batch),
            self._get_mlp_layer_act_execution_time(batch),
            self._get_attn_norm_layer_act_execution_time(batch),
            self._get_mlp_norm_layer_act_execution_time(batch),
            self._get_add_layer_act_execution_time(batch),
            tensor_parallel_communication_time,
            pipeline_parallel_communication_time,
            self._get_schedule_time(batch),
            self._get_sampler_e2e_time(batch),
            self._get_prepare_inputs_e2e_time(batch),
            self._get_process_model_outputs_time(batch),
            self._get_ray_comm_time(batch),
        )

The 19-Component Execution Time Breakdown

Every call to get_execution_time() produces an ExecutionTime entity that decomposes a single pipeline stage's execution into fine-grained sub-operations. These are aggregated to compute the total time:

Component | Category | Description
--- | --- | ---
attention_rope | Attention | Rotary Position Embedding computation
attention_kv_cache_save | Attention | Writing K/V to cache memory
attention_decode | Attention | Decode attention (batch_size x kv_cache_size)
attention_prefill | Attention | Prefill attention (kv_cache x chunk_size^2)
attention_pre_proj | Attention | QKV projection (linear layer)
attention_post_proj | Attention | Output projection (linear layer)
mlp_up_proj | MLP | Up projection / gate projection
mlp_down_proj | MLP | Down projection
mlp_act | MLP | Activation function (SiLU, GELU)
attn_norm | Norm | Input layer normalization (RMSNorm / LayerNorm)
mlp_norm | Norm | Post-attention normalization
add | Residual | Residual connection addition
tensor_parallel_comm | Communication | All-reduce across tensor parallel workers
pipeline_parallel_comm | Communication | Send/recv between pipeline stages
schedule | CPU Overhead | Scheduler CPU time
sampler_e2e | CPU Overhead | Token sampling end-to-end
prepare_inputs_e2e | CPU Overhead | Input preparation overhead
process_model_outputs | CPU Overhead | Output processing overhead
ray_comm_time | CPU Overhead | Ray distributed communication

ExecutionTime Entity: Time Aggregation

# vidur/entities/execution_time.py
class ExecutionTime(BaseEntity):
    def _get_block_execution_time(self) -> float:
        return (
            self._get_attention_layer_execution_time()  # pre_proj + post_proj + rope + kv_save + decode + prefill + TP_comm + norm
            + self._get_mlp_layer_execution_time()      # up_proj + down_proj + act + TP_comm + norm
            + self._add_time                            # residual
        )

    @property
    def model_time(self) -> float:
        block_execution_time = self._get_block_execution_time()
        pipeline_stage_execution_time = (
            block_execution_time * self._num_layers_per_pipeline_stage
        )
        # return in seconds (internal times are ms)
        return (
            pipeline_stage_execution_time + self.pipeline_parallel_communication_time
        ) * 1e-3

    @property
    def total_time(self) -> float:
        # model GPU time + CPU overhead (schedule, sampler, ray, etc.)
        return self.model_time + self._get_cpu_overhead() * 1e-3
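Key Insight: The total execution time per pipeline stage is (block_time x num_layers_per_stage) + pipeline_comm + cpu_overhead, where each block is attention_layer + mlp_layer + residual. Component times are held in milliseconds internally; model_time and total_time convert to seconds. Tensor-parallel all-reduce time is counted inside each block and is zero when tensor_parallel_size is 1. Pipeline send/recv time is added once per stage and is zero on the last pipeline stage.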
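To make the aggregation concrete, here is a minimal sketch of the same arithmetic as a standalone function. The function name and all numbers are hypothetical; only the formula and the millisecond-to-second conversion come from the excerpt above.

```python
def stage_total_time_s(block_time_ms, num_layers_per_stage,
                       pipeline_comm_ms, cpu_overhead_ms):
    """Return (model_time, total_time) in seconds for one pipeline stage."""
    # GPU-side time: every layer on this stage costs one block, plus one
    # send/recv to the next stage (0 on the last stage).
    model_time_s = (block_time_ms * num_layers_per_stage + pipeline_comm_ms) * 1e-3
    # CPU overheads (schedule, sampler, ray, ...) are added on top.
    total_time_s = model_time_s + cpu_overhead_ms * 1e-3
    return model_time_s, total_time_s


# Hypothetical inputs: 0.5 ms per block, 40 layers on this stage,
# 0.2 ms of pipeline communication, 1.3 ms of CPU overhead.
model_s, total_s = stage_total_time_s(0.5, 40, 0.2, 1.3)
print(f"model_time={model_s:.4f}s total_time={total_s:.4f}s")
```

With these inputs the GPU-side time is (0.5 x 40 + 0.2) ms = 20.2 ms, and the CPU overhead brings the stage total to 21.5 ms.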

The sklearn Prediction Pipeline

SklearnExecutionTimePredictor is the workhorse implementation. It trains ML models on profiled data collected from real GPUs, then uses pre-computed prediction tables during simulation. The pipeline has three phases: data loading + feature engineering, model training with GridSearchCV, and full-range prediction caching.

[Figure: The three-phase sklearn pipeline. Phase 1 (data): load compute_input.csv, attention_input.csv, all_reduce_input.csv, send_recv_input.csv, and cpu_overhead_input.csv, filtered by n_head, n_embd, n_kv_head, vocab_size, TP degree, and block_size. Phase 2 (train): derive features (is_decode, prefill_chunk_size_squared, num_tokens), then train 15+ sub-models independently with GridSearchCV (MAPE scorer, k_fold_cv_splits folds, default 5); trained models are cached as pickles keyed by an MD5 hash and guarded by an InterProcessReaderWriterLock. Phase 3 (predict): pre-compute predictions over the token range and the (batch_size, kv_cache) grid into dictionaries that are looked up during simulation.]

Data Loading and Filtering

Five input CSV files contain profiled data from real GPU runs. Each is filtered to match the exact model architecture and parallelism configuration being simulated. File paths use template substitution with {DEVICE}, {MODEL}, and {NETWORK_DEVICE} placeholders:

# vidur/execution_time_predictor/sklearn_execution_time_predictor.py

def _get_input_files(self):
    input_files = [
        self._config.compute_input_file,
        self._config.attention_input_file,
        self._config.all_reduce_input_file,
        self._config.send_recv_input_file,
        self._config.cpu_overhead_input_file,
    ]
    for i in range(len(input_files)):
        input_files[i] = (
            input_files[i]
            .replace("{DEVICE}", self._replica_config.device)
            .replace("{MODEL}", self._model_config.get_name())
            .replace("{NETWORK_DEVICE}", self._replica_config.network_device)
        )
    return tuple(input_files)

def _load_compute_df(self, file_path):
    df = self._read_input_file(file_path)
    df = df.drop_duplicates()
    # Filter to exact model architecture
    df = df[
        (df["n_head"] == self._model_config.num_q_heads)
        & (df["n_kv_head"] == self._model_config.num_kv_heads)
        & (df["n_embd"] == self._model_config.embedding_dim)
        & (df["n_expanded_embd"] == self._model_config.mlp_hidden_dim)
        & (df["use_gated_mlp"] == self._model_config.use_gated_mlp)
        & (df["vocab_size"] == self._model_config.vocab_size)
        & (df["num_tensor_parallel_workers"] == self._replica_config.tensor_parallel_size)
    ]
    return df
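The two steps above, placeholder substitution and exact-match filtering, can be sketched without pandas. The path template, device names, and row values below are hypothetical and serve only to show the mechanics; they are not Vidur's defaults.

```python
def resolve_profile_path(template, device, model, network_device):
    """Fill the {DEVICE}, {MODEL} and {NETWORK_DEVICE} placeholders."""
    return (
        template.replace("{DEVICE}", device)
        .replace("{MODEL}", model)
        .replace("{NETWORK_DEVICE}", network_device)
    )


def filter_rows(rows, **expected):
    """Keep only rows whose columns equal every expected value."""
    return [
        row for row in rows
        if all(row.get(column) == value for column, value in expected.items())
    ]


# Hypothetical template; a placeholder that is absent is simply left alone.
path = resolve_profile_path(
    "profiles/{DEVICE}/{MODEL}/mlp.csv", "a100", "llama-2-7b", "a100_dgx"
)

rows = [
    {"n_head": 32, "n_kv_head": 32, "n_embd": 4096, "num_tensor_parallel_workers": 1},
    {"n_head": 64, "n_kv_head": 8, "n_embd": 8192, "num_tensor_parallel_workers": 1},
    {"n_head": 32, "n_kv_head": 32, "n_embd": 4096, "num_tensor_parallel_workers": 2},
]
matching = filter_rows(
    rows, n_head=32, n_kv_head=32, n_embd=4096, num_tensor_parallel_workers=1
)
print(path, len(matching))
```

Because the filter is an exact match on every column, profiling data collected for a different architecture or tensor-parallel degree never reaches the training step.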

Feature Engineering

Attention data requires additional derived features. The is_decode flag separates prefill and decode operations, while prefill_chunk_size_squared captures the quadratic scaling of self-attention. For communication data, byte sizes are converted to token counts using the embedding dimension:

def _get_attention_df_with_derived_features(self, df):
    df_with_derived_features = df.copy()
    df_with_derived_features["num_tokens"] = df_with_derived_features[
        ["prefill_chunk_size", "batch_size"]
    ].max(axis=1)
    df_with_derived_features["is_decode"] = (
        df_with_derived_features["prefill_chunk_size"] == 0
    )
    df_with_derived_features["prefill_chunk_size_squared"] = (
        df_with_derived_features["prefill_chunk_size"] ** 2
    )
    return df_with_derived_features

def _get_all_reduce_df_with_derived_features(self, df):
    df_with_derived_features = df.copy()
    # convert bytes to num tokens: each token = 2 * embedding_dim bytes
    df_with_derived_features["num_tokens"] = (
        df_with_derived_features["size"] / self._model_config.embedding_dim / 2
    )
    return df_with_derived_features
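As a worked example of these derived features, the sketch below applies the same rules to single rows in plain Python. The helper names are hypothetical, and the 2 bytes per element reflects the fp16 assumption implied by the comment in the excerpt.

```python
def attention_features(prefill_chunk_size, batch_size):
    """Derive the attention features for one profiled row."""
    return {
        "num_tokens": max(prefill_chunk_size, batch_size),
        "is_decode": prefill_chunk_size == 0,
        "prefill_chunk_size_squared": prefill_chunk_size ** 2,
    }


def all_reduce_num_tokens(size_bytes, embedding_dim, bytes_per_element=2):
    """Convert an all-reduce payload in bytes to a token count (fp16 assumed)."""
    return size_bytes / embedding_dim / bytes_per_element


decode_row = attention_features(prefill_chunk_size=0, batch_size=16)
prefill_row = attention_features(prefill_chunk_size=512, batch_size=1)
tokens = all_reduce_num_tokens(size_bytes=1_048_576, embedding_dim=4096)
print(decode_row, prefill_row, tokens)
```

A 1 MiB payload at embedding_dim 4096 corresponds to 128 tokens, so communication models can share the num_tokens feature used by the compute models.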

Model Training with GridSearchCV

Each sub-operation gets its own model trained via scikit-learn's GridSearchCV. The scorer is MAPE (Mean Absolute Percentage Error), and all data is used for training -- there is no train/test split because the goal is interpolation within the known profiling domain, not extrapolation:

def _train_model(self, model_name, df, feature_cols, target_col):
    if len(df) == 0:
        raise Exception(f"Training data for model {model_name} is empty")

    model_hash = self._get_model_hash(model_name, df)
    cached_model = self._load_model_from_cache(model_name, model_hash)
    if cached_model:
        return cached_model

    model = self._get_estimator()         # abstract: LinearRegression or RandomForest
    grid_search_params = self._get_grid_search_params()  # abstract: hyperparameter grid

    cv = min(self._config.k_fold_cv_splits, len(df))  # never request more folds than rows

    grid_search = GridSearchCV(
        estimator=model,
        param_grid=grid_search_params,
        scoring=self._get_scorer(),  # MAPE scorer
        cv=cv,
        n_jobs=self._config.num_training_job_threads,
    )

    # No train/test split -- we want to predict within the profiled domain
    X, y = df[feature_cols], df[target_col]
    grid_search.fit(X, y)

    self._store_model_in_cache(model_name, model_hash, grid_search.best_estimator_)
    return grid_search.best_estimator_
Custom MAPE Scorer: The mean_absolute_percentage_error method handles the zero-true-value edge case: if the true value is 0 and prediction is also 0, error is 0; if true is 0 but prediction is non-zero, error is 100%. This prevents division-by-zero crashes during cross-validation.
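The zero-handling rule can be sketched in a few lines of plain Python. This is an illustration of the rule as described, not the method's actual body; the function name is hypothetical and the result is expressed in percent.

```python
def zero_safe_mape(y_true, y_pred):
    """Mean absolute percentage error with explicit handling of zero targets."""
    errors = []
    for true_value, predicted in zip(y_true, y_pred):
        if true_value == 0:
            # Exact zero prediction counts as no error, anything else as 100%.
            errors.append(0.0 if predicted == 0 else 100.0)
        else:
            errors.append(abs((true_value - predicted) / true_value) * 100.0)
    return sum(errors) / len(errors)


# Per-sample errors are 0%, 100% and 50%.
score = zero_safe_mape([0.0, 0.0, 2.0], [0.0, 1.0, 1.0])
print(score)
```

Since lower MAPE is better, a scorer built from a function like this has to be registered with scikit-learn as a loss (lower is better) rather than a score.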

What Gets Trained: The Complete Model Inventory

def _train_models(self):
    models = self._train_compute_models()       # 9-10 models
    models.update(self._train_cpu_overhead_models())  # 5 models
    models.update(self._train_attention_layer_models())  # 2 models
    return models

# Compute models: features = [num_tokens], target = time_stats.{op}.median
# attn_pre_proj, attn_post_proj, mlp_up_proj, mlp_down_proj, mlp_act,
# input_layernorm, post_attention_layernorm, attn_rope, add, attn_kv_cache_save

# Attention models (separate prefill and decode):
# attn_prefill: features=[kv_cache_size, prefill_chunk_size_squared]
# attn_decode:  features=[batch_size, kv_cache_size]

# CPU overhead models: features = [batch_size]
# schedule, sampler_e2e, prepare_inputs_e2e, process_model_outputs, ray_comm_time

# Communication models (conditional):
# all_reduce: features=[num_tokens]  -- only if TP > 1
# send_recv:  features=[num_tokens]  -- only if PP > 1

Prediction Caching: Pre-computing the Entire Domain

After training, Vidur pre-computes predictions for every input combination on the configured prediction grid (bounded by the prediction_max_* settings and the KV-cache granularity) and stores them in dictionaries. Simulation-time predictions are therefore O(1) hash table lookups, not model inference calls:

def _predict_for_attention_layer_models(self):
    # Decode grid: batch_size x kv_cache_size
    decode_batch_size_range = np.arange(1, self._config.prediction_max_batch_size + 1)
    decode_kv_cache_size_range = np.arange(
        0,
        self._config.prediction_max_tokens_per_request + 1,
        self._config.kv_cache_prediction_granularity,
    )
    # Prefill grid: kv_cache_size x prefill_chunk_size
    prefill_kv_cache_size_range = np.arange(
        0,
        self._config.prediction_max_tokens_per_request + 1,
        self._config.kv_cache_prediction_granularity,
    )
    prefill_prefill_chunk_size_range = np.arange(
        1, self._config.prediction_max_prefill_chunk_size + 1
    )
    # Cartesian product -> all combinations
    # model.predict(X) -> Dict[(tuple_of_features,), predicted_time]
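The elided last step, evaluating the model over the Cartesian product and storing the results, might look like the following sketch. The toy model stands in for a trained regressor, the grid limits are hypothetical, and rounding up to the granularity is an assumption about how off-grid KV-cache sizes are mapped to keys.

```python
from itertools import product


def build_lookup_table(predict_fn, *feature_ranges):
    """Evaluate predict_fn on every grid point and key results by feature tuple."""
    return {key: predict_fn(*key) for key in product(*feature_ranges)}


def toy_decode_model(batch_size, kv_cache_size):
    """Stand-in for a trained regressor (hypothetical coefficients)."""
    return 0.01 * batch_size + 0.001 * kv_cache_size


def round_up_to_granularity(value, granularity):
    """Map an arbitrary KV-cache size to the next grid point (assumed policy)."""
    return ((value + granularity - 1) // granularity) * granularity


max_batch_size, max_tokens, granularity = 4, 1024, 256  # hypothetical limits
table = build_lookup_table(
    toy_decode_model,
    range(1, max_batch_size + 1),
    range(0, max_tokens + 1, granularity),
)

# At simulation time: snap to the grid, then do a single dictionary lookup.
key = (2, round_up_to_granularity(300, granularity))
print(len(table), key, table[key])
```

The table size is the product of the range lengths (here 4 x 5 = 20 entries), which is why the granularity setting trades memory and start-up time against resolution.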

Simulation-Time Lookups

During simulation, each abstract method implementation is a single dictionary lookup. Prefill attention combines the per-request chunk sizes as a root-sum-of-squares (the square root of the sum of squared chunk sizes), which preserves the quadratic cost term, while decode attention uses the mean KV-cache size across the batch, rounded to the prediction granularity:

# Simple compute lookups: key is (total_num_tokens_rounded,)
def _get_mlp_layer_up_proj_execution_time(self, batch):
    return self._predictions["mlp_up_proj"][(batch._total_num_tokens_rounded,)]

# Attention decode: features = (batch_size, avg_kv_cache_size)
def _get_attention_decode_execution_time(self, batch):
    decode_batch_size, decode_avg_kv_cache_size = self._get_batch_decode_attention_params(batch)
    if decode_batch_size == 0:
        return 0
    return self._predictions["attn_decode"][
        (decode_batch_size, decode_avg_kv_cache_size)
    ] * (1 + self._attention_decode_batching_overhead_fraction * int(decode_batch_size > 1))

# Attention prefill: aggregate chunk sizes as a root-sum-of-squares
def _get_attention_prefill_execution_time(self, batch):
    prefill_params = self._get_batch_prefill_attention_params(batch)
    if len(prefill_params) == 0:
        return 0
    kv_cache_sizes, prefill_chunk_sizes = zip(*prefill_params)
    agg_kv_cache_size = sum(kv_cache_sizes)
    agg_prefill_chunk_size = sum([x**2 for x in prefill_chunk_sizes]) ** 0.5
    return self._predictions["attn_prefill"][
        (agg_kv_cache_size, round(agg_prefill_chunk_size) ** 2)
    ] * (1 + self._attention_prefill_batching_overhead_fraction * int(len(prefill_params) > 1))
GQA Overhead Modeling: For Grouped Query Attention models (where num_q_heads > num_kv_heads), a configurable batching overhead fraction scales the looked-up attention time: the decode fraction applies when the decode batch size exceeds 1, and the prefill fraction applies when the batch contains more than one prefill request. This captures the additional overhead of batching with GQA compared to MHA.
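A small worked example shows how several prefill requests collapse into one lookup key and how the overhead multiplier is applied. The helper names and the overhead value are hypothetical; the arithmetic follows the prefill excerpt above.

```python
def aggregate_prefill_params(prefill_params):
    """Collapse (kv_cache_size, chunk_size) pairs into a single lookup key."""
    kv_cache_sizes, chunk_sizes = zip(*prefill_params)
    agg_kv_cache_size = sum(kv_cache_sizes)
    # Root-sum-of-squares keeps the total quadratic work of all chunks.
    agg_chunk_size = sum(c ** 2 for c in chunk_sizes) ** 0.5
    return agg_kv_cache_size, round(agg_chunk_size) ** 2


def with_batching_overhead(base_time, num_items, overhead_fraction):
    """Scale a looked-up time only when more than one item is batched."""
    return base_time * (1 + overhead_fraction * int(num_items > 1))


# Two prefill requests with chunks of 3 and 4 tokens: sqrt(9 + 16) = 5,
# so the key uses 5 ** 2 = 25 for prefill_chunk_size_squared.
key = aggregate_prefill_params([(0, 3), (128, 4)])
single = with_batching_overhead(10.0, num_items=1, overhead_fraction=0.1)
batched = with_batching_overhead(10.0, num_items=2, overhead_fraction=0.1)
print(key, single, batched)
```

Because the model was trained on prefill_chunk_size_squared, aggregating in the squared domain lets one prediction approximate the combined cost of several chunks.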

Linear Regression vs Random Forest

Both concrete predictor implementations inherit from SklearnExecutionTimePredictor and only need to override two methods: _get_estimator() and _get_grid_search_params(). The training pipeline, caching, and prediction logic are entirely shared.

LinearRegressionExecutionTimePredictor

# linear_regression_execution_time_predictor.py
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures

class LinearRegressionExecutionTimePredictor(
    SklearnExecutionTimePredictor):

    def _get_grid_search_params(self):
        return {
            "polynomialfeatures__degree":
                self._config.polynomial_degree,
            "polynomialfeatures__include_bias":
                self._config.polynomial_include_bias,
            "polynomialfeatures__interaction_only":
                self._config.polynomial_interaction_only,
            "linearregression__fit_intercept":
                self._config.fit_intercept,
        }

    def _get_estimator(self):
        return make_pipeline(
            PolynomialFeatures(),
            LinearRegression()
        )

Uses a polynomial feature + linear regression pipeline. GridSearchCV explores polynomial degree, bias inclusion, and interaction-only features. Suited for operations with known polynomial scaling (e.g., attention is O(n^2) in sequence length).
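To show what the polynomial step feeds into the linear model, the sketch below expands a feature vector in plain Python. It mirrors the term ordering that PolynomialFeatures produces (bias, then degree-1 terms, then degree-2 terms); the function itself is a hypothetical illustration, not part of scikit-learn or Vidur.

```python
from itertools import combinations, combinations_with_replacement


def polynomial_terms(features, degree=2, include_bias=True, interaction_only=False):
    """Expand features into polynomial terms up to the given degree."""
    # interaction_only drops powers of a single feature such as a^2.
    pick = combinations if interaction_only else combinations_with_replacement
    terms = [1.0] if include_bias else []
    for d in range(1, degree + 1):
        for combo in pick(range(len(features)), d):
            value = 1.0
            for index in combo:
                value *= features[index]
            terms.append(value)
    return terms


# For (a, b) = (2, 3): [1, a, b, a^2, ab, b^2]
full = polynomial_terms([2.0, 3.0], degree=2)
# interaction_only keeps [1, a, b, ab]
interactions = polynomial_terms([2.0, 3.0], degree=2, interaction_only=True)
print(full, interactions)
```

The linear regression then fits one coefficient per term, which is how a linear model can capture quadratic scaling.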

RandomForrestExecutionTimePredictor

# random_forrest_execution_time_predictor.py
from sklearn.ensemble import RandomForestRegressor

class RandomForrestExecutionTimePredictor(
    SklearnExecutionTimePredictor):

    def _get_grid_search_params(self):
        return {
            "n_estimators":
                self._config.num_estimators,
            "max_depth":
                self._config.max_depth,
            "min_samples_split":
                self._config.min_samples_split,
        }

    def _get_estimator(self):
        return RandomForestRegressor()



Uses a Random Forest ensemble. GridSearchCV tunes tree count, max depth, and min samples per split. Better for operations with non-polynomial or irregular scaling patterns. More robust to outliers in profiling data.

# vidur/execution_time_predictor/execution_time_predictor_registry.py
class ExecutionTimePredictorRegistry(BaseRegistry):
    @classmethod
    def get_key_from_str(cls, key_str):
        return ExecutionTimePredictorType.from_str(key_str)

ExecutionTimePredictorRegistry.register(
    ExecutionTimePredictorType.RANDOM_FORREST, RandomForrestExecutionTimePredictor
)
ExecutionTimePredictorRegistry.register(
    ExecutionTimePredictorType.LINEAR_REGRESSION, LinearRegressionExecutionTimePredictor
)
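For readers unfamiliar with the registry pattern, here is a minimal sketch of how such a registry could work. SimpleRegistry and the toy predictor classes are hypothetical stand-ins; BaseRegistry's real implementation is not shown in this document and may differ.

```python
class SimpleRegistry:
    """Hypothetical stand-in for BaseRegistry: maps keys to classes."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls._registry = {}  # each registry subclass gets its own table

    @classmethod
    def register(cls, key, implementation_class):
        if key in cls._registry:
            raise ValueError(f"{key!r} is already registered")
        cls._registry[key] = implementation_class

    @classmethod
    def get(cls, key, *args, **kwargs):
        if key not in cls._registry:
            raise ValueError(f"{key!r} is not registered")
        # Instantiate the registered class with the caller's arguments.
        return cls._registry[key](*args, **kwargs)


class ToyPredictorRegistry(SimpleRegistry):
    pass


class ToyLinearPredictor:
    def __init__(self, config):
        self.config = config


class ToyForestPredictor:
    def __init__(self, config):
        self.config = config


ToyPredictorRegistry.register("linear_regression", ToyLinearPredictor)
ToyPredictorRegistry.register("random_forrest", ToyForestPredictor)

predictor = ToyPredictorRegistry.get("random_forrest", {"max_depth": 8})
print(type(predictor).__name__)
```

The benefit is that a configuration value alone selects the implementation, so adding a predictor requires one register() call and no changes to the calling code.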

Hardware and Model Configurations

Device SKU Configurations

Each GPU type is modeled with two critical parameters: fp16_tflops (compute throughput) and total_memory_gb (VRAM capacity). These are used by the MFU calculator and memory management:

# vidur/config/device_sku_config.py
@dataclass
class BaseDeviceSKUConfig(BaseFixedConfig):
    fp16_tflops: int
    total_memory_gb: int

@dataclass
class A40DeviceSKUConfig(BaseDeviceSKUConfig):
    fp16_tflops: int = 150
    total_memory_gb: int = 45

@dataclass
class A100DeviceSKUConfig(BaseDeviceSKUConfig):
    fp16_tflops: int = 312
    total_memory_gb: int = 80

@dataclass
class H100DeviceSKUConfig(BaseDeviceSKUConfig):
    fp16_tflops: int = 1000
    total_memory_gb: int = 80
GPU | FP16 TFLOPS | Memory (GB) | Architecture
--- | --- | --- | ---
A40 | 150 | 45 | Ampere
A100 | 312 | 80 | Ampere
H100 | 1000 | 80 | Hopper

Node SKU Configurations

Nodes combine a device type with a device count. The num_devices_per_node parameter is critical for determining whether tensor/pipeline parallelism spans across nodes (affecting communication latency):

# vidur/config/node_sku_config.py
@dataclass
class A100PairwiseNvlinkNodeSKUConfig(BaseNodeSKUConfig):
    device_sku_type: DeviceSKUType = DeviceSKUType.A100
    num_devices_per_node: int = 4   # Pairwise NVLink: 4 GPUs

@dataclass
class A100DgxNodeSKUConfig(BaseNodeSKUConfig):
    device_sku_type: DeviceSKUType = DeviceSKUType.A100
    num_devices_per_node: int = 8   # Full DGX: 8 GPUs with NVSwitch

@dataclass
class H100DgxNodeSKUConfig(BaseNodeSKUConfig):
    device_sku_type: DeviceSKUType = DeviceSKUType.H100
    num_devices_per_node: int = 8   # H100 DGX: 8 GPUs with NVSwitch
Multi-Node Detection: The sklearn predictor uses num_workers > devices_per_node to detect multi-node setups. When multi-node, it selects devices_per_node=1 for send/recv profiling data (inter-node communication). For intra-node, it uses devices_per_node=2.
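The selection rule in this note reduces to one comparison. The sketch below is a hypothetical helper that restates it; treating num_workers as tensor_parallel_size x num_pipeline_stages is an assumption, since the note does not define it.

```python
def send_recv_devices_per_node(num_workers, devices_per_node):
    """Pick which send/recv profiling rows to use.

    Returns 1 (inter-node data) when the replica does not fit on a single
    node, otherwise 2 (intra-node data), as described in the note above.
    """
    is_multi_node = num_workers > devices_per_node
    return 1 if is_multi_node else 2


# Assumed definition of num_workers: TP degree times pipeline stages.
tensor_parallel_size, num_pipeline_stages = 4, 2
num_workers = tensor_parallel_size * num_pipeline_stages

on_pairwise_node = send_recv_devices_per_node(num_workers, devices_per_node=4)
on_dgx_node = send_recv_devices_per_node(num_workers, devices_per_node=8)
print(on_pairwise_node, on_dgx_node)
```

Under that assumption, the same 8-worker replica uses inter-node send/recv data on a 4-GPU pairwise NVLink node but intra-node data on an 8-GPU DGX node.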

Model Architecture Configurations

Every LLM architecture is fully parameterized as a dataclass. The base class BaseModelConfig defines the complete set of architectural parameters that drive both execution time prediction and memory management:

# vidur/config/model_config.py
@dataclass
class BaseModelConfig(BaseFixedConfig):
    num_layers: int             # Total transformer layers
    num_q_heads: int            # Query attention heads
    num_kv_heads: int           # Key/Value heads (GQA when < num_q_heads)
    embedding_dim: int          # Hidden dimension
    mlp_hidden_dim: int         # Feed-forward intermediate dimension
    max_position_embeddings: int
    use_gated_mlp: bool         # LLaMA-style gated MLP vs vanilla
    use_bias: bool
    use_qkv_bias: bool
    activation: ActivationType  # SiLU, GELU, etc.
    norm: NormType              # RMS_NORM or LAYER_NORM
    post_attn_norm: bool        # Whether post-attention layernorm exists
    vocab_size: int
    rope_theta: float           # RoPE base frequency
    partial_rotary_factor: float = 1.0
    no_tensor_parallel: bool = False
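Model | Layers | Q Heads | KV Heads | Embed Dim | MLP Dim | Vocab
--- | --- | --- | --- | --- | --- | ---
Llama-2-7B | 32 | 32 | 32 | 4096 | 11008 | 32768
Llama-2-70B | 80 | 64 | 8 | 8192 | 28672 | 32768
Llama-3-8B | 32 | 32 | 8 | 4096 | 14336 | 128256
Llama-3-70B | 80 | 64 | 8 | 8192 | 28672 | 128256
CodeLlama-34B | 48 | 64 | 8 | 8192 | 22016 | 32768
InternLM-20B | 60 | 40 | 40 | 5120 | 13824 | 103168
InternLM2-20B | 48 | 48 | 8 | 6144 | 16384 | 92544
Phi-2 | 32 | 32 | 32 | 2560 | 10240 | 51200
Qwen-72B | 80 | 64 | 64 | 8192 | 24576 | 152064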
GQA Detection: Models like Llama-2-70B, Llama-3-8B, and InternLM2-20B use Grouped Query Attention (KV heads < Q heads). The predictor automatically detects this and applies batching overhead fractions. Phi-2 is unique: it uses LayerNorm instead of RMSNorm, GELU instead of SiLU, no gated MLP, and partial_rotary_factor=0.4.
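The GQA check itself is a comparison of two head counts. The sketch below applies it to the (Q heads, KV heads) pairs from the table above; the helper name is hypothetical.

```python
# (num_q_heads, num_kv_heads) taken from the model table above.
MODEL_HEADS = {
    "Llama-2-7B": (32, 32),
    "Llama-2-70B": (64, 8),
    "Llama-3-8B": (32, 8),
    "Llama-3-70B": (64, 8),
    "CodeLlama-34B": (64, 8),
    "InternLM-20B": (40, 40),
    "InternLM2-20B": (48, 8),
    "Phi-2": (32, 32),
    "Qwen-72B": (64, 64),
}


def uses_gqa(num_q_heads, num_kv_heads):
    """GQA means several query heads share each key/value head."""
    return num_kv_heads < num_q_heads


gqa_models = sorted(name for name, heads in MODEL_HEADS.items() if uses_gqa(*heads))
# Query heads that share one KV head, e.g. 64 // 8 = 8 for Llama-2-70B.
group_size = {name: q // kv for name, (q, kv) in MODEL_HEADS.items()}
print(gqa_models, group_size["Llama-2-70B"])
```

Models with equal Q and KV head counts (Llama-2-7B, InternLM-20B, Phi-2, Qwen-72B) use standard multi-head attention and would not receive the batching overhead.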

The Discrete Event System

Vidur's simulation is driven by a priority queue of events. Each event has a timestamp, type, and unique ID. When processed, an event may generate new events that are pushed back into the queue. Time advances non-uniformly, jumping directly to the next event's timestamp.

BaseEvent: Priority and Ordering

# vidur/events/base_event.py
class BaseEvent(ABC):
    _id = 0

    def __init__(self, time: float, event_type: EventType):
        self._time = time
        self._id = BaseEvent.generate_id()
        self._event_type = event_type
        self._priority_number = self._get_priority_number()

    def _get_priority_number(self):
        return (self._time, self._id, self.event_type)

    def __lt__(self, other):
        # Three-level comparison: time -> event_type -> id
        if self._time == other._time:
            if self._event_type == other._event_type:
                return self._id < other._id
            return self._event_type < other._event_type
        else:
            return self._time < other._time

    @abstractmethod
    def handle_event(self, scheduler, metrics_store) -> List["BaseEvent"]:
        pass
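Python compares tuples element by element, so the field order of a heap key decides how same-timestamp ties are broken. The sketch below contrasts the order described by __lt__ (time, event type, id) with the field order returned by _get_priority_number() as quoted above (time, id, event type). The events are hypothetical triples, not Vidur objects.

```python
import heapq

# (time, event_type, event_id): three events at the same timestamp.
events = [
    (0.5, 5, 1),  # GLOBAL_SCHEDULE, created first
    (0.5, 2, 2),  # REQUEST_ARRIVAL, created second
    (0.5, 3, 3),  # BATCH_STAGE_END, created third
]


def pop_order(events, key):
    """Return the event ids in the order a min-heap keyed by `key` pops them."""
    heap = [(key(event), event) for event in events]
    heapq.heapify(heap)
    return [heapq.heappop(heap)[1][2] for _ in range(len(heap))]


# Ties broken by event type first, then id.
by_type_then_id = pop_order(events, key=lambda e: (e[0], e[1], e[2]))
# Ties broken by id first: effectively creation order.
by_id_then_type = pop_order(events, key=lambda e: (e[0], e[2], e[1]))
print(by_type_then_id, by_id_then_type)
```

With unique ids, a key that lists the id before the event type never reaches the type comparison, so the two orderings can differ whenever a lower-priority event is created first.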

Event Types and Their Priority Order

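The EventType enum defines the numeric ordering that __lt__ uses to break ties when multiple events share a timestamp. Lower numbers have higher priority. Note that the tuple returned by _get_priority_number(), as quoted above, lists the id before the event type, so a heap keyed on that tuple resolves same-timestamp ties by id first: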

# vidur/types/event_type.py
class EventType(BaseIntEnum):
    # At any given time step, call the schedule event at the last
    # to ensure that all the requests are processed
    BATCH_STAGE_ARRIVAL   = 1  # Highest priority -- data arriving at a stage
    REQUEST_ARRIVAL       = 2  # New request enters the system
    BATCH_STAGE_END       = 3  # GPU computation for a stage completes
    BATCH_END             = 4  # All stages of a batch complete
    GLOBAL_SCHEDULE       = 5  # Distribute requests to replicas
    REPLICA_SCHEDULE      = 6  # Schedule batches within a replica
    REPLICA_STAGE_SCHEDULE = 7  # Schedule work on a specific pipeline stage
[Figure: Event lifecycle, from request arrival through batch completion. RequestArrival (scheduler.add_request()) -> GlobalSchedule (scheduler.schedule()) -> ReplicaSchedule (replica.on_schedule()) -> BatchStageArrival (stage.add_batch()) -> ReplicaStageSchedule (stage.on_schedule()) -> BatchStageEnd, which fires at current_time + execution_time. On the last stage, BatchStageEnd produces BatchEnd; otherwise it produces BatchStageArrival for stage + 1. BatchEnd tracks request completion and emits a new ReplicaSchedule event, closing the feedback loop for requests that arrived in the meantime.]

Event Handlers: What Each Event Does

RequestArrivalEvent

# request_arrival_event.py
def handle_event(self, scheduler, metrics):
    scheduler.add_request(self._request)
    metrics.on_request_arrival(
        self.time, self._request)
    return [GlobalScheduleEvent(self.time)]

Adds the request to the global scheduler's queue and immediately triggers a GlobalScheduleEvent at the same timestamp to attempt dispatching.

GlobalScheduleEvent

# global_schedule_event.py
def handle_event(self, scheduler, metrics):
    self._request_mapping = scheduler.schedule()
    for replica_id, request in self._request_mapping:
        self._replica_set.add(replica_id)
        scheduler.get_replica_scheduler(
            replica_id).add_request(request)
    return [
        ReplicaScheduleEvent(self.time, rid)
        for rid in self._replica_set
    ]

Invokes the global scheduler's dispatch logic (round-robin, load-balanced, etc.) and maps requests to replicas. Triggers ReplicaScheduleEvent for each affected replica.

ReplicaScheduleEvent

# replica_schedule_event.py
def handle_event(self, scheduler, metrics):
    replica_scheduler = scheduler.get_replica_scheduler(
        self._replica_id)
    self._batches = replica_scheduler.on_schedule()
    if not self._batches:
        return []
    for batch in self._batches:
        batch.on_schedule(self.time)
    return [
        BatchStageArrivalEvent(
            self.time, self._replica_id,
            0,  # stage_id = 0 (first stage)
            batch)
        for batch in self._batches
    ]

Calls the replica's scheduler (ORCA, vLLM, sarathi, etc.) to form batches. Each batch starts at pipeline stage 0 via BatchStageArrivalEvent.

ReplicaStageScheduleEvent

# replica_stage_schedule_event.py
def handle_event(self, scheduler, metrics):
    stage_scheduler = scheduler._replica_schedulers[
        self._replica_id
    ]._replica_stage_schedulers[self._stage_id]

    self._batch, self._batch_stage, execution_time = (
        stage_scheduler.on_schedule())
    if not (self._batch and self._batch_stage):
        return []

    # THIS IS WHERE TIME ADVANCES:
    return [
        BatchStageEndEvent(
            self.time + self._batch_stage.execution_time,
            self._replica_id, self._stage_id,
            stage_scheduler.is_last_stage,
            self._batch, self._batch_stage)
    ]

The critical event that advances simulation time. The stage scheduler calls the execution time predictor, and a BatchStageEndEvent is scheduled at current_time + predicted_execution_time.

BatchStageEndEvent

# batch_stage_end_event.py
def handle_event(self, scheduler, metrics):
    scheduler.get_replica_stage_scheduler(
        self._replica_id, self._stage_id
    ).on_stage_end()
    self._batch_stage.on_stage_end(self.time)

    next_events = [
        ReplicaStageScheduleEvent(
            self.time, self._replica_id, self._stage_id)
    ]

    if self._is_last_stage:
        return next_events + [
            BatchEndEvent(self.time, self._replica_id,
                          self._batch)]
    return next_events + [
        BatchStageArrivalEvent(
            self.time, self._replica_id,
            self._stage_id + 1, self._batch)]

When a pipeline stage finishes, it either advances to the next stage (BatchStageArrival(stage+1)) or, if this was the last stage, completes the batch (BatchEndEvent). It also re-schedules the current stage for new work.

BatchEndEvent

# batch_end_event.py
def handle_event(self, scheduler, metrics):
    self._batch.on_batch_end(self.time)
    replica_scheduler = scheduler.get_replica_scheduler(
        self._replica_id)
    replica_scheduler.on_batch_end(self._batch)

    memory_usage = replica_scheduler.memory_usage_percent
    metrics.on_batch_end(
        self.time, self._batch,
        self._replica_id, memory_usage)

    return [ReplicaScheduleEvent(
        self.time, self._replica_id)]

Finalizes the batch: updates request completion state, frees memory, records metrics. Triggers a new ReplicaScheduleEvent to check if more batches can be formed from queued requests.

The Simulator: Discrete Event Loop

The Simulator class orchestrates everything. Its run() method is a clean, minimal event loop that processes events until the queue empties or a time limit is reached:

# vidur/simulator.py
class Simulator:
    def __init__(self, config: SimulationConfig):
        self._time = 0
        self._terminate = False
        self._time_limit = config.time_limit or float("inf")
        self._event_queue = []                 # Python heapq (min-heap)

        self._cluster = Cluster(config.cluster_config, ...)
        self._metric_store = MetricsStore(config)
        self._request_generator = RequestGeneratorRegistry.get(
            config.request_generator_config.get_type(),
            config.request_generator_config,
        )
        self._scheduler = GlobalSchedulerRegistry.get(
            config.cluster_config.global_scheduler_config.get_type(),
            config, self._cluster.replicas,
        )
        self._init_event_queue()

    def _init_event_queue(self):
        # Generate ALL requests upfront, add as arrival events
        requests = self._request_generator.generate()
        for request in requests:
            self._add_event(RequestArrivalEvent(request.arrived_at, request))

    def run(self):
        while self._event_queue and not self._terminate:
            _, event = heapq.heappop(self._event_queue)  # Pop lowest priority number
            self._set_time(event._time)                  # TIME JUMPS to event time
            new_events = event.handle_event(             # Process event
                self._scheduler, self._metric_store)
            self._add_events(new_events)                 # Push new events to heap

    def _add_event(self, event):
        heapq.heappush(self._event_queue, (event._priority_number, event))

    def _set_time(self, time):
        self._time = time
        if self._time > self._time_limit:
            self._terminate = True

How Time Advances: Unlike a tick-based simulator that advances by fixed timesteps, Vidur's event loop jumps directly to the next event's timestamp via self._set_time(event._time). This is the hallmark of discrete event simulation: no computation happens between events. When ReplicaStageScheduleEvent creates a BatchStageEndEvent at current_time + execution_time, the predicted execution time (from the sklearn model) fixes the timestamp at which that batch stage completes, and the clock jumps there once every earlier event has been handled.
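
The time-jumping behaviour can be reproduced in a few lines. The following is a minimal sketch, not Vidur's code: run_event_loop, the string event kinds, and the fixed service_time are hypothetical stand-ins for the event classes and the predictor, and there is no contention between requests.

```python
import heapq
import itertools

def run_event_loop(arrival_times, service_time, time_limit=float("inf")):
    """Toy loop: every arrival schedules a completion `service_time` later."""
    counter = itertools.count()              # tie-breaker for equal timestamps
    queue = []
    for t in arrival_times:                  # pre-populate, as _init_event_queue() does
        heapq.heappush(queue, (t, next(counter), "arrival"))

    clock, visited = 0.0, []
    while queue:
        time, _, kind = heapq.heappop(queue)
        clock = time                         # jump straight to the event's timestamp
        if clock > time_limit:               # this sketch stops before handling the event
            break
        visited.append((clock, kind))
        if kind == "arrival":                # handling an event may create new events
            heapq.heappush(queue, (clock + service_time, next(counter), "completion"))
    return visited

trace = run_event_loop([0.0, 0.25, 3.0], service_time=0.5)
```

Between the completion at t=0.75 and the arrival at t=3.0 the loop does no work at all, which is why the cost of a run scales with the number of events rather than with simulated duration.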

Initialization: All Requests Pre-Generated

A key design choice: all requests are generated upfront in _init_event_queue() and added as RequestArrivalEvents. This means the event queue starts populated with all future arrivals. The request generator runs once before the simulation loop begins.

Output: Traces and Chrome Profiling

def _write_output(self):
    self._metric_store.plot()                          # Generate all metric plots
    if self._config.metrics_config.write_json_trace:
        self._write_event_trace()                      # JSON event log
    if self._config.metrics_config.enable_chrome_trace:
        self._write_chrome_trace()                     # Chrome DevTools format

The simulator optionally writes Chrome trace files that can be loaded in chrome://tracing for visual timeline analysis. This is achieved through each event's to_chrome_trace() method, with BatchStageEndEvent being the primary source of trace entries.
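
For orientation, chrome://tracing accepts a JSON object with a traceEvents array, where a "complete" event has ph set to "X" and ts/dur given in microseconds. The sketch below builds such a file; the helper name and the mapping of replica to pid and pipeline stage to tid are assumptions for illustration, not necessarily what to_chrome_trace() emits.

```python
import json

def to_chrome_trace_event(name, start_s, duration_s, replica_id, stage_id):
    """Build one Chrome 'complete' event; field mapping here is hypothetical."""
    return {
        "name": name,
        "ph": "X",                    # complete event: has a start and a duration
        "ts": start_s * 1e6,          # trace timestamps are in microseconds
        "dur": duration_s * 1e6,
        "pid": replica_id,            # assumed: one "process" row per replica
        "tid": stage_id,              # assumed: one "thread" row per pipeline stage
    }

events = [
    to_chrome_trace_event("batch 0", 0.01, 0.04, replica_id=0, stage_id=0),
    to_chrome_trace_event("batch 1", 0.05, 0.03, replica_id=0, stage_id=0),
]
trace_json = json.dumps({"traceEvents": events})
```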

Request Generation: Synthetic and Trace-Based

Vidur supports two fundamental request generation strategies: synthetic (parameterized distributions) and trace replay (real production data). The synthetic generator composes independent interval and length generators via the registry pattern.

[Figure: Request generation architecture. RequestGeneratorRegistry selects either SyntheticRequestGenerator, which composes an interval generator (Poisson: -log(1-r)/qps; Gamma: shape=1/cv^2; Static: interval = 0) with a length generator (Uniform: [min, max]; Zipf: theta, scramble; Fixed: constant tokens; Trace: CSV lengths), or TraceReplayRequestGenerator, which reads a trace CSV with columns arrived_at, num_prefill_tokens, num_decode_tokens. Trace replay is configurable via prefill_scale_factor, decode_scale_factor, time_scale_factor (changes QPS), and max_tokens (clip), and ensures prefill >= 1, decode >= 1, total <= max.]

Synthetic Request Generator

# vidur/request_generator/synthetic_request_generator.py
class SyntheticRequestGenerator(BaseRequestGenerator):
    def __init__(self, config):
        super().__init__(config)               # base class stores self.config
        self.request_length_generator = RequestLengthGeneratorRegistry.get(
            config.length_generator_config.get_type(),
            config.length_generator_config,
        )
        self.request_interval_generator = RequestIntervalGeneratorRegistry.get(
            config.interval_generator_config.get_type(),
            config.interval_generator_config,
        )

    def _generate_next_request(self, last_arrived_at):
        inter_request_time = self.request_interval_generator.get_next_inter_request_time()
        arrived_at = last_arrived_at + inter_request_time
        prefill_tokens, decode_tokens = self.request_length_generator.get_next_num_tokens()
        return Request(
            arrived_at=arrived_at,
            num_prefill_tokens=int(prefill_tokens),
            num_decode_tokens=int(decode_tokens),
        )

    def _generate_requests(self):
        requests = []
        current_time = 0
        # Duration-based or count-based generation (trace-exhaustion branch not shown)
        if self.config.duration is not None:
            while current_time < self.config.duration:
                request = self._generate_next_request(current_time)
                current_time = request.arrived_at
                requests.append(request)
        elif self.config.num_requests is not None:
            for _ in range(self.config.num_requests):
                request = self._generate_next_request(current_time)
                current_time = request.arrived_at
                requests.append(request)
        return requests

Interval Generators: Poisson and Gamma

Poisson (Exponential Inter-arrivals)

# poisson_request_interval_generator.py
import math
import random

class PoissonRequestIntervalGenerator:
    def __init__(self, config):
        self.qps = config.qps
        self.std = 1.0 / self.qps
        self.max_interval = self.std * 3.0

    def get_next_inter_request_time(self):
        next_interval = (
            -math.log(1.0 - random.random())
            / self.qps)
        # Clamp to 3 sigma to avoid extreme gaps
        return min(next_interval, self.max_interval)

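Models memoryless arrivals by inverse-CDF sampling of the exponential distribution: -ln(1-U)/lambda, with U uniform on [0, 1) and lambda = qps. Intervals are capped at 3/qps, which is three standard deviations because an exponential distribution's standard deviation equals its mean (1/qps). The cap prevents extremely long gaps.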
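
One consequence of the cap is worth quantifying: truncating long gaps shortens the mean interval, so the realized arrival rate sits slightly above the configured qps. For X ~ Exp(qps) and cap c = 3/qps, E[min(X, c)] = (1 - e^-3)/qps. The sketch below checks this against seeded samples; the function names are hypothetical.

```python
import math
import random

def clamped_mean_interval(qps, clamp_multiple=3.0):
    """E[min(X, c)] for X ~ Exp(qps) and c = clamp_multiple / qps."""
    return (1.0 - math.exp(-clamp_multiple)) / qps

def sample_intervals(qps, n, seed=0):
    """Inverse-CDF exponential samples, capped at 3/qps as in the generator above."""
    rng = random.Random(seed)
    cap = 3.0 / qps
    return [min(-math.log(1.0 - rng.random()) / qps, cap) for _ in range(n)]

qps = 2.0
samples = sample_intervals(qps, 100_000)
empirical_mean = sum(samples) / len(samples)
effective_qps = 1.0 / clamped_mean_interval(qps)   # about 5% above the nominal qps
```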

Gamma (Burstier Traffic)

# gamma_request_interval_generator.py
from scipy.stats import gamma

class GammaRequestIntervalGenerator:
    def __init__(self, config):
        cv = config.cv  # coefficient of variation
        self.qps = config.qps
        self.gamma_shape = 1.0 / (cv ** 2)

    def get_next_inter_request_time(self):
        gamma_scale = 1.0 / (
            self.qps * self.gamma_shape)
        return gamma.rvs(
            self.gamma_shape, scale=gamma_scale)

The Gamma distribution generalizes exponential arrivals with a coefficient of variation (CV) parameter. CV=1 equals Poisson; CV>1 produces burstier traffic; CV<1 produces more regular arrivals. The shape parameter is 1/CV^2.
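
The parameterization keeps the mean interval at 1/qps for any CV, because mean = shape * scale = 1/qps, while the coefficient of variation is 1/sqrt(shape) = cv. A minimal check follows, using the standard library's random.gammavariate(shape, scale) as a stand-in for scipy's gamma.rvs; the helper names are hypothetical.

```python
import math
import random

def gamma_params(qps, cv):
    """Shape and scale as computed in the generator above."""
    shape = 1.0 / (cv ** 2)
    scale = 1.0 / (qps * shape)
    return shape, scale

def sample_gamma_intervals(qps, cv, n, seed=0):
    shape, scale = gamma_params(qps, cv)
    rng = random.Random(seed)
    return [rng.gammavariate(shape, scale) for _ in range(n)]

def mean_and_cv(samples):
    mean = sum(samples) / len(samples)
    variance = sum((x - mean) ** 2 for x in samples) / len(samples)
    return mean, math.sqrt(variance) / mean

# Bursty traffic: same average rate (4 requests/s), twice the variability of Poisson.
mean, cv = mean_and_cv(sample_gamma_intervals(qps=4.0, cv=2.0, n=200_000))
```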

Length Generators

Uniform

def get_next_num_tokens(self):
    total = random.uniform(
        self.config.min_tokens,
        self.config.max_tokens)
    decode = math.ceil(
        total / (1 + self.config.prefill_to_decode_ratio))
    prefill = total - decode
    return prefill, decode

Zipf

def get_next_num_tokens(self):
    total = self.zipf_generator.next()
    decode = total / (
        1 + self.config.prefill_to_decode_ratio)
    prefill = total - decode
    return prefill, decode
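
Both length generators split a total token budget the same way: decode = total / (1 + prefill_to_decode_ratio) and prefill is the remainder. The two listings differ only in that the Uniform one rounds the decode share up. A small worked sketch, with a hypothetical split_tokens helper:

```python
import math

def split_tokens(total_tokens, prefill_to_decode_ratio, round_up=True):
    """Split a total into (prefill, decode) so that prefill/decode is about the ratio."""
    decode = total_tokens / (1 + prefill_to_decode_ratio)
    if round_up:                      # the Uniform listing applies math.ceil here
        decode = math.ceil(decode)
    prefill = total_tokens - decode
    return prefill, decode

even_split = split_tokens(1000, 4.0)      # ratio 4:1 divides evenly
rounded_split = split_tokens(100, 2.0)    # 33.3 decode tokens round up to 34
```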

Trace Replay Request Generator

# vidur/request_generator/trace_replay_request_generator.py
import pandas as pd

class TraceReplayRequestGenerator(BaseRequestGenerator):
    def __init__(self, config):
        self.trace_df = pd.read_csv(config.trace_file)

        # Scale tokens
        self.trace_df["num_prefill_tokens"] *= config.prefill_scale_factor
        self.trace_df["num_decode_tokens"] *= config.decode_scale_factor

        # Enforce constraints: integers, minimum 1, total <= max
        self.trace_df["num_prefill_tokens"] = self.trace_df["num_prefill_tokens"].astype(int).clip(lower=1)
        self.trace_df["num_decode_tokens"] = self.trace_df["num_decode_tokens"].astype(int).clip(lower=1)

        # If total exceeds max, reduce prefill tokens
        total_tokens = self.trace_df["num_prefill_tokens"] + self.trace_df["num_decode_tokens"]
        diff_tokens = (total_tokens - config.max_tokens).clip(lower=0)
        self.trace_df["num_prefill_tokens"] -= diff_tokens

        # Rescale arrival times to change effective QPS
        self.trace_df["arrived_at"] *= config.time_scale_factor

    def generate_requests(self):
        return [
            Request(arrived_at=row["arrived_at"],
                    num_prefill_tokens=row["num_prefill_tokens"],
                    num_decode_tokens=row["num_decode_tokens"])
            for _, row in self.trace_df.iterrows()
        ]

Trace Flexibility: The trace generator supports three scaling knobs: prefill_scale_factor and decode_scale_factor for adjusting token lengths, and time_scale_factor for compressing or stretching the arrival pattern. A time_scale_factor of 0.5 halves every arrival timestamp and therefore doubles the effective QPS. Token counts are cast to integers and clipped to at least 1 prefill and 1 decode token; if the total then exceeds max_tokens, the excess is subtracted from the prefill count.
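
The same transformations can be followed on a single row without pandas. This is a sketch under assumptions: rescale_request and effective_qps are hypothetical helpers that mirror the order of operations in the listing above.

```python
def rescale_request(arrived_at, prefill, decode, *, prefill_scale=1.0,
                    decode_scale=1.0, time_scale=1.0, max_tokens=4096):
    """Apply scaling, integer cast, lower clip, and max_tokens trimming to one row."""
    prefill = max(int(prefill * prefill_scale), 1)
    decode = max(int(decode * decode_scale), 1)
    overflow = max(prefill + decode - max_tokens, 0)
    prefill -= overflow               # only prefill is trimmed; assumes decode < max_tokens
    return arrived_at * time_scale, prefill, decode

def effective_qps(arrival_times):
    """Average arrival rate over the span of the trace."""
    return (len(arrival_times) - 1) / (arrival_times[-1] - arrival_times[0])

trimmed = rescale_request(10.0, 4000, 500, time_scale=0.5, max_tokens=4096)
original = [0.0, 1.0, 2.0, 3.0, 4.0]
compressed = [t * 0.5 for t in original]
```

Because the trim is applied after the lower clip, a row whose decode count alone reaches max_tokens would leave prefill below 1 in this sketch, so such rows need separate handling.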

Metrics: What Gets Measured

The MetricsStore records request-level, batch-level, token-level, and operation-level measurements throughout a run. It uses two core data structures: DataSeries for indexed (x, y) data and CDFSketch (backed by DDSketch) for streaming quantile estimation.

CDFSketch: Streaming Quantile Estimation

# vidur/metrics/cdf_sketch.py
import numpy as np
import pandas as pd
from ddsketch import DDSketch

class CDFSketch:
    def __init__(self, metric_name, save_table_to_wandb=True, save_plots=True):
        self._sketch = DDSketch(relative_accuracy=0.001)  # 0.1% accuracy
        self._metric_name = metric_name
        self._last_data = 0

    def put(self, data: float):
        self._last_data = data
        self._sketch.add(data)

    def put_delta(self, delta: float):
        # Incremental update: add to last value
        data = self._last_data + delta
        self.put(data)

    def _to_df(self):
        # Generate CDF at 1% intervals for plotting
        quantiles = np.linspace(0, 1, 101)
        quantile_values = [self._sketch.get_quantile_value(q) for q in quantiles]
        return pd.DataFrame({"cdf": quantiles, self._metric_name: quantile_values})

    def print_distribution_stats(self, plot_name):
        # Reports: min, max, mean, p25, p50, p75, p95, p99, p99.9, count, sum
        ...

Why DDSketch? DDSketch is a space-efficient streaming quantile estimation algorithm with provable relative accuracy guarantees. With relative_accuracy=0.001, any quantile query (p50, p99, p99.9) is guaranteed to be within 0.1% of the true value. This is far more memory-efficient than storing all values, especially when tracking millions of per-token metrics.
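
The relative-accuracy guarantee comes from logarithmically sized buckets. With gamma = (1 + alpha) / (1 - alpha), a positive value x lands in bucket ceil(log_gamma(x)), and reporting 2 * gamma^i / (gamma + 1) for bucket i is within a factor alpha of every value in that bucket. The class below is a simplified illustration of that idea for positive values only; TinyLogSketch is hypothetical and is not the ddsketch library's implementation.

```python
import math
from collections import Counter

class TinyLogSketch:
    """Simplified log-bucket quantile sketch (positive values only)."""

    def __init__(self, relative_accuracy=0.001):
        self.alpha = relative_accuracy
        self.gamma = (1 + relative_accuracy) / (1 - relative_accuracy)
        self.log_gamma = math.log(self.gamma)
        self.buckets = Counter()      # bucket index -> number of values
        self.count = 0

    def add(self, value):
        # Bucket i covers the interval (gamma^(i-1), gamma^i].
        index = math.ceil(math.log(value) / self.log_gamma)
        self.buckets[index] += 1
        self.count += 1

    def quantile(self, q):
        rank = q * (self.count - 1)
        seen = 0
        for index in sorted(self.buckets):
            seen += self.buckets[index]
            if seen > rank:
                # Representative value with relative error at most alpha.
                return 2 * self.gamma ** index / (self.gamma + 1)
        raise ValueError("sketch is empty")

sketch = TinyLogSketch(relative_accuracy=0.01)
for latency in range(1, 1001):
    sketch.add(float(latency))
p50, p99 = sketch.quantile(0.5), sketch.quantile(0.99)
```

Memory grows with the number of occupied buckets, which depends on the ratio between the largest and smallest value rather than on how many values were added.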

Metric Categories

Category | Storage | Metrics
Request Time Distributions | DataSeries | E2E time, execution time, model execution time, scheduling delay, preemption time, prefill time, decode time (all with normalized variants)
Request Histograms | DataSeries | Inter-arrival delay, total tokens, prefill tokens, decode tokens, P/D ratio, num restarts
Token Metrics | CDFSketch | Decode token execution+preemption time (TPOT-equivalent)
Batch Count Distributions | CDFSketch | Batch num tokens, prefill tokens, decode tokens, batch size
Batch Time Distributions | CDFSketch | Batch execution time
GPU Operation Metrics | CDFSketch | Per-operation breakdown: MLP up/down/act, attention pre/post proj, prefill, decode, KV save, RoPE, norms, all-reduce, pipeline send/recv
CPU Operation Metrics | CDFSketch | Schedule, sampler E2E, prepare inputs, model execution E2E, process outputs, Ray comm
Utilization Metrics | SeriesAverageMeter | Per-replica memory usage, per-replica-stage busy time %, per-replica-stage MFU
Completion Time Series | DataSeries | Request arrivals/completions over time, prefill completions, decode completions

How Metrics Are Collected

Metrics are collected via event-driven callbacks. The @if_write_metrics decorator ensures that metric collection can be disabled for performance. Key collection points:

# Callback chain through event handlers:

# RequestArrivalEvent -> metrics.on_request_arrival(time, request)
#   Records: arrival count, token histograms, inter-arrival delay

# ReplicaStageScheduleEvent -> metrics.on_replica_stage_schedule(...)
#   Records: ALL per-operation execution times (the 19-component breakdown)
#   Also: busy_time, MFU per replica/stage

# BatchStageEndEvent -> metrics.on_batch_stage_end(...)
#   Records: busy_time=0, MFU=0 (stage becomes idle)

# BatchEndEvent -> metrics.on_batch_end(time, batch, ...)
#   Records: batch execution time, token counts, memory usage
#   For completed requests: E2E time, scheduling delay, preemption time
#   For each token: per-token execution time (TPOT tracking)
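
A guard decorator of this kind can be sketched as follows. This is a hypothetical reimplementation for illustration: the _write_metrics flag and ToyMetricsStore are assumptions, and Vidur's actual decorator may check different configuration fields.

```python
import functools

def if_write_metrics(method):
    """Run the wrapped callback only when metric collection is enabled."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        if self._write_metrics:               # hypothetical flag name
            return method(self, *args, **kwargs)
        return None                           # collection disabled: skip silently
    return wrapper

class ToyMetricsStore:
    def __init__(self, write_metrics):
        self._write_metrics = write_metrics
        self.arrivals = []

    @if_write_metrics
    def on_request_arrival(self, time, request_id):
        self.arrivals.append((time, request_id))

enabled, disabled = ToyMetricsStore(True), ToyMetricsStore(False)
for store in (enabled, disabled):
    store.on_request_arrival(0.01, request_id=1)
```

Event handlers can then call the callbacks unconditionally, and the cost of a disabled store is one attribute check per call.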

Output and Visualization

The MetricsStore produces five categories of output, all stored under output_dir/plots/.

All plots are generated with Plotly Express and optionally logged to Weights & Biases for experiment tracking. CSV files are saved alongside plots for programmatic analysis.

Putting It All Together

The complete flow from configuration to results follows a clear pipeline. Understanding how these pieces connect is essential for extending Vidur or interpreting its outputs.

Complete Simulation Lifecycle

  1. Configuration: SimulationConfig aggregates model config (Llama-2-70B), device config (A100), node config (DGX 8-GPU), scheduler config (Orca/vLLM), and predictor config (LinearRegression).
  2. Predictor Training: The sklearn predictor loads profiling CSVs, trains 15+ small ML models via GridSearchCV, and pre-computes all possible prediction lookups into dictionaries.
  3. Request Generation: Poisson/Gamma/Trace generators produce a list of Request objects with arrival times and token counts.
  4. Event Queue Init: Each request becomes a RequestArrivalEvent in the heap.
  5. Simulation Loop: Pop event, advance time, handle event (creating new events), repeat. The predictor provides execution times; the event system handles state transitions.
  6. Metrics Collection: CDFSketch and DataSeries collect per-request, per-batch, per-token, and per-operation metrics throughout the simulation.
  7. Output: CDF plots, bar charts, time series, CSV exports, optional WandB logging, and Chrome traces.

Key Design Decisions

Pre-computed Prediction Tables

Rather than calling model.predict() during simulation, all predictions are pre-computed as Dict[(features_tuple), time] lookups. This trades memory for speed: simulation-time prediction is a hash table access, not a model inference call. The cache system uses MD5 hashing of the configuration + data to avoid retraining across runs.
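
The pattern can be sketched in a few lines. Everything here is illustrative: toy_predict stands in for a trained model, the feature grid is made up, and the cache key hashes only a config dictionary, whereas the text above says the real key also covers the profiling data.

```python
import hashlib
import json
from itertools import product

def cache_key(config):
    """Stable MD5 digest of a config dict (key order does not matter)."""
    payload = json.dumps(config, sort_keys=True).encode("utf-8")
    return hashlib.md5(payload).hexdigest()

def build_table(predict, feature_grid):
    """Evaluate the model once per grid point; later lookups are dict accesses."""
    return {features: predict(*features) for features in product(*feature_grid)}

def toy_predict(batch_size, kv_cache_size):
    # Hypothetical latency model in milliseconds, standing in for model.predict().
    return 0.1 + 0.002 * batch_size + 0.00001 * kv_cache_size

table = build_table(toy_predict, [range(1, 5), range(0, 2049, 512)])
latency_ms = table[(2, 1024)]                 # hash lookup, no model inference
key = cache_key({"model": "toy", "granularity": 512})
```

The table size is the product of the grid dimensions, which is why coarser rounding of features such as KV-cache size directly reduces memory use.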

Event Priority Ordering

Events at the same timestamp are ordered by type: arrivals before completions before scheduling. This ensures that when a batch finishes and new requests arrive simultaneously, the system state is correctly updated before scheduling decisions are made. The three-level ordering (time, event_type, id) provides deterministic simulation.
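
Tuple comparison gives this ordering for free when heap entries are (time, event_type, id). The sketch below uses a hypothetical ToyEventType enum whose numeric values are chosen to match the order described above; Vidur's actual enum has more members and its values may differ.

```python
import heapq
from enum import IntEnum

class ToyEventType(IntEnum):
    # Hypothetical values: a lower number is handled first at equal timestamps.
    REQUEST_ARRIVAL = 1
    BATCH_END = 2
    REPLICA_SCHEDULE = 3

def drain(events):
    """Pop (time, type, id) tuples in heap order and return a readable trace."""
    heap = [(time, int(kind), event_id) for time, kind, event_id in events]
    heapq.heapify(heap)
    order = []
    while heap:
        time, kind, event_id = heapq.heappop(heap)
        order.append((time, ToyEventType(kind).name, event_id))
    return order

order = drain([
    (0.05, ToyEventType.REPLICA_SCHEDULE, 7),
    (0.05, ToyEventType.REQUEST_ARRIVAL, 9),
    (0.05, ToyEventType.BATCH_END, 8),
    (0.01, ToyEventType.REPLICA_SCHEDULE, 3),
])
```

The id component only matters when both time and type are equal. It guarantees a total order, so two runs with the same inputs pop events in the same sequence.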

Attention Aggregation Strategy

Prefill attention aggregates the chunk sizes in a batch as the root of the sum of squares (sqrt(sum(x^2)), the Euclidean norm of the chunk sizes) to handle batched prefill. Decode attention uses the arithmetic mean of KV-cache sizes. Both are rounded to the kv_cache_prediction_granularity to match the pre-computed prediction grid. This balances accuracy with memory usage for the prediction tables.
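
A worked sketch of the two aggregations, sqrt(sum(x^2)) for prefill chunks and the arithmetic mean for decode KV-cache sizes. The function names are hypothetical, and rounding up to the next multiple of the granularity is an assumption; the text above only says the values are rounded to the granularity.

```python
import math

def round_up_to(value, granularity):
    """Round up to the next multiple of `granularity` (assumed rounding direction)."""
    return math.ceil(value / granularity) * granularity

def aggregate_prefill(chunk_sizes, granularity):
    """Root of the sum of squared prefill chunk sizes, snapped to the grid."""
    return round_up_to(math.sqrt(sum(x * x for x in chunk_sizes)), granularity)

def aggregate_decode(kv_cache_sizes, granularity):
    """Arithmetic mean of per-request KV-cache sizes, snapped to the grid."""
    return round_up_to(sum(kv_cache_sizes) / len(kv_cache_sizes), granularity)

prefill_feature = aggregate_prefill([300, 400], granularity=64)     # sqrt = 500
decode_feature = aggregate_decode([100, 200, 300], granularity=64)  # mean = 200
```

The squared sum weights large chunks more heavily than a plain sum of lengths would suggest, which fits the quadratic cost of prefill attention in sequence length.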

Communication Overhead Modeling

Tensor parallel communication adds a fixed NCCL launch overhead plus a per-device skew term: prediction + nccl_cpu_launch_overhead_ms + nccl_cpu_skew_overhead_per_device_ms * TP^1.25. Pipeline parallel communication uses profiled send/recv data with multi-node awareness. The TP=1 and last-stage optimizations skip communication entirely.
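
The tensor-parallel formula is simple enough to evaluate by hand. In the sketch below the function name and all numeric overhead values are hypothetical placeholders, not Vidur's defaults.

```python
def tensor_parallel_comm_time_ms(predicted_ms, tp_size,
                                 launch_overhead_ms, skew_overhead_per_device_ms):
    """All-reduce estimate: prediction + launch overhead + skew * TP^1.25."""
    if tp_size == 1:
        return 0.0                    # a single device needs no all-reduce
    return (predicted_ms
            + launch_overhead_ms
            + skew_overhead_per_device_ms * tp_size ** 1.25)

# Placeholder numbers: 16^1.25 = 32, so the skew term contributes 0.01 * 32 = 0.32 ms.
tp16 = tensor_parallel_comm_time_ms(1.0, 16, launch_overhead_ms=0.02,
                                    skew_overhead_per_device_ms=0.01)
tp1 = tensor_parallel_comm_time_ms(1.0, 1, launch_overhead_ms=0.02,
                                   skew_overhead_per_device_ms=0.01)
```

The exponent above 1 makes the skew penalty grow faster than linearly with the tensor-parallel degree.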

Source File Reference

Component | File | Key Class/Function
Base Predictor | execution_time_predictor/base_execution_time_predictor.py | BaseExecutionTimePredictor.get_execution_time()
sklearn Pipeline | execution_time_predictor/sklearn_execution_time_predictor.py | SklearnExecutionTimePredictor._train_models()
Linear Regression | execution_time_predictor/linear_regression_execution_time_predictor.py | LinearRegressionExecutionTimePredictor
Random Forest | execution_time_predictor/random_forrest_execution_time_predictor.py | RandomForrestExecutionTimePredictor
Execution Time Entity | entities/execution_time.py | ExecutionTime.model_time, .total_time
Event Base | events/base_event.py | BaseEvent.__lt__(), .handle_event()
Event Types | types/event_type.py | EventType enum (1-7 priority)
Simulator | simulator.py | Simulator.run() event loop
Synthetic Generator | request_generator/synthetic_request_generator.py | SyntheticRequestGenerator._generate_requests()
Trace Generator | request_generator/trace_replay_request_generator.py | TraceReplayRequestGenerator.generate_requests()
Poisson Intervals | request_generator/poisson_request_interval_generator.py | -log(1-U)/qps
Gamma Intervals | request_generator/gamma_request_interval_generator.py | shape=1/cv^2
CDF Sketch | metrics/cdf_sketch.py | CDFSketch(DDSketch(0.001))
Metrics Store | metrics/metrics_store.py | MetricsStore.on_batch_end()
Device Config | config/device_sku_config.py | A100: 312 TFLOPS, 80GB
Model Config | config/model_config.py | Llama2_70BModelConfig
Node Config | config/node_sku_config.py | A100DgxNodeSKUConfig: 8 GPUs