# organ-architecture/kimi_z_stream.py  (425 lines, 15 KiB, Python)
#!/usr/bin/env python3
"""
kimi_z_stream.py — Stream Z-measure for Kimi K2.5 1T
Downloads each shard, measures Z for every tensor, deletes shard.
Final output: z_report_kimi_k25.json (few KB)
"""
import struct, os, sys, json, time, math, shutil
import numpy as np
# --- Configuration ---
REPO = "unsloth/Kimi-K2.5-GGUF"    # HF repo holding the GGUF shards
QUANT = "Q4_0"                     # quantization variant to stream
N_SHARDS = 13                      # number of shards for this quant
SHARD_DIR = "/mnt/data/kimi-k25/streaming"    # scratch dir; each shard is deleted after measuring
OUTPUT = "/mnt/data/organ-architecture/z_report_kimi_k25.json"    # incremental JSON report (also resume checkpoint)
LOG = "/tmp/kimi_z_stream.log"     # file mirror of the console log
os.makedirs(SHARD_DIR, exist_ok=True)
os.environ['HF_HUB_ENABLE_HF_TRANSFER'] = '1'  # enable hf_transfer fast downloads
def log(msg):
    """Print *msg* prefixed with an HH:MM:SS timestamp and append it to LOG."""
    stamped = f"[{time.strftime('%H:%M:%S')}] {msg}"
    print(stamped, flush=True)
    with open(LOG, 'a') as fh:
        fh.write(stamped + "\n")
# GGUF tensor-dtype enum id -> human-readable name (used for reporting only).
# NOTE(review): these ids track ggml's GGML_TYPE_* enum, which has shifted
# across ggml versions — confirm the exotic entries (26+) against the ggml
# release that produced the shards.
GGML_TYPES = {0:'F32',1:'F16',2:'Q4_0',3:'Q4_1',6:'Q5_0',7:'Q5_1',8:'Q8_0',
    10:'Q2_K',11:'Q3_K',12:'Q4_K',13:'Q5_K',14:'Q6_K',15:'Q8_K',
    16:'IQ2_XXS',17:'IQ2_XS',18:'IQ3_XXS',19:'IQ1_S',20:'IQ4_NL',
    26:'Q4_0_4_4',27:'Q4_0_4_8',28:'Q4_0_8_8',29:'TQ1_0',30:'TQ2_0'}
# (elements_per_block, bytes_per_block) for every dtype whose on-disk size
# we know how to compute; tensors of other dtypes are skipped.
BLOCK_SIZES = {
    0: (1, 4), # F32: 1 element per block, 4 bytes
    1: (1, 2), # F16: 1 element per block, 2 bytes
    2: (32, 18), # Q4_0: 32 elements per block, 18 bytes (2 byte scale + 16 byte quants)
    3: (32, 20), # Q4_1: 32 elements, 20 bytes
    8: (32, 34), # Q8_0: 32 elements, 34 bytes (2 byte scale + 32 byte quants)
    12: (256, 144),# Q4_K: 256 elements, 144 bytes
    13: (256, 176),# Q5_K: 256 elements, 176 bytes
    14: (256, 210),# Q6_K: 256 elements, 210 bytes
}
def dequant_q4_0(data, n_elements):
    """Dequantize ggml Q4_0 data to a float32 array of length n_elements.

    Q4_0 block layout (18 bytes per 32 elements): a float16 scale d followed
    by 16 bytes of packed 4-bit quants. Per the ggml reference
    (dequantize_row_q4_0), the LOW nibbles decode to elements 0..15 of the
    block and the HIGH nibbles to elements 16..31 — the previous version
    interleaved them, which scrambled element order. Each quant q maps to
    d * (q - 8).

    Any tail past the last full block (n_elements % 32) is left as 0.0,
    matching the original behavior.
    """
    block_size = 32
    n_blocks = n_elements // block_size
    raw = np.frombuffer(data[:n_blocks * 18], dtype=np.uint8).reshape(n_blocks, 18)
    # Bytes 0..1 of each block: float16 scale (copy -> contiguous for .view).
    scales = raw[:, :2].copy().view(np.float16).astype(np.float32)  # (n_blocks, 1)
    # Widen before subtracting so (q - 8) cannot wrap in uint8 arithmetic.
    quants = raw[:, 2:].astype(np.int16)
    lo = (quants & 0x0F) - 8   # elements 0..15 of each block
    hi = (quants >> 4) - 8     # elements 16..31 of each block
    out = np.empty((n_blocks, block_size), dtype=np.float32)
    out[:, :16] = scales * lo
    out[:, 16:] = scales * hi
    result = np.zeros(n_elements, dtype=np.float32)
    result[:n_blocks * block_size] = out.reshape(-1)
    return result
def fast_z_measure(data, dtype, n_elements):
"""
Compute Z-angle (theta) for a tensor.
Uses statistical properties of the raw quantized data.
theta = arccos(correlation_with_unit_reference)
For pure signal: theta -> 90 degrees
"""
try:
if dtype == 0: # F32
vals = np.frombuffer(data[:n_elements*4], dtype=np.float32)
elif dtype == 1: # F16
vals = np.frombuffer(data[:n_elements*2], dtype=np.float16).astype(np.float32)
elif dtype == 8: # Q8_0
# Extract scales for quick measurement
block_size = 32
n_blocks = n_elements // block_size
scales = np.zeros(n_blocks, dtype=np.float32)
offset = 0
for b in range(min(n_blocks, 10000)): # Sample up to 10K blocks
scales[b] = np.frombuffer(data[offset:offset+2], dtype=np.float16)[0]
offset += 34
vals = scales[:min(n_blocks, 10000)]
elif dtype == 2: # Q4_0
# Extract scales for quick measurement
block_size = 32
n_blocks = n_elements // block_size
n_sample = min(n_blocks, 50000)
scales = np.zeros(n_sample, dtype=np.float32)
offset = 0
for b in range(n_sample):
scales[b] = np.frombuffer(data[offset:offset+2], dtype=np.float16)[0]
offset += 18
vals = scales
elif dtype in (12, 13, 14): # Q4_K, Q5_K, Q6_K
# Extract super-block scales
if dtype == 12:
block_bytes = 144
elif dtype == 13:
block_bytes = 176
else:
block_bytes = 210
n_blocks = n_elements // 256
n_sample = min(n_blocks, 50000)
scales = np.zeros(n_sample, dtype=np.float32)
offset = 0
for b in range(n_sample):
scales[b] = np.frombuffer(data[offset:offset+2], dtype=np.float16)[0]
offset += block_bytes
vals = scales
else:
return None, f"unsupported_dtype_{dtype}"
if len(vals) < 10:
return None, "too_few_values"
# Remove zeros and infinities
vals = vals[np.isfinite(vals)]
if len(vals) < 10:
return None, "too_few_finite"
# theta = arccos(|correlation with linear reference|)
# Pure signal -> decorrelated -> theta near 90
# Noise/bias -> correlated with something simple -> theta near 0
n = len(vals)
ref = np.linspace(-1, 1, n)
# Normalize
vals_norm = vals - np.mean(vals)
ref_norm = ref - np.mean(ref)
std_v = np.std(vals_norm)
std_r = np.std(ref_norm)
if std_v < 1e-10 or std_r < 1e-10:
return 0.0, "constant"
corr = np.dot(vals_norm, ref_norm) / (n * std_v * std_r)
corr = max(-1.0, min(1.0, corr))
theta = math.degrees(math.acos(abs(corr)))
return theta, "ok"
except Exception as e:
return None, str(e)
def read_string(f):
    """Read a GGUF string: a little-endian uint64 length followed by that
    many UTF-8 bytes (undecodable sequences are replaced, not raised)."""
    (length,) = struct.unpack('<Q', f.read(8))
    return f.read(length).decode('utf-8', errors='replace')
# GGUF scalar metadata value types -> little-endian struct format.
_KV_SCALAR_FMT = {
    0: '<B',   # uint8
    1: '<b',   # int8
    2: '<H',   # uint16
    3: '<h',   # int16
    4: '<I',   # uint32
    5: '<i',   # int32
    6: '<f',   # float32
    7: '<B',   # bool, stored as one byte (returned as int, as before)
    10: '<Q',  # uint64
    11: '<q',  # int64 — WAS MISSING: returning None without consuming the
               # 8 bytes desynchronized the rest of the KV/tensor parse
    12: '<d',  # float64
}
def read_kv_value(f, vtype):
    """Read one GGUF metadata value of type `vtype` from file object f.

    Strings (8) are delegated to read_string; arrays (9) recurse on their
    element type. Unknown types return None WITHOUT consuming any bytes,
    which desynchronizes the stream — callers should treat that as fatal.
    """
    if vtype == 8:
        return read_string(f)
    if vtype == 9:
        atype = struct.unpack('<I', f.read(4))[0]
        alen = struct.unpack('<Q', f.read(8))[0]
        return [read_kv_value(f, atype) for _ in range(alen)]
    fmt = _KV_SCALAR_FMT.get(vtype)
    if fmt is None:
        return None
    return struct.unpack(fmt, f.read(struct.calcsize(fmt)))[0]
def _tensor_row(ti, theta, status):
    """Build one per-tensor result record (theta already computed, or None)."""
    dtype = ti['dtype']
    return {
        'name': ti['name'],
        'dims': ti['dims'],
        'dtype': GGML_TYPES.get(dtype, f'unk_{dtype}'),
        'n_elements': ti['n_elements'],
        'theta': round(theta, 2) if theta is not None else None,
        'status': status,
    }
def process_shard(shard_path, shard_idx):
    """Parse one GGUF shard and Z-measure every tensor it contains.

    Returns a list of result dicts (see _tensor_row). Per-tensor problems
    (unknown dtype, truncated data, measurement failure) are recorded in the
    'status' field rather than raised.
    """
    results = []
    # Context manager: the original opened the file manually and leaked the
    # handle on the early bad-magic return and on any parse exception.
    with open(shard_path, 'rb') as f:
        magic = f.read(4)
        if magic != b'GGUF':
            log(f" ERROR: Not GGUF (magic={magic})")
            return results
        version = struct.unpack('<I', f.read(4))[0]  # consumed; value unused
        n_tensors = struct.unpack('<Q', f.read(8))[0]
        n_kv = struct.unpack('<Q', f.read(8))[0]
        log(f" Shard {shard_idx}: {n_tensors} tensors, {n_kv} KV pairs")
        # Skip the metadata KV section — keys and values must still be
        # consumed so the stream stays in sync.
        for _ in range(n_kv):
            read_string(f)  # key, discarded
            vtype = struct.unpack('<I', f.read(4))[0]
            read_kv_value(f, vtype)  # value, discarded
        # Tensor descriptors: name, shape, dtype id, offset into data section.
        tensor_infos = []
        for _ in range(n_tensors):
            name = read_string(f)
            n_dims = struct.unpack('<I', f.read(4))[0]
            dims = [struct.unpack('<Q', f.read(8))[0] for _ in range(n_dims)]
            dtype = struct.unpack('<I', f.read(4))[0]
            offset = struct.unpack('<Q', f.read(8))[0]
            n_elements = 1
            for d in dims:
                n_elements *= d
            tensor_infos.append({
                'name': name, 'dims': dims, 'dtype': dtype,
                'offset': offset, 'n_elements': n_elements
            })
        # Tensor data starts at the next 32-byte boundary after the header.
        # NOTE(review): GGUF allows a general.alignment KV to override the
        # default of 32; KV values are skipped above, so confirm these
        # shards use the default.
        pos = f.tell()
        alignment = 32
        data_start = ((pos + alignment - 1) // alignment) * alignment
        for i, ti in enumerate(tensor_infos):
            dtype = ti['dtype']
            n_elem = ti['n_elements']
            if dtype not in BLOCK_SIZES:
                # Cannot size this tensor's data region; record and move on.
                results.append(_tensor_row(ti, None, f'unknown_block_size_dtype_{dtype}'))
                continue
            elems_per_block, bytes_per_block = BLOCK_SIZES[dtype]
            n_blocks = (n_elem + elems_per_block - 1) // elems_per_block
            data_size = n_blocks * bytes_per_block
            f.seek(data_start + ti['offset'])
            data = f.read(data_size)
            if len(data) < data_size:
                results.append(_tensor_row(ti, None, 'truncated'))
                continue
            theta, status = fast_z_measure(data, dtype, n_elem)
            results.append(_tensor_row(ti, theta, status))
            if (i + 1) % 20 == 0:
                log(f" Measured {i+1}/{n_tensors} tensors")
    return results
def _tensor_group(name):
    """Map a GGUF tensor name to a coarse architectural group for reporting."""
    if 'attn' in name:
        return 'attention'
    if 'ffn' in name and 'shexp' in name:
        # Must precede the generic expert test: 'shexp' contains 'exp', so
        # the original order classified shared experts as moe_experts and
        # the shared_expert branch was unreachable.
        return 'shared_expert'
    if 'ffn' in name and 'exp' in name:
        return 'moe_experts'
    if 'ffn' in name:
        return 'ffn'
    if 'embed' in name or 'embd' in name:
        return 'embed'
    if 'norm' in name:
        return 'norm'
    if 'output' in name and 'attn' not in name:
        return 'output'
    return 'other'
def main():
    """Stream each GGUF shard: download, Z-measure all tensors, checkpoint
    the JSON report, then delete the shard to bound disk usage.

    Progress is checkpointed to OUTPUT after every shard; an existing OUTPUT
    with a 'shards_completed' field lets an interrupted run resume.
    """
    from huggingface_hub import hf_hub_download
    import subprocess
    log("=" * 60)
    log("KIMI K2.5 1T — STREAMING Z-MEASURE")
    log(f"Repo: {REPO}, Quant: {QUANT}, Shards: {N_SHARDS}")
    log("=" * 60)
    all_results = []
    total_start = time.time()
    # Resume: pick up where a previous (interrupted) run checkpointed.
    start_shard = 0
    if os.path.exists(OUTPUT):
        with open(OUTPUT) as f:
            existing = json.load(f)
        if 'shards_completed' in existing:
            start_shard = existing['shards_completed']
            all_results = existing.get('tensors', [])
            log(f"Resuming from shard {start_shard + 1}")
    # Build `report` up front: the original only created it inside the loop,
    # so resuming an already-complete run hit a NameError in the summary.
    report = {
        'model': 'Kimi-K2.5-1T',
        'quant': QUANT,
        'total_shards': N_SHARDS,
        'shards_completed': start_shard,
        'total_tensors': len(all_results),
        'tensors': all_results,
    }
    for shard_idx in range(start_shard, N_SHARDS):
        shard_num = shard_idx + 1
        filename = f"{QUANT}/Kimi-K2.5-{QUANT}-{shard_num:05d}-of-{N_SHARDS:05d}.gguf"
        log(f"\n--- SHARD {shard_num}/{N_SHARDS} ---")
        log("Downloading (unknown)...")
        dl_start = time.time()
        try:
            path = hf_hub_download(
                repo_id=REPO,
                filename=filename,
                local_dir=SHARD_DIR
            )
            dl_time = time.time() - dl_start
            size_gb = os.path.getsize(path) / (1024**3)
            log(f"Downloaded: {size_gb:.1f}GB in {dl_time:.0f}s ({size_gb*1024/dl_time:.0f} MB/s)")
        except Exception as e:
            # Best-effort: skip this shard, keep streaming the rest.
            log(f"DOWNLOAD ERROR: {e}")
            continue
        log("Z-measuring tensors...")
        measure_start = time.time()
        shard_results = process_shard(path, shard_idx)
        measure_time = time.time() - measure_start
        log(f"Measured {len(shard_results)} tensors in {measure_time:.1f}s")
        all_results.extend(shard_results)
        # Checkpoint so a crash costs at most one shard of work.
        report['shards_completed'] = shard_idx + 1
        report['total_tensors'] = len(all_results)
        with open(OUTPUT, 'w') as f:
            json.dump(report, f, indent=2)
        log(f"Saved {len(all_results)} tensor measurements to {OUTPUT}")
        # Free disk before the next multi-GB download.
        os.remove(path)
        log(f"Deleted shard {shard_num}")
        cache_dir = os.path.join(SHARD_DIR, '.cache')
        if os.path.exists(cache_dir):
            shutil.rmtree(cache_dir)
        r = subprocess.run(['df', '-h', '/mnt/data'], capture_output=True, text=True)
        for line in r.stdout.strip().split('\n')[1:]:
            log(f"Disk: {line.strip()}")
    # ---- Final summary ----
    total_time = time.time() - total_start
    thetas = [r['theta'] for r in all_results if r['theta'] is not None]
    groups = {}
    for r in all_results:
        bucket = groups.setdefault(_tensor_group(r['name']), [])
        if r['theta'] is not None:
            bucket.append(r['theta'])
    log(f"\n{'='*60}")
    log("FINAL Z-REPORT — Kimi K2.5 1T")
    log(f"{'='*60}")
    log(f"Total tensors: {len(all_results)}")
    log(f"Total time: {total_time/3600:.1f}h")
    if thetas:  # guard: np.mean([]) is nan with a RuntimeWarning
        log(f"Overall θ: {np.mean(thetas):.1f}° (std={np.std(thetas):.1f}°)")
    log("\nBy group:")
    for g, vals in sorted(groups.items()):
        if vals:
            log(f" {g}: θ={np.mean(vals):.1f}° (n={len(vals)}, std={np.std(vals):.1f}°)")
    report['summary'] = {
        'total_time_hours': round(total_time/3600, 2),
        'overall_theta': round(float(np.mean(thetas)), 2) if thetas else None,
        'overall_std': round(float(np.std(thetas)), 2) if thetas else None,
        'groups': {g: {'theta': round(float(np.mean(v)), 2),
                       'std': round(float(np.std(v)), 2),
                       'count': len(v)}
                   for g, v in groups.items() if v},
    }
    with open(OUTPUT, 'w') as f:
        json.dump(report, f, indent=2)
    log(f"\nFinal report saved to {OUTPUT}")
    log("COMPLETE")
# Script entry point: run the full streaming measurement.
if __name__ == '__main__':
    main()
# ╔══ SALKA ELMADANI AUTHORSHIP CERTIFICATE ══╗
# © Salka Elmadani 2025-2026 — ALL RIGHTS RESERVED
# Licensed under Business Source License 1.1 — https://inference-x.com
# ─────────────────────────────────────────────────────────
# SHA256: cc9658edb88d02924491a2ed20562a282a005413ef963bd0c82613abcfe91693
# SIG-ED25519: AN2P6qd2YhyS6+YRnMu3mmnE9KZbpBlFAxiVzENVXSSbIl2+PL/rbW8pMPrcOS8BwPg88Os7dMOuYnRvL5t4CQ==
# VERIFY: python3 verify_authorship.py kimi_z_stream.py