#!/usr/bin/env python3
"""
Extract skeleton (attention) + organs (FFN) from GGUF models.

The scalpel that opens monoliths.
"""

import argparse
import hashlib
import json
import os
import re
import struct
import sys
from pathlib import Path

# ═══ GGUF FORMAT CONSTANTS ═══
GGUF_MAGIC = 0x46554747  # "GGUF" in little-endian ASCII

# GGUF metadata value types: id -> (name, size in bytes; -1 = variable-length)
GGUF_TYPE_MAP = {
    0: ('uint8', 1), 1: ('int8', 1), 2: ('uint16', 2), 3: ('int16', 2),
    4: ('uint32', 4), 5: ('int32', 4), 6: ('float32', 4), 7: ('bool', 1),
    8: ('string', -1), 9: ('array', -1), 10: ('uint64', 8), 11: ('int64', 8),
    12: ('float64', 8),
}

# ggml tensor types: id -> bytes per element. Quantized types are fractional
# because they pack fixed-size blocks of elements; a value of 0 means the
# block layout is not handled here, so those tensors are indexed but skipped.
GGML_TYPE_SIZE = {
    0: 4,           # F32
    1: 2,           # F16
    2: 0.5625,      # Q4_0 (18 bytes per 32 elements)
    3: 0.625,       # Q4_1 (20 bytes per 32 elements)
    6: 0.6875,      # Q5_0 (22 bytes per 32 elements)
    7: 0.75,        # Q5_1 (24 bytes per 32 elements)
    8: 1.0625,      # Q8_0 (34 bytes per 32 elements)
    9: 1.125,       # Q8_1 (36 bytes per 32 elements)
    10: 0.328125,   # Q2_K (84 bytes per 256 elements)
    11: 0.4296875,  # Q3_K (110 bytes per 256 elements)
    12: 0.5625,     # Q4_K (144 bytes per 256 elements)
    13: 0.6875,     # Q5_K (176 bytes per 256 elements)
    14: 0.8203125,  # Q6_K (210 bytes per 256 elements)
    15: 1.140625,   # Q8_K (292 bytes per 256 elements)
    16: 0,          # IQ2_XXS
    17: 0,          # IQ2_XS
    18: 0,          # IQ3_XXS
    19: 0,          # IQ1_S
    20: 0,          # IQ4_NL
    21: 0,          # IQ3_S
    22: 0,          # IQ2_S
    23: 0,          # IQ4_XS
    28: 8,          # F64
    29: 0,          # IQ1_M
    30: 2,          # BF16
    31: 0.5625,     # Q4_0_4_4 (repacked Q4_0, same size)
    32: 0.5625,     # Q4_0_4_8
    33: 0.5625,     # Q4_0_8_8
}
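
# Worked example: a 4096 x 4096 tensor has 16,777,216 elements. Stored as
# Q4_0 (18 bytes per 32-element block) that is
#   16,777,216 * 0.5625 = 9,437,184 bytes = 9.0 MiB,
# versus 64 MiB for the same tensor in F32. These per-element multipliers
# are exactly what _read_header below uses to size each tensor.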


class GGUFReader:
    """Read GGUF file structure without loading full tensors into memory."""

    def __init__(self, path):
        self.path = path
        self.f = open(path, 'rb')
        self.metadata = {}
        self.tensors = []
        self._read_header()

    def _read_u32(self):
        return struct.unpack('<I', self.f.read(4))[0]

    def _read_u64(self):
        return struct.unpack('<Q', self.f.read(8))[0]

    def _read_i32(self):
        return struct.unpack('<i', self.f.read(4))[0]

    def _read_f32(self):
        return struct.unpack('<f', self.f.read(4))[0]

    def _read_string(self):
        # GGUF strings are u64 length-prefixed, UTF-8 encoded
        length = self._read_u64()
        return self.f.read(length).decode('utf-8', errors='replace')

    def _read_value(self, vtype):
        if vtype == 0: return struct.unpack('<B', self.f.read(1))[0]
        elif vtype == 1: return struct.unpack('<b', self.f.read(1))[0]
        elif vtype == 2: return struct.unpack('<H', self.f.read(2))[0]
        elif vtype == 3: return struct.unpack('<h', self.f.read(2))[0]
        elif vtype == 4: return self._read_u32()
        elif vtype == 5: return self._read_i32()
        elif vtype == 6: return self._read_f32()
        elif vtype == 7: return bool(struct.unpack('<B', self.f.read(1))[0])
        elif vtype == 8: return self._read_string()
        elif vtype == 9:
            # Arrays are homogeneous: element type, then count, then values
            arr_type = self._read_u32()
            arr_len = self._read_u64()
            return [self._read_value(arr_type) for _ in range(arr_len)]
        elif vtype == 10: return self._read_u64()
        elif vtype == 11: return struct.unpack('<q', self.f.read(8))[0]
        elif vtype == 12: return struct.unpack('<d', self.f.read(8))[0]
        return None
    def _read_header(self):
        magic = self._read_u32()
        if magic != GGUF_MAGIC:
            raise ValueError(f"Not a GGUF file: magic={hex(magic)}")

        version = self._read_u32()
        n_tensors = self._read_u64()
        n_metadata = self._read_u64()

        self.metadata['_gguf_version'] = version
        self.metadata['_n_tensors'] = n_tensors
        self.metadata['_n_metadata'] = n_metadata

        # Read metadata key-value pairs
        for _ in range(n_metadata):
            key = self._read_string()
            vtype = self._read_u32()
            value = self._read_value(vtype)
            self.metadata[key] = value

        # Read tensor info records (name, shape, type, offset into data blob)
        for _ in range(n_tensors):
            name = self._read_string()
            n_dims = self._read_u32()
            dims = [self._read_u64() for _ in range(n_dims)]
            dtype = self._read_u32()
            offset = self._read_u64()

            # Byte size = element count * per-element size for the ggml type
            n_elements = 1
            for d in dims:
                n_elements *= d
            type_size = GGML_TYPE_SIZE.get(dtype, 0)
            byte_size = int(n_elements * type_size) if type_size > 0 else 0

            self.tensors.append({
                'name': name,
                'dims': dims,
                'dtype': dtype,
                'offset': offset,
                'n_elements': n_elements,
                'byte_size': byte_size,
            })

        # Record data start position, padded to the file's alignment
        # (general.alignment if present, else the GGUF default of 32 bytes)
        alignment = self.metadata.get('general.alignment', 32)
        pos = self.f.tell()
        self.data_offset = pos + (alignment - pos % alignment) % alignment

    def read_tensor_data(self, tensor):
        """Read raw tensor data from file."""
        self.f.seek(self.data_offset + tensor['offset'])
        return self.f.read(tensor['byte_size'])

    def close(self):
        self.f.close()
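

# Minimal usage sketch (the path shown is illustrative, not shipped here):
#
#   reader = GGUFReader('model.gguf')
#   print(reader.metadata.get('general.architecture'))
#   for t in reader.tensors[:5]:
#       print(t['name'], t['dims'], t['byte_size'])
#   reader.close()
#
# Only the header and tensor table are parsed up front; tensor payloads
# stay on disk until read_tensor_data() is called.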


# ═══ ORGAN CLASSIFICATION ═══

def classify_tensor(name):
    """
    Classify a tensor into an organ type.

    Skeleton = attention (thought structure)
    Organ = FFN (knowledge/memory)
    Adapter = LoRA weights (personality)
    Embed = embedding/output layers (shared foundation)
    Norm = normalization layers (connective tissue)

    Check order matters: adapter, embedding, and norm names often contain
    attention/FFN substrings (e.g. 'attn_norm', 'ffn_norm'), so the more
    specific checks run first.
    """
    name_lower = name.lower()

    # LoRA — adapter/personality (checked first: LoRA tensor names usually
    # embed the base tensor name, e.g. 'blk.0.attn_q.weight.lora_a')
    if any(k in name_lower for k in ['lora_', 'adapter']):
        return 'adapter'

    # Embedding layers — foundation
    if any(k in name_lower for k in ['token_embd', 'embed_tokens', 'wte', 'word_embeddings']):
        return 'embed'

    # Output layers — foundation
    if any(k in name_lower for k in ['output.weight', 'lm_head', 'output_norm']):
        return 'embed'

    # Normalization — connective tissue (before attention/FFN so that
    # 'attn_norm' / 'ffn_norm' land here, not in skeleton/organs)
    if any(k in name_lower for k in ['norm', 'ln_', 'layer_norm', 'rms_norm', 'input_layernorm', 'post_attention']):
        return 'norm'

    # MoE expert layers — specialized organs (before the generic FFN check
    # so expert tensors such as 'ffn_gate_exps' are not swallowed by 'ffn_')
    if any(k in name_lower for k in ['expert', 'exps', 'moe', 'ffn_gate_inp']):
        return 'organ_expert'

    # Attention layers — skeleton (thought)
    if any(k in name_lower for k in ['attn', 'self_attn', 'attention', '.q_proj', '.k_proj', '.v_proj', '.o_proj',
                                     'attn_q', 'attn_k', 'attn_v', 'attn_output',
                                     'query_key_value', 'c_attn', 'c_proj']):
        return 'skeleton'

    # FFN layers — organs (knowledge)
    if any(k in name_lower for k in ['ffn_', 'feed_forward', 'mlp', 'gate_proj', 'up_proj', 'down_proj',
                                     'fc1', 'fc2', 'c_fc', 'w1', 'w2', 'w3',
                                     'ffn_gate', 'ffn_up', 'ffn_down', 'intermediate']):
        return 'organ'

    return 'unknown'


def get_layer_number(name):
    """Extract the layer number from a tensor name, or -1 if none is found."""
    match = re.search(r'(?:layers?|blk|block|h)[\._](\d+)', name, re.IGNORECASE)
    if match:
        return int(match.group(1))
    return -1
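

# Illustrative expectations for common llama.cpp tensor names (comments
# only; not executed at import time):
#
#   classify_tensor('blk.17.attn_q.weight')    -> 'skeleton'
#   classify_tensor('blk.17.ffn_down.weight')  -> 'organ'
#   classify_tensor('blk.17.attn_norm.weight') -> 'norm'
#   classify_tensor('token_embd.weight')       -> 'embed'
#   get_layer_number('blk.17.ffn_down.weight') -> 17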


# ═══ EXTRACTION ═══

def extract_organs(model_path, output_dir, verbose=False):
    """
    Extract a GGUF model into its constituent organs.

    Output structure:
        output_dir/
            manifest.json — Complete map of the model's anatomy
            skeleton/     — Attention tensors (thought)
            organs/       — FFN tensors by layer (knowledge)
            embed/        — Embedding + output (foundation)
            norm/         — Normalization (connective tissue)
            adapters/     — LoRA if present (personality)
    """
    print(f"[ORGAN] Opening {model_path}")
    reader = GGUFReader(model_path)

    model_name = os.path.basename(model_path).replace('.gguf', '')
    arch = reader.metadata.get('general.architecture', 'unknown')
    n_layers = reader.metadata.get(f'{arch}.block_count', 0)
    n_heads = reader.metadata.get(f'{arch}.attention.head_count', 0)
    n_embed = reader.metadata.get(f'{arch}.embedding_length', 0)
    # Fall back to counting the tokenizer's token list when vocab_size is absent
    vocab_size = reader.metadata.get(f'{arch}.vocab_size',
                                     reader.metadata.get('tokenizer.ggml.tokens', []))
    if isinstance(vocab_size, list):
        vocab_size = len(vocab_size)

    print(f"[ORGAN] Architecture: {arch}")
    print(f"[ORGAN] Layers: {n_layers}, Heads: {n_heads}, Embed: {n_embed}, Vocab: {vocab_size}")
    print(f"[ORGAN] Tensors: {len(reader.tensors)}")

    # Create output directories
    out = Path(output_dir)
    for d in ['skeleton', 'organs', 'embed', 'norm', 'adapters', 'unknown']:
        (out / d).mkdir(parents=True, exist_ok=True)

    # Classify and extract
    manifest = {
        'model': model_name,
        'architecture': arch,
        'n_layers': n_layers,
        'n_heads': n_heads,
        'n_embed': n_embed,
        'vocab_size': vocab_size,
        'metadata': {k: v for k, v in reader.metadata.items()
                     if isinstance(v, (str, int, float, bool))},
        'organs': {},
        'stats': {
            'skeleton_bytes': 0,
            'organ_bytes': 0,
            'embed_bytes': 0,
            'norm_bytes': 0,
            'adapter_bytes': 0,
            'unknown_bytes': 0,
            'total_bytes': 0,
            'skeleton_count': 0,
            'organ_count': 0,
        },
    }

    # Process each tensor
    for i, tensor in enumerate(reader.tensors):
        organ_type = classify_tensor(tensor['name'])
        layer_num = get_layer_number(tensor['name'])

        # Map organ type to output subdirectory
        if organ_type == 'skeleton':
            subdir = 'skeleton'
        elif organ_type in ('organ', 'organ_expert'):
            subdir = 'organs'
        elif organ_type == 'embed':
            subdir = 'embed'
        elif organ_type == 'norm':
            subdir = 'norm'
        elif organ_type == 'adapter':
            subdir = 'adapters'
        else:
            subdir = 'unknown'

        # Safe filename
        safe_name = tensor['name'].replace('/', '_').replace('.', '_')
        filename = f"{safe_name}.bin"
        filepath = out / subdir / filename

        # Read and write tensor data (empty for tensor types whose block
        # layout is unknown; those are indexed but not extracted)
        data = reader.read_tensor_data(tensor)
        if data:
            with open(filepath, 'wb') as f:
                # Header: name_len(u32) + name + dims_count(u32) + dims(u64[]) + dtype(u32)
                # (see load_organ below for the inverse)
                name_bytes = tensor['name'].encode('utf-8')
                f.write(struct.pack('<I', len(name_bytes)))
                f.write(name_bytes)
                f.write(struct.pack('<I', len(tensor['dims'])))
                for d in tensor['dims']:
                    f.write(struct.pack('<Q', d))
                f.write(struct.pack('<I', tensor['dtype']))
                f.write(data)

        # Update manifest (no file or hash is recorded when nothing was written)
        entry = {
            'name': tensor['name'],
            'type': organ_type,
            'layer': layer_num,
            'dims': tensor['dims'],
            'dtype': tensor['dtype'],
            'n_elements': tensor['n_elements'],
            'byte_size': tensor['byte_size'],
            'file': f"{subdir}/{filename}" if data else None,
            'hash': hashlib.sha256(data).hexdigest()[:16] if data else None,
        }

        key = f"{subdir}/{safe_name}"
        manifest['organs'][key] = entry

        # Stats ('organ_expert' folds into 'organ_bytes')
        stat_key = f"{organ_type.split('_')[0]}_bytes"
        if stat_key in manifest['stats']:
            manifest['stats'][stat_key] += tensor['byte_size']
        else:
            manifest['stats']['unknown_bytes'] += tensor['byte_size']

        manifest['stats']['total_bytes'] += tensor['byte_size']

        if organ_type == 'skeleton':
            manifest['stats']['skeleton_count'] += 1
        elif organ_type in ('organ', 'organ_expert'):
            manifest['stats']['organ_count'] += 1

        if verbose or (i + 1) % 50 == 0:
            print(f"  [{i+1}/{len(reader.tensors)}] {organ_type:12s} L{layer_num:3d} {tensor['name'][:60]}")

    reader.close()

    # Write manifest
    manifest_path = out / 'manifest.json'
    with open(manifest_path, 'w') as f:
        json.dump(manifest, f, indent=2, default=str)

    # Summary
    stats = manifest['stats']
    total_mb = stats['total_bytes'] / (1024 * 1024)
    skel_mb = stats['skeleton_bytes'] / (1024 * 1024)
    organ_mb = stats['organ_bytes'] / (1024 * 1024)
    embed_mb = stats['embed_bytes'] / (1024 * 1024)
    norm_mb = stats['norm_bytes'] / (1024 * 1024)

    skel_pct = (stats['skeleton_bytes'] / stats['total_bytes'] * 100) if stats['total_bytes'] > 0 else 0
    organ_pct = (stats['organ_bytes'] / stats['total_bytes'] * 100) if stats['total_bytes'] > 0 else 0

    print(f"\n{'='*60}")
    print(f"  ORGAN EXTRACTION COMPLETE — {model_name}")
    print(f"{'='*60}")
    print(f"  Skeleton (Attention) : {skel_mb:8.1f} MB ({skel_pct:.1f}%) — {stats['skeleton_count']} tensors")
    print(f"  Organs (FFN)         : {organ_mb:8.1f} MB ({organ_pct:.1f}%) — {stats['organ_count']} tensors")
    print(f"  Embedding            : {embed_mb:8.1f} MB")
    print(f"  Normalization        : {norm_mb:8.1f} MB")
    print(f"  Total                : {total_mb:8.1f} MB")
    print(f"  Output               : {output_dir}")
    print(f"  Manifest             : {manifest_path}")
    print(f"{'='*60}")

    return manifest
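

# A minimal sketch of the inverse operation: reading one extracted .bin
# organ file back. It mirrors the header written in extract_organs
# (name_len u32, name, dims_count u32, dims u64[], dtype u32, raw data).
# `load_organ` is an assumed helper name, not part of the original tool.
def load_organ(path):
    """Read one organ file; return (name, dims, dtype, raw payload bytes)."""
    with open(path, 'rb') as f:
        name_len = struct.unpack('<I', f.read(4))[0]
        name = f.read(name_len).decode('utf-8')
        n_dims = struct.unpack('<I', f.read(4))[0]
        dims = [struct.unpack('<Q', f.read(8))[0] for _ in range(n_dims)]
        dtype = struct.unpack('<I', f.read(4))[0]
        data = f.read()  # the rest of the file is the raw tensor payload
    return name, dims, dtype, data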


# ═══ MAIN ═══

def main():
    parser = argparse.ArgumentParser(
        description='Extract skeleton (attention) + organs (FFN) from GGUF models.',
        epilog='CSCI toolkit'
    )
    parser.add_argument('--model', '-m', required=True, help='Path to GGUF model file')
    parser.add_argument('--output', '-o', default=None, help='Output directory (default: ./organs/<model_name>)')
    parser.add_argument('--verbose', '-v', action='store_true', help='Show every tensor')
    parser.add_argument('--info', action='store_true', help='Show model info only, no extraction')

    args = parser.parse_args()

    if not os.path.exists(args.model):
        print(f"[ERROR] Model not found: {args.model}")
        sys.exit(1)

    if args.info:
        reader = GGUFReader(args.model)
        print(f"\nModel: {args.model}")
        print(f"Size: {os.path.getsize(args.model) / (1024*1024*1024):.2f} GB")

        # Show scalar metadata (tokenizer arrays are too large to print)
        arch = reader.metadata.get('general.architecture', 'unknown')
        print(f"Architecture: {arch}")
        for key in sorted(reader.metadata.keys()):
            if not key.startswith('_') and not key.startswith('tokenizer'):
                val = reader.metadata[key]
                if isinstance(val, (str, int, float, bool)):
                    print(f"  {key}: {val}")

        # Count tensors by organ type
        types = {}
        for t in reader.tensors:
            ct = classify_tensor(t['name'])
            types[ct] = types.get(ct, 0) + 1

        print("\nTensor types:")
        for ct, count in sorted(types.items()):
            print(f"  {ct}: {count}")

        print(f"Total tensors: {len(reader.tensors)}")
        reader.close()
        return

    output_dir = args.output
    if not output_dir:
        model_name = os.path.basename(args.model).replace('.gguf', '')
        output_dir = f"./organs/{model_name}"

    extract_organs(args.model, output_dir, verbose=args.verbose)


if __name__ == '__main__':
    main()

# ╔══ SALKA ELMADANI AUTHORSHIP CERTIFICATE ══╗
# © Salka Elmadani 2025-2026 — ALL RIGHTS RESERVED
# Licensed under Business Source License 1.1 — https://inference-x.com
# ─────────────────────────────────────────────────────────
# SHA256: 7e0a2105f5f6d458909fb71ef03bb01c4e308ac8549af00ef61c2cf89d0c8945
# SIG-ED25519: p3fNipeHSBJlVNpxsJZdvrBMJVbTAZu97RNxp7UGCkUp1TlHxH4D2XbKu46JQriNzM65myMeWGyS2WMx9atoCQ==
# VERIFY: python3 verify_authorship.py organ_extract.py