# inference-x/tools/simulate_router.py
# Salka Elmadani ec36668cf5 Inference-X v1.0 — Universal AI Inference Engine
# Better output from the same model. Fused computation, adaptive precision,
# surgical expert loading. 305 KB, 19 backends, zero dependencies.
#
# https://inference-x.com
# 2026-02-23 07:10:47 +00:00
#
# 330 lines
# 12 KiB
# Python
# Executable File

# ═══════════════════════════════════════════════════════════════════════════════
# INFERENCEX — Router Simulation Tool
# Copyright (C) 2025-2026 Salka Elmadani. All rights reserved.
# Licensed under the Business Source License 1.1 (BSL-1.1)
# See LICENSE file for full terms.
#
# NOTICE: This file is part of InferenceX by Salka Elmadani.
# Commercial use by entities with revenue >= $1M USD requires a license.
# Contact: Elmadani.SALKA@proton.me
# ═══════════════════════════════════════════════════════════════════════════════
"""
IX-PROFILER | Router Simulation
=====================================
Read F32 router weights directly from GGUF, simulate routing
with random embeddings, profile which experts get selected.
NO model loading. NO inference. Just math on the gates.
~630MB RAM, runs in minutes.
Copyright (C) 2025-2026 Salka Elmadani — Morocco
"""
import struct
import numpy as np
import sys
from pathlib import Path
GGUF_MAGIC = 0x46554747  # the bytes b'GGUF' read as a little-endian uint32
GGML_TYPE_F32 = 0  # ggml dtype id for raw 32-bit floats (unquantized tensors)
def read_string(f):
length = struct.unpack('<Q', f.read(8))[0]
return f.read(length).decode('utf-8', errors='replace')
def read_value(f, vtype):
    """Read a single GGUF metadata value of type *vtype* from stream *f*.

    Scalar type ids follow the GGUF spec: 0=u8, 1=i8, 2=u16, 3=i16, 4=u32,
    5=i32, 6=f32, 7=bool (stored as u8), 8=string, 10=u64, 11=i64, 12=f64.
    Type 9 is an array: [elem_type:u32][count:u64][elements...], decoded
    recursively into a Python list.

    Raises:
        ValueError: on an unknown value type. BUGFIX: the previous code
            returned None WITHOUT consuming any bytes, which silently
            desynchronized the stream and corrupted every subsequent read.
    """
    if vtype == 9:  # array: element type, length, then packed elements
        arr_type = struct.unpack('<I', f.read(4))[0]
        arr_len = struct.unpack('<Q', f.read(8))[0]
        return [read_value(f, arr_type) for _ in range(arr_len)]
    readers = {
        0: lambda: struct.unpack('<B', f.read(1))[0],   # uint8
        1: lambda: struct.unpack('<b', f.read(1))[0],   # int8
        2: lambda: struct.unpack('<H', f.read(2))[0],   # uint16
        3: lambda: struct.unpack('<h', f.read(2))[0],   # int16
        4: lambda: struct.unpack('<I', f.read(4))[0],   # uint32
        5: lambda: struct.unpack('<i', f.read(4))[0],   # int32
        6: lambda: struct.unpack('<f', f.read(4))[0],   # float32
        7: lambda: struct.unpack('<B', f.read(1))[0],   # bool, kept as 0/1 int
        8: lambda: read_string(f),                      # length-prefixed UTF-8
        10: lambda: struct.unpack('<Q', f.read(8))[0],  # uint64
        11: lambda: struct.unpack('<q', f.read(8))[0],  # int64
        12: lambda: struct.unpack('<d', f.read(8))[0],  # float64
    }
    try:
        reader = readers[vtype]
    except KeyError:
        raise ValueError(f"unknown GGUF value type {vtype}") from None
    return reader()
def scan_gguf(filepath):
    """Parse the GGUF header of *filepath* without loading tensor data.

    Returns a ``(metadata, tensors)`` pair. ``metadata`` maps key -> decoded
    value; ``tensors`` maps tensor name -> dict with 'dims', 'dtype',
    'offset', 'file' and 'data_offset', where 'data_offset' is the absolute
    file position of the payload (header end rounded up to the
    'general.alignment' metadata value, default 32). Both dicts come back
    empty when the magic number does not match.
    """
    metadata, tensors = {}, {}
    with open(filepath, 'rb') as f:
        # Tiny local readers keep the field-by-field parsing legible.
        def u32():
            return struct.unpack('<I', f.read(4))[0]

        def u64():
            return struct.unpack('<Q', f.read(8))[0]

        if u32() != GGUF_MAGIC:
            return metadata, tensors
        _version = u32()  # header version — parsed to advance, unused here
        n_tensors = u64()
        n_kv = u64()
        for _ in range(n_kv):
            key = read_string(f)
            metadata[key] = read_value(f, u32())
        for _ in range(n_tensors):
            name = read_string(f)
            dims = [u64() for _ in range(u32())]
            # Dict values evaluate left-to-right, so reads stay in file order.
            tensors[name] = {
                'dims': dims,
                'dtype': u32(),
                'offset': u64(),
                'file': filepath,
            }
        header_end = f.tell()
    # Tensor payloads start at the header end rounded up to the alignment.
    align = metadata.get('general.alignment', 32)
    data_start = (header_end + align - 1) // align * align
    for info in tensors.values():
        info['data_offset'] = data_start + info['offset']
    return metadata, tensors
def load_f32_tensor(tensor_info):
    """Load one F32 tensor payload described by *tensor_info*.

    *tensor_info* must carry 'file', 'data_offset' and 'dims' (as produced by
    scan_gguf). Returns a numpy float32 array reshaped to dims.

    NOTE(review): dims are applied in the order scan_gguf reported them; GGUF
    stores ne[] fastest-dim-first, so confirm axis orientation before relying
    on it.
    """
    dims = tensor_info['dims']
    n_elements = 1
    for extent in dims:
        n_elements *= extent
    with open(tensor_info['file'], 'rb') as fh:
        fh.seek(tensor_info['data_offset'])
        raw = fh.read(4 * n_elements)  # 4 bytes per float32
    flat = np.frombuffer(raw, dtype=np.float32)
    return flat.reshape(dims)
def simulate_routing(model_dir, n_simulations=10000, top_k=8, total_model_gb=226.0):
    """Simulate MoE routing using gate weights and random embeddings.

    Scans every *.gguf shard under *model_dir*, loads only the F32 router
    ("ffn_gate_inp") tensors, pushes *n_simulations* random Gaussian token
    embeddings through each router, and tallies which *top_k* experts every
    token would select. No expert weights are loaded; no inference is run.

    Side effects: prints a per-layer report and writes two files to the
    current working directory — ``expert_profile.csv`` (per-layer activation
    counts) and ``pruning_recommendation.txt`` (summary + global top experts).

    Args:
        model_dir: directory containing the GGUF shard(s).
        n_simulations: number of random embeddings (simulated tokens).
        top_k: experts selected per token by the router.
        total_model_gb: assumed full-model size in GB used by the size
            estimates (new parameter; previously hard-coded to 226.0).
    """
    model_dir = Path(model_dir)
    shards = sorted(model_dir.glob("*.gguf"))
    print("=" * 60)
    print(" IX-PROFILER | Router Simulation")
    print("=" * 60)
    # Scan all shards; later shards overwrite duplicate keys/tensor names.
    all_meta = {}
    all_tensors = {}
    for shard in shards:
        m, t = scan_gguf(str(shard))
        all_meta.update(m)
        all_tensors.update(t)
    dim = all_meta.get('llama.embedding_length', 7168)
    n_experts = all_meta.get('llama.expert_count', 384)
    # Find the per-layer router tensors: "blk.<layer>.ffn_gate_inp.*" in F32.
    routers = {}
    for name, info in all_tensors.items():
        if 'ffn_gate_inp' in name and info['dtype'] == GGML_TYPE_F32:
            # Extract the layer number that follows the "blk" path component.
            parts = name.split('.')
            for i, p in enumerate(parts):
                if p == 'blk' and i + 1 < len(parts):
                    routers[int(parts[i + 1])] = info
                    break
    n_layers = len(routers)
    print(f"\nConfig: dim={dim}, experts={n_experts}, top_k={top_k}")
    print(f"Router layers: {n_layers}")
    print(f"Simulations: {n_simulations}")
    print(f"\nLoading router weights (~{n_layers * dim * n_experts * 4 / 1e6:.0f} MB)...")
    # Load all router weights up front.
    # Shape should be [dim, n_experts] based on GGUF scan.
    gate_weights = {layer: load_f32_tensor(routers[layer]) for layer in sorted(routers)}
    print("Router weights loaded.\n")
    # Random embeddings stand in for hidden states; std 0.02 approximates
    # typical transformer activation scale. Fixed seed → reproducible runs.
    print(f"Simulating {n_simulations} tokens...")
    np.random.seed(42)  # Signature
    embeddings = np.random.randn(n_simulations, dim).astype(np.float32) * 0.02
    # activation_counts[li, e] = tokens that picked expert e in sorted-layer li.
    activation_counts = np.zeros((n_layers, n_experts), dtype=np.int64)
    layers_sorted = sorted(gate_weights.keys())
    for li, layer in enumerate(layers_sorted):
        gate = gate_weights[layer]  # [dim, n_experts]
        # Routing scores: embeddings @ gate → [n_simulations, n_experts]
        scores = embeddings @ gate  # [n_sim, n_experts]
        # Top-K expert ids per token (unordered within the K — fine for counting).
        top_indices = np.argpartition(scores, -top_k, axis=1)[:, -top_k:]
        # PERF: replaced the O(n_sim * top_k) Python counting loop with one
        # bincount over the flattened selections — identical totals.
        activation_counts[li] += np.bincount(top_indices.ravel(), minlength=n_experts)
        if (li + 1) % 10 == 0:
            print(f" Layer {layer} done ({li+1}/{n_layers})")
    print("\n" + "=" * 60)
    print(" RESULTS")
    print("=" * 60)
    # Per-layer analysis: experts needed for 90/95/99% of routing mass.
    output_lines = []
    all_n90, all_n95, all_n99 = [], [], []
    for li, layer in enumerate(layers_sorted):
        counts = activation_counts[li]
        total = counts.sum()
        sorted_idx = np.argsort(counts)[::-1]  # experts by descending usage
        sorted_counts = counts[sorted_idx]
        cumsum = np.cumsum(sorted_counts)
        n_active = np.sum(counts > 0)
        n_dead = n_experts - n_active
        # Smallest expert count whose cumulative mass reaches each threshold.
        n90 = np.searchsorted(cumsum, total * 0.90) + 1
        n95 = np.searchsorted(cumsum, total * 0.95) + 1
        n99 = np.searchsorted(cumsum, total * 0.99) + 1
        all_n90.append(n90)
        all_n95.append(n95)
        all_n99.append(n99)
        top_pct = 100.0 * sorted_counts[0] / total if total > 0 else 0
        line = (f"Layer {layer:2d}: {n_active:3d} active, {n_dead:3d} dead | "
                f"90%={n90:3d} 95%={n95:3d} 99%={n99:3d} | "
                f"top=#{sorted_idx[0]} ({top_pct:.1f}%)")
        print(line)
        output_lines.append(line)
    # Global summary
    print("\n" + "=" * 60)
    print(" PRUNING ANALYSIS")
    print("=" * 60)
    # Guard the no-router case: np.mean([]) would warn and print nan.
    avg_90 = float(np.mean(all_n90)) if all_n90 else 0.0
    avg_95 = float(np.mean(all_n95)) if all_n95 else 0.0
    avg_99 = float(np.mean(all_n99)) if all_n99 else 0.0
    max_99 = int(np.max(all_n99)) if all_n99 else 0
    print(f"\nAverage experts for 90% signal: {avg_90:.0f}")
    print(f"Average experts for 95% signal: {avg_95:.0f}")
    print(f"Average experts for 99% signal: {avg_99:.0f}")
    print(f"Max experts needed (99%, worst layer): {max_99}")
    # Expert FFN size: [7168, 2048, 384] per layer for gate/up.
    # Each expert: gate[7168,2048] + up[7168,2048] + down[2048,7168] in TQ1_0.
    # TQ1_0: 256 elements = 54 bytes → 0.2109 bytes/element.
    bytes_per_element = 54.0 / 256  # TQ1_0
    expert_ffn_dim = 2048  # NOTE(review): hard-coded — confirm against model metadata
    params_per_expert = (dim * expert_ffn_dim + dim * expert_ffn_dim +
                         expert_ffn_dim * dim)  # gate + up + down
    bytes_per_expert = params_per_expert * bytes_per_element
    expert_total_gb = bytes_per_expert * n_experts * n_layers / 1e9
    print(f"\nExpert params per layer: {params_per_expert * n_experts / 1e9:.1f}B")
    print(f"Expert storage (all): ~{expert_total_gb:.0f} GB")
    print(f"Per expert per layer: ~{bytes_per_expert / 1e6:.1f} MB")
    # Everything that is not expert FFNs.
    non_expert_gb = total_model_gb - expert_total_gb  # attention, norms, embeddings, shared experts
    print(f"\nNon-expert params: ~{non_expert_gb:.0f} GB (attention, norms, embeddings, shared)")
    print(f"\n{'='*50}")
    print(f" MODEL SIZE ESTIMATES")
    print(f"{'='*50}")
    for n_keep in [32, 48, 64, 96, 128, 192]:
        pruned_expert_gb = bytes_per_expert * n_keep * n_layers / 1e9
        total_gb = non_expert_gb + pruned_expert_gb
        pct = 100.0 * total_gb / total_model_gb
        # Signal coverage if only the top n_keep experts per layer are kept.
        coverages = []
        for li in range(n_layers):
            counts = activation_counts[li]
            sorted_counts = np.sort(counts)[::-1]
            total = counts.sum()
            if total > 0:
                coverages.append(np.sum(sorted_counts[:n_keep]) / total)
        avg_coverage = np.mean(coverages) * 100 if coverages else 0
        marker = " ← MINI PC" if total_gb < 20 else (" ← SWEET SPOT" if total_gb < 50 else "")
        print(f" {n_keep:3d} experts: ~{total_gb:5.1f} GB | "
              f"{pct:4.1f}% of original | "
              f"~{avg_coverage:.1f}% signal coverage{marker}")
    # Global expert importance (summed across layers).
    global_importance = activation_counts.sum(axis=0)
    global_sorted = np.argsort(global_importance)[::-1]
    global_total = global_importance.sum()
    print(f"\n{'='*50}")
    print(f" TOP 20 GLOBAL EXPERTS")
    print(f"{'='*50}")
    # min() guards models with <20 experts (previously raised IndexError).
    for i in range(min(20, n_experts)):
        eid = global_sorted[i]
        count = global_importance[eid]
        # Guard zero total (e.g. n_simulations == 0) — was a divide-by-zero.
        pct = 100.0 * count / global_total if global_total > 0 else 0.0
        print(f" #{eid:3d}: {count:8d} activations ({pct:.2f}%)")
    # Save full data. NOTE: the "pct" column holds a 0-1 fraction, not a
    # percentage — kept as-is for compatibility with existing consumers.
    output_path = "expert_profile.csv"
    with open(output_path, 'w') as f:
        f.write("layer,expert_id,count,pct\n")
        for li, layer in enumerate(layers_sorted):
            total = activation_counts[li].sum()
            for eid in range(n_experts):
                if activation_counts[li][eid] > 0:  # count > 0 implies total > 0
                    f.write(f"{layer},{eid},{activation_counts[li][eid]},"
                            f"{activation_counts[li][eid]/total:.6f}\n")
    print(f"\nFull data → {output_path}")
    # Save pruning recommendation.
    rec_path = "pruning_recommendation.txt"
    with open(rec_path, 'w') as f:
        f.write(f"# IX-PROFILER Pruning Recommendation\n")
        f.write(f"# Generated from {n_simulations} simulated tokens\n")
        f.write(f"# Morocco\n\n")
        for line in output_lines:
            f.write(line + "\n")
        f.write(f"\nRecommendation: Keep top {int(avg_95)} experts per layer (95% signal)\n")
        f.write(f"Estimated size: see analysis above\n")
        f.write(f"\nEssential expert IDs (global top-64):\n")
        # Slice guards models with <64 experts (previously raised IndexError).
        for eid in global_sorted[:64]:
            f.write(f" {eid}\n")
    print(f"Recommendation → {rec_path}")
if __name__ == '__main__':
    # CLI: simulate_router.py [model_dir] [n_simulations]
    cli_args = sys.argv[1:]
    target_dir = cli_args[0] if cli_args else "./models/"
    sample_count = int(cli_args[1]) if len(cli_args) > 1 else 10000
    simulate_routing(target_dir, n_simulations=sample_count)