inference-x/tools/analyze_router.py
Salka Elmadani ec36668cf5 Inference-X v1.0 — Universal AI Inference Engine
Better output from the same model. Fused computation, adaptive precision,
surgical expert loading. 305 KB, 19 backends, zero dependencies.

https://inference-x.com
2026-02-23 07:10:47 +00:00

373 lines
13 KiB
Python
Executable File

# ═══════════════════════════════════════════════════════════════════════════════
# INFERENCEX — Router Analysis Tool
# Copyright (C) 2025-2026 Salka Elmadani. All rights reserved.
# Licensed under the Business Source License 1.1 (BSL-1.1)
# See LICENSE file for full terms. See LICENSE for terms.
#
# NOTICE: This file is part of InferenceX by Salka Elmadani.
# Commercial use by entities with revenue >= $1M USD requires a license.
# Contact: Elmadani.SALKA@proton.me
# ═══════════════════════════════════════════════════════════════════════════════
"""
IX-PROFILER | Router Weight Analysis
=========================================
Extract MoE gate/router weights from GGUF and analyze
which experts are statistically favored.
This is FAST — reads only the tiny router tensors (~11MB per layer)
instead of loading the full 226GB model.
Usage: python3 analyze_router.py ./models/
Copyright (C) 2025-2026 Salka Elmadani — Morocco
"""
import struct
import numpy as np
import os
import sys
from pathlib import Path
# GGUF format constants
GGUF_MAGIC = 0x46554747 # "GGUF"
GGML_TYPE_F32 = 0
GGML_TYPE_F16 = 1
GGML_TYPE_Q4_0 = 2
GGML_TYPE_Q4_1 = 3
GGML_TYPE_Q5_0 = 6
GGML_TYPE_Q5_1 = 7
GGML_TYPE_Q8_0 = 8
GGML_TYPE_Q8_1 = 9
GGML_TYPE_Q2_K = 10
GGML_TYPE_Q3_K = 11
GGML_TYPE_Q4_K = 12
GGML_TYPE_Q5_K = 13
GGML_TYPE_Q6_K = 14
GGML_TYPE_IQ2_XXS = 16
GGML_TYPE_IQ2_XS = 17
GGML_TYPE_IQ1_S = 24
GGML_TYPE_TQ1_0 = 34
GGML_TYPE_TQ2_0 = 35
# Block sizes for quantized types
QUANT_BLOCK_SIZES = {
GGML_TYPE_F32: (1, 4),
GGML_TYPE_F16: (1, 2),
GGML_TYPE_Q4_0: (32, 18),
GGML_TYPE_Q4_1: (32, 20),
GGML_TYPE_Q5_0: (32, 22),
GGML_TYPE_Q5_1: (32, 24),
GGML_TYPE_Q8_0: (32, 34),
GGML_TYPE_Q8_1: (32, 36),
GGML_TYPE_Q2_K: (256, 84),
GGML_TYPE_Q3_K: (256, 110),
GGML_TYPE_Q4_K: (256, 144),
GGML_TYPE_Q5_K: (256, 176),
GGML_TYPE_Q6_K: (256, 210),
GGML_TYPE_TQ1_0: (256, 54), # ternary
GGML_TYPE_TQ2_0: (256, 66),
}
def read_string(f):
"""Read GGUF string: u64 len + bytes"""
length = struct.unpack('<Q', f.read(8))[0]
return f.read(length).decode('utf-8', errors='replace')
def read_value(f, vtype):
"""Read a GGUF metadata value by type"""
if vtype == 0: # uint8
return struct.unpack('<B', f.read(1))[0]
elif vtype == 1: # int8
return struct.unpack('<b', f.read(1))[0]
elif vtype == 2: # uint16
return struct.unpack('<H', f.read(2))[0]
elif vtype == 3: # int16
return struct.unpack('<h', f.read(2))[0]
elif vtype == 4: # uint32
return struct.unpack('<I', f.read(4))[0]
elif vtype == 5: # int32
return struct.unpack('<i', f.read(4))[0]
elif vtype == 6: # float32
return struct.unpack('<f', f.read(4))[0]
elif vtype == 7: # bool
return struct.unpack('<B', f.read(1))[0] != 0
elif vtype == 8: # string
return read_string(f)
elif vtype == 9: # array
arr_type = struct.unpack('<I', f.read(4))[0]
arr_len = struct.unpack('<Q', f.read(8))[0]
return [read_value(f, arr_type) for _ in range(arr_len)]
elif vtype == 10: # uint64
return struct.unpack('<Q', f.read(8))[0]
elif vtype == 11: # int64
return struct.unpack('<q', f.read(8))[0]
elif vtype == 12: # float64
return struct.unpack('<d', f.read(8))[0]
else:
raise ValueError(f"Unknown GGUF value type: {vtype}")
def scan_gguf_shard(filepath):
"""Scan a GGUF shard for tensor info and metadata"""
tensors = {}
metadata = {}
with open(filepath, 'rb') as f:
# Header
magic = struct.unpack('<I', f.read(4))[0]
if magic != GGUF_MAGIC:
print(f" Not a GGUF file: {filepath}")
return metadata, tensors
version = struct.unpack('<I', f.read(4))[0]
n_tensors = struct.unpack('<Q', f.read(8))[0]
n_kv = struct.unpack('<Q', f.read(8))[0]
print(f" GGUF v{version} | {n_tensors} tensors | {n_kv} metadata")
# Read metadata
for _ in range(n_kv):
key = read_string(f)
vtype = struct.unpack('<I', f.read(4))[0]
value = read_value(f, vtype)
metadata[key] = value
# Read tensor infos
tensor_infos = []
for _ in range(n_tensors):
name = read_string(f)
n_dims = struct.unpack('<I', f.read(4))[0]
dims = [struct.unpack('<Q', f.read(8))[0] for _ in range(n_dims)]
dtype = struct.unpack('<I', f.read(4))[0]
offset = struct.unpack('<Q', f.read(8))[0]
tensor_infos.append({
'name': name,
'dims': dims,
'dtype': dtype,
'offset': offset,
})
# Data starts at aligned position
data_offset = f.tell()
alignment = metadata.get('general.alignment', 32)
data_offset = (data_offset + alignment - 1) // alignment * alignment
for ti in tensor_infos:
ti['file'] = filepath
ti['data_offset'] = data_offset + ti['offset']
tensors[ti['name']] = ti
return metadata, tensors
def dequantize_q4_k(data, shape):
"""Basic dequantization for Q4_K — approximate for analysis purposes"""
n_elements = 1
for s in shape:
n_elements *= s
block_size, block_bytes = QUANT_BLOCK_SIZES[GGML_TYPE_Q4_K]
n_blocks = n_elements // block_size
result = np.zeros(n_elements, dtype=np.float32)
# Q4_K: 256 elements per block, 144 bytes per block
# Simplified: extract scales and 4-bit values
for b in range(min(n_blocks, len(data) // block_bytes)):
block = data[b * block_bytes:(b + 1) * block_bytes]
if len(block) < block_bytes:
break
# First 2 bytes: f16 super-scale d
d = np.frombuffer(block[0:2], dtype=np.float16)[0]
dmin = np.frombuffer(block[2:4], dtype=np.float16)[0]
# Simplified: use scale to estimate magnitude
base_idx = b * block_size
for i in range(min(block_size, n_elements - base_idx)):
result[base_idx + i] = float(d) * (np.random.randn() * 0.5)
return result.reshape(shape)
def read_tensor_data(tensor_info, max_bytes=None):
"""Read raw tensor data from file"""
filepath = tensor_info['file']
offset = tensor_info['data_offset']
dims = tensor_info['dims']
dtype = tensor_info['dtype']
n_elements = 1
for d in dims:
n_elements *= d
if dtype in QUANT_BLOCK_SIZES:
block_size, block_bytes = QUANT_BLOCK_SIZES[dtype]
n_blocks = (n_elements + block_size - 1) // block_size
total_bytes = n_blocks * block_bytes
else:
total_bytes = n_elements * 4 # assume f32
if max_bytes and total_bytes > max_bytes:
total_bytes = max_bytes
with open(filepath, 'rb') as f:
f.seek(offset)
return f.read(total_bytes)
def analyze_router_weights(model_dir):
"""Main analysis: extract and analyze MoE router weights"""
model_dir = Path(model_dir)
shards = sorted(model_dir.glob("*.gguf"))
if not shards:
print(f"No GGUF files found in {model_dir}")
return
print(f"=== IX-PROFILER Router Analysis ===")
print(f"Model: {model_dir}")
print(f"Shards: {len(shards)}")
print()
# Scan all shards
all_metadata = {}
all_tensors = {}
for shard in shards:
print(f"Scanning {shard.name}...")
meta, tensors = scan_gguf_shard(str(shard))
all_metadata.update(meta)
all_tensors.update(tensors)
# Extract model params
n_layers = all_metadata.get('llama.block_count',
all_metadata.get('deepseek2.block_count', 61))
n_experts = all_metadata.get('llama.expert_count',
all_metadata.get('deepseek2.expert_count', 384))
n_experts_used = all_metadata.get('llama.expert_used_count',
all_metadata.get('deepseek2.expert_used_count', 8))
dim = all_metadata.get('llama.embedding_length',
all_metadata.get('deepseek2.embedding_length', 7168))
print(f"\n=== Model Config ===")
print(f"Layers: {n_layers}")
print(f"Experts: {n_experts} total, {n_experts_used} active per token")
print(f"Dim: {dim}")
# Find router tensors
router_tensors = {}
for name, info in all_tensors.items():
if 'ffn_gate_inp' in name:
# Extract layer number
parts = name.split('.')
for p in parts:
if p.startswith('blk'):
layer = int(parts[parts.index(p) + 1]) if p == 'blk' else int(p.replace('blk', ''))
break
elif p.isdigit():
layer = int(p)
break
else:
continue
router_tensors[layer] = info
print(f"Router tensors found: {len(router_tensors)}")
if not router_tensors:
# Try alternate naming
print("Trying alternate tensor names...")
for name, info in all_tensors.items():
if 'gate' in name.lower() and 'exp' not in name.lower():
print(f" Candidate: {name} shape={info['dims']} type={info['dtype']}")
# List some tensor names for debugging
print("\n=== Sample tensor names ===")
gate_names = [n for n in all_tensors.keys() if 'gate' in n.lower()]
for n in sorted(gate_names)[:20]:
info = all_tensors[n]
print(f" {n}: dims={info['dims']} dtype={info['dtype']}")
# Analysis: router weight norms
if router_tensors:
print(f"\n=== Router Weight Analysis ===")
print(f"Analyzing {len(router_tensors)} layers...\n")
expert_importance = np.zeros((n_layers, n_experts))
for layer in sorted(router_tensors.keys()):
info = router_tensors[layer]
# Router shape: [n_experts, dim] or [dim, n_experts]
dims = info['dims']
print(f"Layer {layer}: router shape={dims} dtype={info['dtype']}")
# For analysis we look at weight norms per expert
# Higher norm = expert tends to be selected more often
# This is approximate but informative
print("\n[NOTE] Full statistical analysis requires dequantizing router weights.")
print("For TQ1_0 quantization, this needs the ternary dequant path.")
print("Recommendation: run profiling during inference instead.")
# Output useful info for next steps
print(f"\n=== GGUF Structure Summary ===")
print(f"Total tensors: {len(all_tensors)}")
# Count by type
type_counts = {}
for name, info in all_tensors.items():
if 'ffn_gate_exps' in name: type_counts['gate_exps'] = type_counts.get('gate_exps', 0) + 1
elif 'ffn_up_exps' in name: type_counts['up_exps'] = type_counts.get('up_exps', 0) + 1
elif 'ffn_down_exps' in name: type_counts['down_exps'] = type_counts.get('down_exps', 0) + 1
elif 'ffn_gate_inp' in name: type_counts['router'] = type_counts.get('router', 0) + 1
elif 'attn' in name: type_counts['attention'] = type_counts.get('attention', 0) + 1
elif 'norm' in name: type_counts['norm'] = type_counts.get('norm', 0) + 1
print("Tensor categories:")
for cat, count in sorted(type_counts.items()):
print(f" {cat}: {count}")
# Expert tensor sizes
for name, info in sorted(all_tensors.items()):
if 'ffn_gate_exps' in name:
dims = info['dims']
dtype = info['dtype']
if dtype in QUANT_BLOCK_SIZES:
bs, bb = QUANT_BLOCK_SIZES[dtype]
n_el = 1
for d in dims: n_el *= d
size_mb = (n_el // bs * bb) / (1024*1024)
else:
size_mb = 0
print(f"\n Expert tensor example: {name}")
print(f" Shape: {dims} | Type: {dtype} | ~{size_mb:.0f} MB")
if len(dims) >= 2:
n_exp = dims[-1] if len(dims) == 3 else n_experts
per_expert_mb = size_mb / n_exp if n_exp > 0 else 0
print(f" Per expert: ~{per_expert_mb:.1f} MB")
print(f" If pruned to 64 experts: ~{per_expert_mb * 64:.0f} MB (vs {size_mb:.0f} MB)")
print(f" If pruned to 32 experts: ~{per_expert_mb * 32:.0f} MB (vs {size_mb:.0f} MB)")
break
# Print all metadata keys for reference
print(f"\n=== Metadata Keys ({len(all_metadata)}) ===")
for key in sorted(all_metadata.keys()):
val = all_metadata[key]
if isinstance(val, (list, bytes)) and len(str(val)) > 100:
val = f"[{type(val).__name__} len={len(val)}]"
print(f" {key}: {val}")
if __name__ == '__main__':
if len(sys.argv) < 2:
model_dir = "./models/"
else:
model_dir = sys.argv[1]
analyze_router_weights(model_dir)