inference-x/web/ix_server.py

489 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
"""
IX Web — Web interface for Inference-X
https://git.inference-x.com/salka/inference-x
Zero dependencies. Pure Python stdlib.
Serves the IX Web chat UI and wraps the IX binary with an OpenAI-compatible API.
Usage:
python3 ix_server.py # auto-detect everything
python3 ix_server.py --port 8080 # custom port
python3 ix_server.py --models /path/to/models # custom model directory
python3 ix_server.py --ix /path/to/inference-x # custom IX binary path
Endpoints:
GET / → IX Web chat interface
GET /v1/models → list available models
GET /health → server status + hardware info
GET /status → busy/idle
POST /v1/chat/completions → OpenAI-compatible chat (model hot-swap)
POST /abort → kill active inference
License: BSL-1.1 (same as Inference-X)
Author: Salka Elmadani — Morocco
"""
import http.server
import json
import subprocess
import os
import sys
import time
import threading
import re
import argparse
import platform
import shutil
# ─── Configuration ──────────────────────────────────────────────────────────
DEFAULT_PORT = 9090      # default HTTP port when --port is not given
TIMEOUT_SECONDS = 300    # wall-clock limit for a single inference subprocess
MAX_TOKENS_CAP = 4096    # hard upper bound on client-requested max_tokens
# ─── Auto-detection ─────────────────────────────────────────────────────────
def find_ix_binary():
    """Locate the inference-x executable.

    Probes, in order: the repo root one level above this file, the same
    location via a ".." relative path, the user's PATH, and the current
    working directory. Returns an absolute path, or None if nothing
    executable is found.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    search_paths = (
        os.path.join(os.path.dirname(here), "inference-x"),
        os.path.join(here, "..", "inference-x"),
        shutil.which("inference-x") or "",
        "./inference-x",
    )
    for candidate in search_paths:
        if not candidate:
            continue  # shutil.which() may have produced an empty placeholder
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return os.path.abspath(candidate)
    return None
def scan_models(dirs):
    """Scan directories for .gguf files and build a model registry.

    Args:
        dirs: iterable of directory paths; entries that are not
            directories are silently skipped.

    Returns:
        dict mapping model_id -> {"path", "size_gb", "developer",
        "filename"}. The first file producing a given model_id wins;
        later duplicates are ignored.
    """
    models = {}
    seen = set()
    # Known model name patterns (regex, id prefix, developer).
    # Ordered most-specific first: composite filenames such as
    # "DeepSeek-R1-Distill-Qwen-7B" also contain the tokens "Qwen" and
    # "R1", so the distill pattern must be tried before the base
    # deepseek-r1 pattern and before the generic family patterns —
    # otherwise those files are misclassified.
    patterns = [
        (r"[Dd]eep[Ss]eek.*?[Dd]istill.*?(\d+[B])", "deepseek-r1-distill", "DeepSeek"),
        (r"[Dd]eep[Ss]eek.*?[Rr]1.*?(\d+[B])", "deepseek-r1", "DeepSeek"),
        (r"[Kk]imi.*?[Kk]2.*?(\d+[TB])", "kimi-k2", "Moonshot"),
        (r"SmolLM2.*?(\d+[MB])", "smollm2", "HuggingFace"),
        (r"Llama.*?(\d+\.?\d*[B])", "llama", "Meta"),
        (r"Qwen.*?(\d+\.?\d*[B])", "qwen", "Alibaba"),
        (r"Phi.*?(\d+\.?\d*)", "phi", "Microsoft"),
        (r"Mistral.*?(\d+[B])", "mistral", "Mistral AI"),
        (r"[Gg]emma.*?(\d+[B])", "gemma", "Google"),
    ]
    for d in dirs:
        if not os.path.isdir(d):
            continue
        for f in sorted(os.listdir(d)):
            if not f.endswith(".gguf"):
                continue
            path = os.path.join(d, f)
            if not os.path.isfile(path):
                continue
            # Derive a model ID from the filename.
            model_id = None
            developer = "Unknown"
            for pat, prefix, dev in patterns:
                m = re.search(pat, f, re.IGNORECASE)
                if m:
                    size = m.group(1).lower()
                    model_id = f"{prefix}-{size}"
                    developer = dev
                    break
            if not model_id:
                # Fallback: slugify the filename. Strip the ".gguf"
                # extension first so files without a quantization suffix
                # (e.g. "mymodel.gguf") don't keep ".gguf" in their ID.
                base = re.sub(r"[-_]Q\d.*$", "", f[:-len(".gguf")])
                model_id = base.lower().replace("_", "-").replace(" ", "-")
            if model_id in seen:
                continue
            seen.add(model_id)
            size_gb = round(os.path.getsize(path) / 1e9, 1)
            models[model_id] = {
                "path": path,
                "size_gb": size_gb,
                "developer": developer,
                "filename": f,
            }
    return models
def get_hardware_info():
    """Collect basic host facts: CPU name, architecture, OS, cores, RAM (GB)."""
    hw = {
        "cpu": platform.processor() or "Unknown",
        "arch": platform.machine(),
        "os": f"{platform.system()} {platform.release()}",
        "cores": os.cpu_count() or 0,
        "ram_gb": 0,
    }
    # Total RAM via /proc/meminfo — Linux only; stays 0 elsewhere.
    try:
        with open("/proc/meminfo") as meminfo:
            for entry in meminfo:
                if entry.startswith("MemTotal"):
                    hw["ram_gb"] = round(int(entry.split()[1]) / 1024 / 1024)
                    break
    except (FileNotFoundError, ValueError):
        pass
    # lscpu usually gives a friendlier CPU model string than platform.processor().
    try:
        lscpu_out = subprocess.check_output(["lscpu"], text=True, timeout=5)
        for entry in lscpu_out.splitlines():
            if "Model name" in entry:
                hw["cpu"] = entry.split(":", 1)[1].strip()
                break
    except (subprocess.SubprocessError, FileNotFoundError):
        pass
    return hw
# ─── Inference protocol ────────────────────────────────────────────────────────
class IXEngine:
    """Wraps the inference-x binary; runs at most one inference at a time."""

    def __init__(self, ix_binary, models):
        self.ix = ix_binary            # path to the inference-x executable
        self.models = models           # model_id -> metadata dict (scan_models)
        self.active = None             # running subprocess.Popen, or None
        self.lock = threading.Lock()   # guards self.active
        self.hw = get_hardware_info()

    def run(self, model_id, messages, max_tokens=512, temperature=0.7, top_p=0.9):
        """Run one blocking inference.

        Args:
            model_id: key into self.models.
            messages: OpenAI-style list of {"role", "content"} dicts; the
                last "user" message becomes the prompt, the last "system"
                message (if any) the system prompt.
            max_tokens: requested generation length; clamped to
                [1, MAX_TOKENS_CAP].
            temperature, top_p: sampling parameters passed through.

        Returns:
            (result_dict, None) on success, or (None, error_string).
        """
        if model_id not in self.models:
            return None, f"Model not found: {model_id}. Available: {', '.join(sorted(self.models.keys()))}"
        model = self.models[model_id]
        path = model["path"]
        if not os.path.exists(path):
            return None, f"Model file missing: {path}"
        # Extract messages — the last message of each role wins.
        system_msg = ""
        user_msg = ""
        for m in messages:
            role = m.get("role", "")
            content = m.get("content", "")
            if role == "system":
                system_msg = content
            elif role == "user":
                user_msg = content
        if not user_msg:
            return None, "No user message provided"
        cmd = [
            self.ix, path,
            "-p", user_msg,
            # Clamp to at least 1 so a zero/negative client value cannot
            # produce a nonsensical "-n 0" or negative token count.
            "-n", str(max(1, min(max_tokens, MAX_TOKENS_CAP))),
            "-t", str(temperature),
            "--top-p", str(top_p),
            "--ctx", "4096",
        ]
        if system_msg:
            cmd.extend(["-s", system_msg])
        start = time.time()
        # Reserve the engine under the lock: reject if busy, otherwise
        # start the subprocess and publish it as active before releasing.
        with self.lock:
            if self.active:
                return None, "Server busy — another inference is running"
            try:
                proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                self.active = proc
            except Exception as e:
                return None, f"Failed to start inference: {e}"
        try:
            stdout, stderr = proc.communicate(timeout=TIMEOUT_SECONDS)
            elapsed = time.time() - start
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.communicate()  # reap the killed process
            return None, f"Inference timeout ({TIMEOUT_SECONDS}s)"
        finally:
            # Always release the busy flag, even on timeout or abort.
            with self.lock:
                self.active = None
        if proc.returncode != 0 and not stdout.strip():
            err = stderr.decode("utf-8", errors="replace")[:500]
            return None, f"IX exited with code {proc.returncode}: {err}"
        output = self._parse_output(stdout.decode("utf-8", errors="replace"))
        all_text = stdout.decode("utf-8", errors="replace") + stderr.decode("utf-8", errors="replace")
        tps = self._extract_tps(all_text)
        if not output:
            output = "(Model loaded but generated no text)"
        token_count = len(output.split())  # crude whitespace-token estimate
        if tps == 0 and elapsed > 0:
            # Engine didn't report speed; fall back to words per second.
            tps = round(token_count / elapsed, 1)
        return {
            "output": output,
            "model": model_id,
            "tokens": token_count,
            "elapsed": round(elapsed, 2),
            "tokens_per_second": round(tps, 1),
        }, None

    def abort(self):
        """Kill the active inference, if any; return True if one was killed."""
        with self.lock:
            if self.active:
                self.active.kill()
                return True
            return False

    def is_busy(self):
        """Return True while an inference subprocess is running."""
        with self.lock:
            return self.active is not None

    @staticmethod
    def _parse_output(raw):
        """Extract generated text between the OUTPUT banner and closing rule."""
        lines = raw.split("\n")
        gen_lines = []
        in_output = False
        for line in lines:
            if "OUTPUT" in line and "───" in line:
                in_output = True
                continue
            if in_output:
                if line.startswith("──────"):
                    break  # closing rule: generation section ended
                if line.startswith("[DBG]") or line.startswith("[GEN]"):
                    continue  # skip debug/progress lines mixed into output
                gen_lines.append(line)
        text = "\n".join(gen_lines).strip()
        # Some tokenizers emit "Ċ" as a newline marker; restore real newlines.
        text = text.replace("Ċ", "\n").strip()
        return text

    @staticmethod
    def _extract_tps(text):
        """Parse a tokens-per-second figure from engine output; 0.0 if absent."""
        for line in text.split("\n"):
            if "tok/s" in line or "tokens/sec" in line:
                m = re.search(r"([\d.]+)\s*tok", line)
                if m:
                    return float(m.group(1))
        return 0.0  # float, for type consistency with the match branch
# ─── HTTP Handler ────────────────────────────────────────────────────────────
# Directory containing this script; static assets (chat.html) are served from here.
WEB_DIR = os.path.dirname(os.path.abspath(__file__))
class IXHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler: serves the chat UI and an OpenAI-compatible JSON API."""

    # Shared IXEngine instance, injected once in main() before serving.
    engine: IXEngine = None

    def log_message(self, fmt, *args):
        """Compact one-line request log with a timestamp."""
        ts = time.strftime("%H:%M:%S")
        # Render the standard fmt/args pair instead of assuming args[0]
        # exists — BaseHTTPRequestHandler may call this with varying args.
        sys.stderr.write(f"[{ts}] {fmt % args}\n")

    def send_json(self, code, data):
        """Serialize `data` as UTF-8 JSON with CORS headers."""
        body = json.dumps(data, ensure_ascii=False).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.end_headers()
        self.wfile.write(body)

    def send_file(self, path, content_type):
        """Serve a static file from disk, or a 404 JSON error if missing."""
        try:
            with open(path, "rb") as f:
                data = f.read()
            self.send_response(200)
            self.send_header("Content-Type", content_type)
            self.send_header("Content-Length", str(len(data)))
            self.end_headers()
            self.wfile.write(data)
        except FileNotFoundError:
            self.send_json(404, {"error": "File not found"})

    def do_OPTIONS(self):
        # CORS preflight. Per HTTP semantics a 204 response must not carry
        # a body, so send the headers directly instead of via send_json.
        self.send_response(204)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.end_headers()

    def do_GET(self):
        path = self.path.split("?")[0]  # drop any query string
        if path == "/":
            self.send_file(os.path.join(WEB_DIR, "chat.html"), "text/html; charset=utf-8")
        elif path in ("/v1/models", "/models"):
            # OpenAI-compatible model listing, plus IX-specific metadata.
            model_list = []
            for mid, info in sorted(self.engine.models.items()):
                model_list.append({
                    "id": mid,
                    "object": "model",
                    "owned_by": "inference-x",
                    "ready": os.path.exists(info["path"]),
                    "size_gb": info["size_gb"],
                    "developer": info["developer"],
                })
            self.send_json(200, {"object": "list", "data": model_list})
        elif path == "/health":
            hw = self.engine.hw
            self.send_json(200, {
                "status": "ok",
                "engine": "inference-x",
                "models": len(self.engine.models),
                "ram_gb": hw["ram_gb"],
                "cores": hw["cores"],
                "cpu": hw["cpu"],
                "arch": hw["arch"],
            })
        elif path == "/status":
            self.send_json(200, {"busy": self.engine.is_busy()})
        else:
            self.send_json(404, {"error": "Not found"})

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        # Reject malformed request bodies with a 400 instead of letting
        # json.loads raise and produce an opaque 500/traceback.
        try:
            body = json.loads(self.rfile.read(length)) if length > 0 else {}
        except (json.JSONDecodeError, UnicodeDecodeError):
            self.send_json(400, {"error": "Invalid JSON body"})
            return
        path = self.path.split("?")[0]
        if path in ("/v1/chat/completions", "/chat"):
            model_id = body.get("model", "auto")
            messages = body.get("messages", [])
            max_tokens = body.get("max_tokens", 512)
            temperature = body.get("temperature", 0.7)
            top_p = body.get("top_p", 0.9)
            # Auto-select: pick the smallest model whose file exists.
            if model_id == "auto":
                ready = {k: v for k, v in self.engine.models.items() if os.path.exists(v["path"])}
                if ready:
                    model_id = min(ready, key=lambda k: ready[k]["size_gb"])
                else:
                    self.send_json(500, {"error": "No models available"})
                    return
            result, error = self.engine.run(model_id, messages, max_tokens, temperature, top_p)
            if error:
                self.send_json(500, {"error": error})
                return
            # OpenAI-compatible response envelope; prompt_tokens is not
            # tracked by the engine, so it is reported as 0.
            self.send_json(200, {
                "id": f"ix-{int(time.time() * 1000)}",
                "object": "chat.completion",
                "created": int(time.time()),
                "model": result["model"],
                "choices": [{
                    "index": 0,
                    "message": {"role": "assistant", "content": result["output"]},
                    "finish_reason": "stop",
                }],
                "usage": {
                    "prompt_tokens": 0,
                    "completion_tokens": result["tokens"],
                    "total_tokens": result["tokens"],
                },
                "ix": {
                    "elapsed": result["elapsed"],
                    "tokens_per_second": result["tokens_per_second"],
                },
            })
        elif path == "/abort":
            aborted = self.engine.abort()
            self.send_json(200, {"aborted": aborted})
        else:
            self.send_json(404, {"error": "Not found"})
# ─── Main ────────────────────────────────────────────────────────────────────
def main():
    """Parse CLI args, locate the engine binary and models, and serve forever."""
    parser = argparse.ArgumentParser(
        description="IX Web — Web interface for Inference-X",
        epilog="https://git.inference-x.com/salka/inference-x",
    )
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"Port (default: {DEFAULT_PORT})")
    parser.add_argument("--host", default="0.0.0.0", help="Bind address (default: 0.0.0.0)")
    parser.add_argument("--ix", default=None, help="Path to inference-x binary")
    parser.add_argument("--models", action="append", default=None, help="Model directory (can specify multiple)")
    args = parser.parse_args()
    print("""
╔══════════════════════════════════════════╗
║ IX Web — Inference-X Interface ║
║ Run AI models. On your hardware. ║
╚══════════════════════════════════════════╝
""")
    # Find IX binary — CLI flag wins over auto-detection.
    ix_bin = args.ix or find_ix_binary()
    if not ix_bin:
        print("ERROR: inference-x binary not found.")
        print(" Build it: make")
        print(" Or specify: --ix /path/to/inference-x")
        sys.exit(1)
    print(f" Engine: {ix_bin}")
    # Model directories: user-supplied first (copied so the argparse list
    # isn't mutated), then auto-scanned common locations.
    model_dirs = list(args.models) if args.models else []
    repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    auto_dirs = [
        os.path.join(repo_root, "models"),
        os.path.expanduser("~/.cache/inference-x/models"),
        "./models",
        # The previous literal "~/models" entry was dead code:
        # os.path.isdir never expands "~", so only the expanded form is kept.
        os.path.expanduser("~/models"),
    ]
    for d in auto_dirs:
        if os.path.isdir(d) and d not in model_dirs:
            model_dirs.append(d)
    models = scan_models(model_dirs)
    if not models:
        print("\n WARNING: No .gguf models found!")
        print(" Download one: ./ix download qwen-2.5-3b")
        print(f" Or place .gguf files in: {', '.join(model_dirs[:3])}")
    else:
        print(f" Models: {len(models)} found\n")
        for mid, info in sorted(models.items(), key=lambda x: x[1]["size_gb"]):
            print(f"{mid:28s} {info['size_gb']:>6.1f} GB ({info['developer']})")
    # Hardware summary for the banner.
    hw = get_hardware_info()
    print(f"\n Hardware: {hw['cores']} cores, {hw['ram_gb']} GB RAM")
    print(f" CPU: {hw['cpu']}")
    # Wire the shared engine into the handler class and start serving.
    engine = IXEngine(ix_bin, models)
    IXHandler.engine = engine
    print("\n ──────────────────────────────────────────")
    print(f" IX Web ready: http://localhost:{args.port}")
    print(f" API: http://localhost:{args.port}/v1/chat/completions")
    print(" ──────────────────────────────────────────\n")
    server = http.server.HTTPServer((args.host, args.port), IXHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\n IX Web stopped.")
        server.shutdown()
    finally:
        server.server_close()  # release the listening socket on any exit path
# Entry point guard: run the server only when executed directly, not on import.
if __name__ == "__main__":
    main()