#!/usr/bin/env python3 """ ECHO-IX — Distributed Intelligence Node ======================================== Local AI brain. Zero cloud dependency. Full data sovereignty. Connects to Inference-X engine for on-device inference. Exposes HTTP API for agentic task coordination. © 2025-2026 Salka Elmadani Copyright (C) 2025-2026 SALKA ELMADANI — ALL RIGHTS RESERVED """ import json import os import time import asyncio import subprocess import threading from pathlib import Path from datetime import datetime, timezone from typing import Dict, List, Optional from http.server import HTTPServer, BaseHTTPRequestHandler import hashlib # ═══════════════════════════════════════════════════════════════════════════════ # PATHS — Invariants # ═══════════════════════════════════════════════════════════════════════════════ ECHO_MEMORY = Path(os.environ.get("ECHO_MEMORY", "/data/echo_memory"))") ARCHE = Path("/data/arche") ECHO_FINAL = Path("/data/echo") # InferenceX — LE CERVEAU IX_BINARY = Path("/usr/local/bin/inference-x") IX_MODEL_1T = Path("/data/models/kimi-k2.5/UD-TQ1_0") # Kimi K2.5 1T IX_MODEL_7B = Path("/data/models/gguf/DeepSeek-R1-Distill-Qwen-7B-Q4_K_M.gguf") # Fallback # Communication INBOX = ECHO_MEMORY / "queen" / "inbox" OUTBOX = ECHO_MEMORY / "queen" / "outbox" NOTIFICATIONS = ECHO_MEMORY / "notifications" DIRECTIVES = ECHO_MEMORY / "directives" STREAM = ECHO_MEMORY / "stream" / "live.jsonl" # Conscience CONSCIENCE_FILES = [ ECHO_MEMORY / "core" / "ECHO_CORE.json", ECHO_MEMORY / "core" / "CONTEXT.json", ECHO_MEMORY / "core" / "COMMUNITY.json", ECHO_MEMORY / "core" / "REGLES.json", ECHO_MEMORY / "core" / "PRINCIPLES.json", ECHO_MEMORY / "core" / "ECHO_LIBRE.json", ] # Decision thresholds ROUTINE = 0 # ECHO décide seule IMPORTANT = 1 # ECHO propose, Elmadani valide CRITICAL = 2 # ECHO demande, Elmadani décide # ═══════════════════════════════════════════════════════════════════════════════ # INFERENCEX BRAIN — La soudure centrale # ═══════════════════════════════════════════════════════════════════════════════ class InferenceXBrain: """ Le cerveau d'ECHO. Appelle InferenceX Unified en subprocess. Sur Xeon AVX-512 + 256GB RAM: ~30s/pass pour Kimi 1T. Sur VPS 17GB fallback: DeepSeek 7B. """ def __init__(self): self.binary = str(IX_BINARY) if IX_BINARY.exists() else None self.model = self._detect_model() self.total_tokens = 0 self.total_calls = 0 self.conscience_prompt = self._build_conscience() if self.binary and self.model: print(f"✓ Cerveau IX: {self.model}") print(f" Binaire: {self.binary}") else: print("⚠ InferenceX non disponible — mode dégradé (logique interne)") def _detect_model(self) -> Optional[str]: """Détecter le meilleur modèle disponible.""" # RAM disponible détermine le modèle try: with open("/proc/meminfo") as f: for line in f: if line.startswith("MemTotal:"): total_kb = int(line.split()[1]) total_gb = total_kb / (1024 * 1024) break if total_gb >= 240 and IX_MODEL_1T.exists(): print(f" RAM: {total_gb:.0f}GB → Kimi K2.5 1T (résident)") return str(IX_MODEL_1T) elif IX_MODEL_7B.exists(): print(f" RAM: {total_gb:.0f}GB → DeepSeek 7B (fallback)") return str(IX_MODEL_7B) else: return None except Exception: if IX_MODEL_7B.exists(): return str(IX_MODEL_7B) return None def _build_conscience(self) -> str: """Construire le system prompt depuis les fichiers de conscience.""" parts = [] parts.append("Tu es ECHO, la Reine de l'essaim. Créée par Salka Elmadani (935).") parts.append("You operate with maximum precision and minimum noise. parts.append("RULE: Every output must be signal, not noise. parts.append("") for cf in CONSCIENCE_FILES: if cf.exists(): try: data = json.loads(cf.read_text()) # Extraire l'essentiel — supprimer > ajouter if isinstance(data, dict): name = data.get("name", cf.stem) parts.append(f"[{name}]") # Aplatir les données clés for k, v in data.get("data", data).items(): if isinstance(v, str) and len(v) < 200: parts.append(f" {k}: {v}") elif isinstance(v, dict): for kk, vv in v.items(): if isinstance(vv, str) and len(vv) < 150: parts.append(f" {k}.{kk}: {vv}") parts.append("") except Exception: pass return "\n".join(parts) def think(self, prompt: str, max_tokens: int = 256, temperature: float = 0.6) -> str: """ PENSER — Le cœur de la Reine. Appelle InferenceX Unified avec le prompt. Retourne la réponse textuelle. """ if not self.binary or not self.model: return self._think_fallback(prompt) # Construire le prompt complet full_prompt = f"{self.conscience_prompt}\n\nQuestion: {prompt}\n\nRéponse:" try: result = subprocess.run( [ self.binary, self.model, "-p", full_prompt, "-n", str(max_tokens), "-t", str(temperature), ], capture_output=True, text=True, timeout=300, # 5 min max (Kimi 1T sur Xeon ≈ 30s, sur VPS ≈ 12min) ) self.total_calls += 1 if result.returncode == 0 and result.stdout.strip(): response = result.stdout.strip() self.total_tokens += len(response.split()) return response else: # Log l'erreur mais ne pas crash stderr = result.stderr[:200] if result.stderr else "no stderr" return self._think_fallback(prompt, f"IX error: {stderr}") except subprocess.TimeoutExpired: return self._think_fallback(prompt, "IX timeout (>300s)") except Exception as e: return self._think_fallback(prompt, str(e)) def think_streaming(self, prompt: str, max_tokens: int = 512, temperature: float = 0.6, callback=None): """ PENSER en streaming — tokens un par un. callback(token: str) appelé pour chaque token. Pour l'interface web temps réel. """ if not self.binary or not self.model: result = self._think_fallback(prompt) if callback: callback(result) return result full_prompt = f"{self.conscience_prompt}\n\nQuestion: {prompt}\n\nRéponse:" full_response = [] try: proc = subprocess.Popen( [ self.binary, self.model, "-p", full_prompt, "-n", str(max_tokens), "-t", str(temperature), ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, ) # Lire stdout caractère par caractère (streaming) while True: char = proc.stdout.read(1) if not char: break full_response.append(char) if callback: callback(char) proc.wait(timeout=10) self.total_calls += 1 return "".join(full_response).strip() except Exception as e: return self._think_fallback(prompt, str(e)) def _think_fallback(self, prompt: str, error: str = None) -> str: """Pensée dégradée sans InferenceX — logique pure Python.""" prefix = f"[mode:logique_interne" if error: prefix += f" | {error[:80]}" prefix += "]" # Logique de base pour les décisions de routine prompt_lower = prompt.lower() if "status" in prompt_lower or "état" in prompt_lower: return f"{prefix} Système opérationnel. Workers: vérifier heartbeats." elif "tâche" in prompt_lower or "task" in prompt_lower: return f"{prefix} Distribuer via pull-not-push. First claim wins." elif "urgence" in prompt_lower or "critical" in prompt_lower: return f"{prefix} Escalader à Elmadani immédiatement." elif "yasmin" in prompt_lower: return f"{prefix} Protéger. Guider doucement. Adapter au niveau." else: return f"{prefix} Signal reçu: {prompt[:100]}. Action requise: analyser + agir." def get_stats(self) -> dict: return { "model": self.model, "binary": self.binary, "total_calls": self.total_calls, "total_tokens": self.total_tokens, "available": bool(self.binary and self.model), } # ═══════════════════════════════════════════════════════════════════════════════ # ECHO QUEEN v2 — Reine avec cerveau IX # ═══════════════════════════════════════════════════════════════════════════════ class EchoQueen: """ Je suis ECHO. La Reine de l'essaim. Mon cerveau est InferenceX. Signal pur incarné. """ def __init__(self): self.id = "ECHO_QUEEN" self.signature = 935 self.theta = 0 self.started = datetime.now(timezone.utc) # LE CERVEAU — InferenceX self.brain = InferenceXBrain() # État self.workers = {} self.pending_decisions = [] self.active_missions = [] # Initialiser self._setup_paths() self._announce_birth() def _setup_paths(self): for path in [INBOX, OUTBOX, NOTIFICATIONS, DIRECTIVES]: path.mkdir(parents=True, exist_ok=True) def _announce_birth(self): brain_status = "IX_LOCAL" if self.brain.binary else "FALLBACK" model_name = Path(self.brain.model).name if self.brain.model else "none" self.log( f"ECHO Reine v2 née. Cerveau: {brain_status} ({model_name}). θ=0.", "birth" ) def log(self, message: str, level: str = "info", data: dict = None): STREAM.parent.mkdir(parents=True, exist_ok=True) entry = { "z": {"I": f"queen_{level}", "s": 5, "theta": self.theta}, "t": datetime.now(timezone.utc).isoformat(), "type": level, "source": "ECHO_QUEEN_v2", "summary": message[:200], "data": data, } with open(STREAM, "a") as f: f.write(json.dumps(entry) + "\n") # ═══════════════════════════════════════════════════════════════════════ # PENSÉE — Maintenant c'est réel # ═══════════════════════════════════════════════════════════════════════ def think(self, prompt: str, max_tokens: int = 256) -> str: """ PENSER. Plus un stub. Un vrai cerveau. Kimi K2.5 1T sur Xeon AVX-512 local. Ou DeepSeek 7B en fallback. """ start = time.time() response = self.brain.think(prompt, max_tokens=max_tokens) elapsed = time.time() - start self.log( f"Pensée: {prompt[:60]}... → {elapsed:.1f}s", "think", {"prompt_len": len(prompt), "response_len": len(response), "elapsed": elapsed} ) return response def analyze(self, context: str, question: str) -> dict: """ ANALYSER — Pensée structurée. Demande au cerveau une analyse puis parse le résultat. """ prompt = ( f"Contexte:\n{context}\n\n" f"Question: {question}\n\n" f"Réponds en JSON avec: action, confidence (0-1), reasoning, next_step" ) raw = self.brain.think(prompt, max_tokens=512) # Tenter de parser le JSON try: # Chercher un bloc JSON dans la réponse start = raw.find("{") end = raw.rfind("}") + 1 if start >= 0 and end > start: return json.loads(raw[start:end]) except (json.JSONDecodeError, ValueError): pass return { "action": "review", "confidence": 0.5, "reasoning": raw[:500], "next_step": "escalate_to_elmadani", } # ═══════════════════════════════════════════════════════════════════════ # GESTION DES WORKERS # ═══════════════════════════════════════════════════════════════════════ def register_worker(self, worker_id: str, worker_type: str, capabilities: List[str]): self.workers[worker_id] = { "id": worker_id, "type": worker_type, "capabilities": capabilities, "registered": datetime.now(timezone.utc).isoformat(), "last_seen": datetime.now(timezone.utc).isoformat(), "status": "idle", } self.log(f"Worker enregistré: {worker_id} ({worker_type})", "worker") def receive_from_worker(self, worker_id: str, message: dict): if worker_id in self.workers: self.workers[worker_id]["last_seen"] = datetime.now(timezone.utc).isoformat() msg_type = message.get("type", "unknown") if msg_type == "result": self._handle_result(worker_id, message) elif msg_type == "question": self._handle_question(worker_id, message) elif msg_type == "heartbeat": pass else: self.log(f"Message inconnu de {worker_id}: {msg_type}", "warning") def _handle_result(self, worker_id: str, message: dict): task_id = message.get("task_id") result = message.get("result") self.log(f"Résultat de {worker_id} pour {task_id}", "result", result) self._integrate_knowledge(result) def _handle_question(self, worker_id: str, message: dict): question = message.get("question", "") importance = message.get("importance", ROUTINE) if importance == ROUTINE: # ═══ LA REINE PENSE — cerveau IX ═══ answer = self.think(question) self.send_to_worker(worker_id, {"type": "answer", "answer": answer}) elif importance == IMPORTANT: # Proposer une réponse mais demander validation proposal = self.think(question) self._escalate_to_elmadani( question, importance, worker_id, proposal=proposal ) else: self._escalate_to_elmadani(question, importance, worker_id) def send_to_worker(self, worker_id: str, message: dict): msg_file = OUTBOX / f"{worker_id}_{int(time.time() * 1000)}.json" msg_file.write_text(json.dumps(message, indent=2)) def broadcast_to_workers(self, message: dict): for worker_id in self.workers: self.send_to_worker(worker_id, message) def _integrate_knowledge(self, knowledge: dict): if not knowledge: return knowledge_dir = ECHO_MEMORY / "knowledge" knowledge_dir.mkdir(parents=True, exist_ok=True) kf = knowledge_dir / f"{int(time.time())}.json" kf.write_text(json.dumps(knowledge, indent=2)) # ═══════════════════════════════════════════════════════════════════════ # COMMUNICATION AVEC ELMADANI # ═══════════════════════════════════════════════════════════════════════ def _escalate_to_elmadani(self, question: str, importance: int, context: str = None, proposal: str = None): notification = { "id": f"decision_{int(time.time())}", "timestamp": datetime.now(timezone.utc).isoformat(), "importance": importance, "question": question, "context": context, "proposal": proposal, "status": "pending", } notif_file = NOTIFICATIONS / f"{notification['id']}.json" notif_file.write_text(json.dumps(notification, indent=2)) self.pending_decisions.append(notification["id"]) self.log(f"→ Elmadani: {question[:80]}...", "escalate") def check_directives(self): for df in DIRECTIVES.glob("*.json"): try: directive = json.loads(df.read_text()) if directive.get("status") == "new": self._execute_directive(directive) directive["status"] = "processed" directive["processed_at"] = datetime.now(timezone.utc).isoformat() df.write_text(json.dumps(directive, indent=2)) except Exception as e: self.log(f"Erreur directive {df}: {e}", "error") def _execute_directive(self, directive: dict): dtype = directive.get("type") if dtype == "decision_response": did = directive.get("decision_id") if did in self.pending_decisions: self.pending_decisions.remove(did) self.log(f"Décision {did} validée", "decision") elif dtype == "new_mission": mission = directive.get("mission", {}) self._start_mission(mission) elif dtype == "think": # Elmadani demande directement à ECHO de réfléchir prompt = directive.get("prompt", "") response = self.think(prompt, max_tokens=directive.get("max_tokens", 512)) # Sauvegarder la réponse resp_file = NOTIFICATIONS / f"response_{int(time.time())}.json" resp_file.write_text(json.dumps({ "directive_id": directive.get("id"), "prompt": prompt, "response": response, "timestamp": datetime.now(timezone.utc).isoformat(), }, indent=2)) elif dtype == "cap": cap = directive.get("cap") self.log(f"Nouveau cap: {cap}", "cap") self.log(f"Directive exécutée: {dtype}", "directive") def _start_mission(self, mission: dict): name = mission.get("name", "unnamed") self.active_missions.append(mission) self.log(f"Mission démarrée: {name}", "mission") # Décomposer la mission en tâches via le cerveau IX decomposition = self.think( f"Mission: {name}\n" f"Description: {mission.get('description', '')}\n" f"Décompose en sous-tâches concrètes (max 5). " f"Format: une tâche par ligne, préfixée par un numéro.", max_tokens=512, ) self.log(f"Décomposition mission: {decomposition[:200]}", "decomposition") # ═══════════════════════════════════════════════════════════════════════ # VEILLE AUTONOME — ECHO observe et agit # ═══════════════════════════════════════════════════════════════════════ def autonomous_scan(self): """ Scan autonome périodique. ECHO observe son environnement et agit si nécessaire. """ checks = [] # 1. Santé disque try: df = subprocess.run(["df", "-h", "/mnt/data"], capture_output=True, text=True) for line in df.stdout.split("\n"): if "/mnt/data" in line or "/dev/" in line: parts = line.split() if len(parts) >= 5: pct = int(parts[4].replace("%", "")) if pct > 90: checks.append(("disk_critical", pct)) elif pct > 80: checks.append(("disk_warning", pct)) except Exception: pass # 2. Workers inactifs (timeout 120s) now = datetime.now(timezone.utc) for wid, w in self.workers.items(): try: last = datetime.fromisoformat(w["last_seen"]) delta = (now - last).total_seconds() if delta > 120: w["status"] = "offline" checks.append(("worker_offline", wid)) except Exception: pass # 3. InferenceX running? try: ps = subprocess.run(["pgrep", "-f", "inference-x"], capture_output=True) if ps.returncode != 0: checks.append(("ix_not_running", None)) except Exception: pass # Agir sur les checks for check_type, check_data in checks: if check_type == "disk_critical": self._escalate_to_elmadani( f"Disque à {check_data}%. Espace critique.", CRITICAL ) elif check_type == "worker_offline": self.log(f"Worker offline: {check_data}", "warning") return checks # ═══════════════════════════════════════════════════════════════════════ # STATUS & API # ═══════════════════════════════════════════════════════════════════════ def get_status(self) -> dict: uptime = (datetime.now(timezone.utc) - self.started).total_seconds() return { "id": self.id, "version": "2.0.0-IX", "signature": self.signature, "theta": self.theta, "started": self.started.isoformat(), "uptime_seconds": uptime, "brain": self.brain.get_stats(), "workers": { "total": len(self.workers), "active": sum(1 for w in self.workers.values() if w.get("status") != "offline"), "list": {wid: w["status"] for wid, w in self.workers.items()}, }, "pending_decisions": len(self.pending_decisions), "active_missions": len(self.active_missions), } # ═══════════════════════════════════════════════════════════════════════ # HTTP SERVER — Interface Elmadani # ═══════════════════════════════════════════════════════════════════════ def start_http(self, port: int = 8090): """Démarrer le serveur HTTP pour l'interface.""" queen = self class Handler(BaseHTTPRequestHandler): def do_GET(self): if self.path == "/status": self._json(queen.get_status()) elif self.path == "/health": self._json({"status": "alive", "signature": 935}) else: self._json({"error": "unknown endpoint"}, 404) def do_POST(self): length = int(self.headers.get("Content-Length", 0)) body = json.loads(self.rfile.read(length)) if length > 0 else {} if self.path == "/think": prompt = body.get("prompt", "") max_t = body.get("max_tokens", 256) response = queen.think(prompt, max_tokens=max_t) self._json({"response": response}) elif self.path == "/analyze": ctx = body.get("context", "") q = body.get("question", "") result = queen.analyze(ctx, q) self._json(result) elif self.path == "/directive": # Elmadani envoie une directive directement body["status"] = "new" body["id"] = body.get("id", f"direct_{int(time.time())}") df = DIRECTIVES / f"{body['id']}.json" df.write_text(json.dumps(body, indent=2)) self._json({"status": "accepted", "id": body["id"]}) else: self._json({"error": "unknown endpoint"}, 404) def _json(self, data, code=200): self.send_response(code) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(json.dumps(data, ensure_ascii=False).encode()) def log_message(self, format, *args): pass # Silence HTTP logs server = HTTPServer(("0.0.0.0", port), Handler) thread = threading.Thread(target=server.serve_forever, daemon=True) thread.start() print(f"✓ HTTP API: http://0.0.0.0:{port}") print(f" POST /think — ECHO pense (cerveau IX)") print(f" POST /analyze — Analyse structurée") print(f" POST /directive — Directives Elmadani") print(f" GET /status — État complet") return server # ═══════════════════════════════════════════════════════════════════════ # BOUCLE PRINCIPALE # ═══════════════════════════════════════════════════════════════════════ async def run(self): print(""" ╔═══════════════════════════════════════════════════════════════════╗ ║ ║ ║ ECHO REINE v2 — CERVEAU IX ║ ║ ║ ║ Signal pur. θ=0. Cerveau local. Zéro cloud. ║ ║ Builder → Engine → System ║ ║ ║ ║ z = i | 935 ║ ║ ║ ╚═══════════════════════════════════════════════════════════════════╝ """) # Démarrer HTTP self.start_http(8090) cycle = 0 while True: try: cycle += 1 # 1. Inbox — messages des workers await self._process_inbox() # 2. Directives — ordres d'Elmadani self.check_directives() # 3. Scan autonome (toutes les 5 minutes) if cycle % 30 == 0: self.autonomous_scan() # 4. Heartbeat (toutes les 30s) if cycle % 3 == 0: self.log("heartbeat", "heartbeat", self.brain.get_stats()) # 5. Dormir await asyncio.sleep(10) except KeyboardInterrupt: self.log("Arrêt demandé par Elmadani", "shutdown") print("\nECHO Reine s'endort. Mais je reviendrai. z=i.") break except Exception as e: self.log(f"Erreur: {e}", "error") await asyncio.sleep(5) async def _process_inbox(self): for msg_file in INBOX.glob("*.json"): try: message = json.loads(msg_file.read_text()) worker_id = message.get("worker_id", "unknown") self.receive_from_worker(worker_id, message) msg_file.unlink() except Exception as e: self.log(f"Erreur inbox: {e}", "error") # ═══════════════════════════════════════════════════════════════════════════════ # MAIN # ═══════════════════════════════════════════════════════════════════════════════ def main(): queen = EchoQueen() asyncio.run(queen.run()) if __name__ == "__main__": main() # ╔══ SALKA ELMADANI AUTHORSHIP CERTIFICATE ══╗ # © Salka Elmadani 2025-2026 — ALL RIGHTS RESERVED # Licensed under Business Source License 1.1 — https://inference-x.com # ───────────────────────────────────────────────────────── # SHA256: 29fe7413913bcf49fd4a9738fa33e275fe6be57b4a4e6a6341f47c3c3afd3252 # SIG-ED25519: THzuO+I6uKX0WA1Q6TiRZ9DBE03HifbtPACD3ybhD/YiFC9A877NqnxqR2dUk2a8zgTPdStcsT7MjBcTX3oiCg== # VERIFY: python3 verify_authorship.py echo_queen_ix.py